Commit 015bc27f authored by Bradley C. Kuszmaul

Don't be wasteful when splitting brt leaves. Fixes #294.

This makes a big difference in space (46% smaller) and a small difference in time (5% faster), as measured by benchmark-test.

Before:
{{{
$ ./benchmark-test
nodesize=1048576
keysize=8
valsize=8
Serial and random insertions of 1048576 per batch
serial  8.753964s   119783/s    random  5.640094s   185915/s    cumulative 14.394118s   145695/s
serial  9.381472s   111771/s    random  7.325284s   143145/s    cumulative 31.100944s   134861/s
serial  9.859233s   106355/s    random  6.734307s   155707/s    cumulative 47.694553s   131911/s
serial 11.069200s    94729/s    random  6.885863s   152280/s    cumulative 65.649695s   127778/s
Shutdown  4.636875s
Total time 70.286611s for 8388608 insertions =   119349/s
$ ls -l sinsert.brt
-rwxrwxr-x 1 bradley bradley 730344924 Jan 22 11:47 sinsert.brt
}}}

After:
{{{
$ ./benchmark-test
nodesize=1048576
keysize=8
valsize=8
Serial and random insertions of 1048576 per batch
serial  8.521855s   123046/s    random  5.730942s   182967/s    cumulative 14.252861s   147139/s
serial  9.106047s   115152/s    random  7.001765s   149759/s    cumulative 30.360740s   138149/s
serial  9.543696s   109871/s    random  6.651000s   157657/s    cumulative 46.555503s   135139/s
serial 10.627035s    98671/s    random  6.555884s   159944/s    cumulative 63.738491s   131610/s
Shutdown  2.818513s
Total time 66.557042s for 8388608 insertions =   126036/s
$ ls -l sinsert.brt
-rwxrwxr-x 1 bradley bradley 396894480 Jan 22 11:45 sinsert.brt
}}}



git-svn-id: file:///svn/tokudb@1798 c7de825b-a66e-492c-adef-691d508d4ae1
parent c3195063
......@@ -188,7 +188,7 @@ static inline int toku_brt_cursor_active(BRT_CURSOR cursor) {
void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
/* a brt leaf has split. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE newright);
/* a brt internal node has expanded. modify this cursor if it includes the old node in its path. */
void toku_brt_cursor_nonleaf_expand(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
......
......@@ -75,7 +75,7 @@ static long brtnode_size(BRTNODE node) {
}
static void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE right);
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnode);
static void brt_update_cursors_nonleaf_expand(BRT t, BRTNODE oldnode, int childnum, BRTNODE left, BRTNODE right, struct kv_pair *splitk);
static void brt_update_cursors_nonleaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right);
......@@ -318,6 +318,7 @@ static void create_new_brtnode (BRT t, BRTNODE *result, int height, TOKUTXN txn)
//printf("%s:%d putting %p (%lld) parent=%p\n", __FILE__, __LINE__, n, n->thisnodename, parent_brtnode);
r=toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
assert(r==0);
r=toku_log_newbrtnode(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
}
......@@ -360,15 +361,13 @@ static int insert_to_buffer_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT
static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk) {
BRTNODE A,B;
BRTNODE B;
assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
create_new_brtnode(t, &A, 0, txn);
create_new_brtnode(t, &B, 0, txn);
//printf("leaf_split %lld - %lld %lld\n", node->thisnodename, A->thisnodename, B->thisnodename);
//printf("%s:%d A PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
//printf("%s:%d B PMA= %p\n", __FILE__, __LINE__, A->u.l.buffer);
assert(A->nodesize>0);
assert(B->nodesize>0);
assert(node->nodesize>0);
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
......@@ -376,20 +375,19 @@ static int brtleaf_split (TOKUTXN txn, FILENUM filenum, BRT t, BRTNODE node, BRT
assert(node->height>0 || node->u.l.buffer!=0);
int r;
r = toku_pma_split(txn, filenum,
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, splitk,
A->thisnodename, A->u.l.buffer, &A->u.l.n_bytes_in_buffer, A->rand4fingerprint, &A->local_fingerprint,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
node->thisnodename, node->u.l.buffer, &node->u.l.n_bytes_in_buffer, node->rand4fingerprint, &node->local_fingerprint,
splitk,
B->thisnodename, B->u.l.buffer, &B->u.l.n_bytes_in_buffer, B->rand4fingerprint, &B->local_fingerprint);
assert(r == 0);
assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
brt_update_cursors_leaf_split(t, node, A, B);
delete_node(t, node);
brt_update_cursors_leaf_split(t, node, B);
*nodea = A;
*nodea = node;
*nodeb = B;
assert(toku_serialize_brtnode_size(A)<A->nodesize);
assert(toku_serialize_brtnode_size(B)<B->nodesize);
assert(toku_serialize_brtnode_size(node)<node->nodesize);
assert(toku_serialize_brtnode_size(B) <B->nodesize);
return 0;
}
......@@ -1965,14 +1963,13 @@ void brt_update_cursors_new_root(BRT t, BRTNODE newroot, BRTNODE left, BRTNODE r
}
}
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
static void brt_update_cursors_leaf_split(BRT t, BRTNODE oldnode, BRTNODE newnode) {
BRT_CURSOR cursor;
if (brt_update_debug) printf("brt_update_cursors_leaf_split %lld %lld %lld\n", oldnode->thisnodename,
left->thisnodename, right->thisnodename);
if (brt_update_debug) printf("brt_update_cursors_leaf_split %lld %lld\n", oldnode->thisnodename, newnode->thisnodename);
for (cursor = t->cursors_head; cursor; cursor = cursor->next) {
if (toku_brt_cursor_active(cursor)) {
toku_brt_cursor_leaf_split(cursor, t, oldnode, left, right);
toku_brt_cursor_leaf_split(cursor, t, oldnode, newnode);
}
}
}
......@@ -2029,35 +2026,29 @@ void toku_brt_cursor_new_root(BRT_CURSOR cursor, BRT t, BRTNODE newroot, BRTNODE
brt_node_add_cursor(newroot, childnum, cursor);
}
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE left, BRTNODE right) {
void toku_brt_cursor_leaf_split(BRT_CURSOR cursor, BRT t, BRTNODE oldnode, BRTNODE newright) {
int r;
BRTNODE newnode;
PMA pma;
void *v;
assert(oldnode->height == 0);
if (cursor->path[cursor->path_len-1] == oldnode) {
assert(left->height == 0 && right->height == 0);
assert(newright->height == 0);
r = toku_pma_cursor_get_pma(cursor->pmacurs, &pma);
assert(r == 0);
if (pma == left->u.l.buffer)
newnode = left;
else if (pma == right->u.l.buffer)
newnode = right;
else
newnode = 0;
assert(newnode);
if (pma == newright->u.l.buffer) {
r = toku_cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty, brtnode_size(oldnode));
assert(r == 0);
r = toku_cachetable_maybe_get_and_pin(t->cf, newright->thisnodename, &v);
assert(r == 0 && v == newright);
cursor->path[cursor->path_len-1] = newright;
}
if (0) printf("toku_brt_cursor_leaf_split %p oldnode %lld newnode %lld\n", cursor,
oldnode->thisnodename, newnode->thisnodename);
oldnode->thisnodename, newright->thisnodename);
//verify_local_fingerprint_nonleaf(oldnode);
r = toku_cachetable_unpin(t->cf, oldnode->thisnodename, oldnode->dirty, brtnode_size(oldnode));
assert(r == 0);
r = toku_cachetable_maybe_get_and_pin(t->cf, newnode->thisnodename, &v);
assert(r == 0 && v == newnode);
cursor->path[cursor->path_len-1] = newnode;
}
}
......
......@@ -5,7 +5,7 @@
#include "key.h"
#include "brt-internal.h"
void dump_header (int f) {
void dump_header (int f, struct brt_header **header) {
struct brt_header *h;
int r;
r = toku_deserialize_brtheader_from (f, 0, &h); assert(r==0);
......@@ -15,12 +15,32 @@ void dump_header (int f) {
printf(" freelist=%lld\n", h->freelist);
printf(" unused_memory=%lld\n", h->unused_memory);
printf(" unnamed_root=%lld\n", h->unnamed_root);
printf(" n_named_roots=%d\n", h->n_named_roots);
if (h->n_named_roots>=0) {
int i;
for (i=0; i<h->n_named_roots; i++) {
printf(" %s -> %lld\n", h->names[i], h->roots[i]);
}
}
printf(" flags=%d\n", h->flags);
*header = h;
}
/* Deserialize the brt node stored at disk offset OFF of the file open on descriptor F.
 * The flags and nodesize handed to the deserializer are taken from header H;
 * toku_default_compare_fun is passed twice, followed by a null DB pointer and a zeroed FILENUM.
 * Asserts that the deserializer returned 0.
 * NOTE(review): the resulting node N is neither printed nor released in this function --
 * presumably a stub to be extended with real dumping; confirm. */
void dump_node (int f, DISKOFF off, struct brt_header *h) {
BRTNODE n;
int r = toku_deserialize_brtnode_from (f, off, &n, h->flags, h->nodesize,
toku_default_compare_fun, toku_default_compare_fun,
(DB*)0, (FILENUM){0});
assert(r==0);
}
int main (int argc, const char *argv[]) {
assert(argc==2);
const char *n = argv[1];
int f = open(n, O_RDONLY); assert(f>=0);
dump_header(f);
struct brt_header *h;
dump_header(f, &h);
dump_node(f, 1<<20, h);
return 0;
}
......@@ -847,17 +847,15 @@ static void test_pma_compare_fun (int wrong_endian_p) {
}
static void test_pma_split_n(int n) {
PMA pmaa, pmab, pmac;
PMA pmaa, pmac;
int error;
int i;
int na, nb, nc;
int na, nc;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
u_int32_t brand = random();
u_int32_t bsum = 0;
u_int32_t arand = random();
u_int32_t asum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
......@@ -865,8 +863,6 @@ static void test_pma_split_n(int n) {
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmab, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
......@@ -876,7 +872,7 @@ static void test_pma_split_n(int n) {
sprintf(k, "%4.4d", i);
v = i;
do_insert(pmaa, k, strlen(k)+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
do_insert(pmaa, k, strlen(k)+1, &v, sizeof v, arand, &asum, &expect_fingerprint);
toku_pma_verify(pmaa);
}
......@@ -884,46 +880,38 @@ static void test_pma_split_n(int n) {
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmaa, 0, arand, &asum,
0,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmab);
toku_pma_verify(pmac);
toku_pma_verify_fingerprint(pmab, brand, bsum);
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
if (verbose) { printf("b:"); toku_print_pma(pmab); }
nb = toku_pma_n_entries(pmab);
if (verbose) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
assert(na + nc == n);
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmab);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
}
static void test_pma_dup_split_n(int n, int dup_mode) {
PMA pmaa, pmab, pmac;
PMA pmaa, pmac;
int error;
int i;
int na, nb, nc;
int na, nc;
u_int32_t rand4sum = random();
u_int32_t sum = 0;
u_int32_t expect_sum = 0;
u_int32_t expect_asum = 0;
u_int32_t brand = random();
u_int32_t bsum = 0;
u_int32_t arand = random();
u_int32_t asum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
......@@ -933,10 +921,6 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
assert(error == 0);
toku_pma_set_dup_mode(pmaa, dup_mode);
toku_pma_set_dup_compare(pmaa, toku_default_compare_fun);
error = toku_pma_create(&pmab, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
toku_pma_set_dup_mode(pmab, dup_mode);
toku_pma_set_dup_compare(pmab, toku_default_compare_fun);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
toku_pma_set_dup_mode(pmac, dup_mode);
......@@ -946,7 +930,7 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
int dupkey = random();
for (i=0; i<n; i++) {
int v = i;
do_insert(pmaa, &dupkey, sizeof dupkey, &v, sizeof v, rand4sum, &sum, &expect_sum);
do_insert(pmaa, &dupkey, sizeof dupkey, &v, sizeof v, arand, &asum, &expect_asum);
toku_pma_verify(pmaa);
}
......@@ -956,20 +940,17 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
DBT splitk;
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, &splitk,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmaa, 0, arand, &asum,
&splitk,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmab);
toku_pma_verify(pmac);
toku_pma_verify_fingerprint(pmab, brand, bsum);
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (0) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
if (0) { printf("b:"); toku_print_pma(pmab); }
nb = toku_pma_n_entries(pmab);
if (0) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
......@@ -985,13 +966,10 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
if (splitk.data) toku_free(splitk.data);
assert(na == 0);
assert(nb + nc == n);
assert(na + nc == n);
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmab);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
}
......@@ -999,17 +977,15 @@ static void test_pma_dup_split_n(int n, int dup_mode) {
static void test_pma_split_varkey(void) {
char *keys[] = {
"this", "is", "a", "key", "this is a really really big key", "zz", 0 };
PMA pmaa, pmab, pmac;
PMA pmaa, pmac;
int error;
int i;
int n, na, nb, nc;
int n, na, nc;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
u_int32_t brand = random();
u_int32_t bsum = 0;
u_int32_t arand = random();
u_int32_t asum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
......@@ -1017,45 +993,37 @@ static void test_pma_split_varkey(void) {
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmab, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
/* insert some kv pairs */
for (i=0; keys[i]; i++) {
char v = i;
do_insert(pmaa, keys[i], strlen(keys[i])+1, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
do_insert(pmaa, keys[i], strlen(keys[i])+1, &v, sizeof v, arand, &asum, &expect_fingerprint);
}
n = i;
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmaa, 0, arand, &asum,
0,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify(pmaa);
toku_pma_verify(pmab);
toku_pma_verify(pmac);
toku_pma_verify_fingerprint(pmab, brand, bsum);
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
if (verbose) { printf("b:"); toku_print_pma(pmab); }
nb = toku_pma_n_entries(pmab);
if (verbose) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
assert(na + nc == n);
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmab);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
}
......@@ -1120,18 +1088,16 @@ static void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) {
}
static void test_pma_split_cursor(void) {
PMA pmaa, pmab, pmac;
PMA pmaa, pmac;
PMA_CURSOR cursora, cursorb, cursorc;
int error;
int i;
int na, nb, nc;
int na, nc;
u_int32_t rand4fingerprint = random();
u_int32_t sum = 0;
u_int32_t expect_fingerprint = 0;
u_int32_t brand = random();
u_int32_t bsum = 0;
u_int32_t arand = random();
u_int32_t asum = 0;
u_int32_t crand = random();
u_int32_t csum = 0;
......@@ -1140,8 +1106,6 @@ static void test_pma_split_cursor(void) {
error = toku_pma_create(&pmaa, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmab, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
error = toku_pma_create(&pmac, toku_default_compare_fun, null_db, null_filenum, 0);
assert(error == 0);
......@@ -1152,7 +1116,7 @@ static void test_pma_split_cursor(void) {
snprintf(k, sizeof k, "%.10d", i);
v = i;
do_insert(pmaa, k, sizeof k, &v, sizeof v, rand4fingerprint, &sum, &expect_fingerprint);
do_insert(pmaa, k, sizeof k, &v, sizeof v, arand, &asum, &expect_fingerprint);
}
assert(toku_pma_n_entries(pmaa) == 16);
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
......@@ -1181,25 +1145,22 @@ static void test_pma_split_cursor(void) {
assert_cursor_val(cursorc, 16);
error = toku_pma_split(null_txn, null_filenum,
null_diskoff, pmaa, 0, 0,
null_diskoff, pmab, 0, brand, &bsum,
null_diskoff, pmaa, 0, arand, &asum,
0,
null_diskoff, pmac, 0, crand, &csum);
assert(error == 0);
toku_pma_verify_fingerprint(pmab, brand, bsum);
toku_pma_verify_fingerprint(pmaa, arand, asum);
toku_pma_verify_fingerprint(pmac, crand, csum);
if (verbose) { printf("a:"); toku_print_pma(pmaa); }
na = toku_pma_n_entries(pmaa);
assert(na == 0);
if (verbose) { printf("b:"); toku_print_pma(pmab); }
nb = toku_pma_n_entries(pmab);
if (verbose) { printf("c:"); toku_print_pma(pmac); }
nc = toku_pma_n_entries(pmac);
assert(nb + nc == 16);
assert(na + nc == 16);
/* cursors open, should fail */
error = toku_pma_free(&pmab);
error = toku_pma_free(&pmaa);
assert(error != 0);
/* walk cursora */
......@@ -1224,8 +1185,6 @@ static void test_pma_split_cursor(void) {
error = toku_pma_free(&pmaa);
assert(error == 0);
error = toku_pma_free(&pmab);
assert(error == 0);
error = toku_pma_free(&pmac);
assert(error == 0);
}
......
......@@ -1218,43 +1218,38 @@ static void __pma_relocate_kvpairs(PMA pma) {
int toku_pma_split(TOKUTXN txn, FILENUM filenum,
DISKOFF origdiskoff, PMA origpma, unsigned int *origpma_size, DBT *splitk,
DISKOFF leftdiskoff, PMA leftpma, unsigned int *leftpma_size, u_int32_t leftrand4fp, u_int32_t *leftfingerprint,
DISKOFF rightdiskoff, PMA rightpma, unsigned int *rightpma_size, u_int32_t rightrand4fp, u_int32_t *rightfingerprint) {
DISKOFF diskoff, PMA pma, unsigned int *pma_size_p, u_int32_t rand4fp, u_int32_t *fingerprint_p,
DBT *splitk,
DISKOFF newdiskoff, PMA newpma, unsigned int *newpma_size_p, u_int32_t newrand4fp, u_int32_t *newfingerprint_p) {
int error;
int npairs;
struct kv_pair_tag *pairs;
int sumlen;
int runlen;
int i;
int n;
int spliti;
struct list cursors;
/* extract the pairs */
npairs = toku_pma_n_entries(origpma);
npairs = toku_pma_n_entries(pma);
if (npairs == 0) {
if (splitk)
memset(splitk, 0, sizeof *splitk);
return 0;
}
assert(toku_pma_n_entries(leftpma) == 0);
assert(toku_pma_n_entries(rightpma) == 0);
/* TODO move pairs to the stack */
pairs = pma_extract_pairs(origpma, npairs, 0, origpma->N);
pairs = pma_extract_pairs(pma, npairs, 0, pma->N);
assert(pairs);
origpma->n_pairs_present = 0;
assert(toku_pma_n_entries(newpma) == 0);
/* debug check the kv length sum */
sumlen = 0;
unsigned int sumlen = 0;
for (i=0; i<npairs; i++)
sumlen += kv_pair_keylen(pairs[i].pair) + kv_pair_vallen(pairs[i].pair) + PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD;
if (origpma_size)
assert(*(int *)origpma_size == sumlen);
if (pma_size_p)
assert(*pma_size_p == sumlen);
runlen = 0;
unsigned int runlen = 0;
for (i=0; i<npairs;) {
runlen += kv_pair_keylen(pairs[i].pair) + kv_pair_vallen(pairs[i].pair) + PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD;
i++;
......@@ -1262,36 +1257,31 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
break;
}
spliti = i;
if (leftpma_size)
*leftpma_size = runlen;
if (rightpma_size)
*rightpma_size = sumlen - runlen;
/* set the cursor set to be all of the cursors from the original pma */
unsigned int revised_leftpmasize = runlen;
unsigned int revised_rightpmasize = sumlen-runlen;
/* Get all of the cursors from the original pma */
struct list cursors;
list_init(&cursors);
if (!list_empty(&origpma->cursors))
list_move(&cursors, &origpma->cursors);
if (!list_empty(&pma->cursors))
list_move(&cursors, &pma->cursors);
{
u_int32_t sum = 0;
for (i=0; i<spliti; i++) {
sum+=toku_calccrc32_kvpair(kv_pair_key_const(pairs[i].pair), kv_pair_keylen(pairs[i].pair),
kv_pair_val_const(pairs[i].pair), kv_pair_vallen(pairs[i].pair));
}
*leftfingerprint += leftrand4fp * sum;
}
u_int32_t revised_left_fingerprint;
u_int32_t revised_right_fingerprint;
{
u_int32_t sum = 0;
for (i=spliti; i<npairs; i++) {
sum+=toku_calccrc32_kvpair(kv_pair_key_const(pairs[i].pair), kv_pair_keylen(pairs[i].pair),
kv_pair_val_const(pairs[i].pair), kv_pair_vallen(pairs[i].pair));
}
*rightfingerprint += rightrand4fp * sum;
revised_left_fingerprint = -rand4fp * sum;
revised_right_fingerprint = newrand4fp * sum;
}
if (splitk) {
struct kv_pair *a = pairs[spliti-1].pair;
if (origpma->dup_mode & TOKU_DB_DUPSORT) {
if (pma->dup_mode & TOKU_DB_DUPSORT) {
splitk->data = kv_pair_malloc(kv_pair_key(a), kv_pair_keylen(a), kv_pair_val(a), kv_pair_vallen(a));
splitk->size = kv_pair_keylen(a) + kv_pair_vallen(a);
} else {
......@@ -1303,38 +1293,40 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the first half of pairs into the left pma */
n = spliti;
error = pma_resize_array(txn, filenum, leftdiskoff, leftpma, n + n/4, 0);
error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0); // zeros the elements
assert(error == 0);
distribute_data(leftpma->pairs, toku_pma_index_limit(leftpma), &pairs[0], n, leftpma);
int r = pma_log_distribute(txn, filenum, origdiskoff, leftdiskoff, spliti, &pairs[0]);
distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma);
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0]);
if (r!=0) { toku_free(pairs); return r; }
#if PMA_USE_MEMPOOL
__pma_relocate_kvpairs(leftpma);
#endif
__pma_update_cursors(leftpma, &cursors, &pairs[0], spliti);
leftpma->n_pairs_present = spliti;
// Don't have to relocate kvpairs, because these ones are still there.
__pma_update_cursors(pma, &cursors, &pairs[0], n);
pma->n_pairs_present = spliti;
/* put the second half of pairs into the right pma */
n = npairs - spliti;
error = pma_resize_array(txn, filenum, rightdiskoff, rightpma, n + n/4, 0);
error = pma_resize_array(txn, filenum, newdiskoff, newpma, n + n/4, 0);
assert(error == 0);
distribute_data(rightpma->pairs, toku_pma_index_limit(rightpma), &pairs[spliti], n, rightpma);
r = pma_log_distribute(txn, filenum, origdiskoff, rightdiskoff, n, &pairs[spliti]);
distribute_data(newpma->pairs, toku_pma_index_limit(newpma), &pairs[spliti], n, newpma);
r = pma_log_distribute(txn, filenum, diskoff, newdiskoff, n, &pairs[spliti]);
if (r!=0) { toku_free(pairs); return r; }
#if PMA_USE_MEMPOOL
__pma_relocate_kvpairs(rightpma);
__pma_relocate_kvpairs(newpma);
// If it's in an mpool, we must free those pairs.
for (i=spliti; i<npairs; i++) {
pma_mfree_kv_pair(pma, pairs[i].pair);
}
#endif
__pma_update_cursors(rightpma, &cursors, &pairs[spliti], n);
rightpma->n_pairs_present = n;
__pma_update_cursors(newpma, &cursors, &pairs[spliti], n);
newpma->n_pairs_present = n;
toku_free(pairs);
/* bind the remaining cursors to the left pma*/
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&leftpma->cursors, list);
}
/* The remaining cursors are in the left pma */
if (fingerprint_p) *fingerprint_p += revised_left_fingerprint;
if (newfingerprint_p) *newfingerprint_p += revised_right_fingerprint;
if (pma_size_p) *pma_size_p = revised_leftpmasize;
if (newpma_size_p) *newpma_size_p = revised_rightpmasize;
return 0;
}
......
......@@ -69,19 +69,27 @@ int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
enum pma_errors toku_pma_lookup (PMA, DBT*, DBT*);
/*
* The kv pairs in the original pma are split into 2 equal sized sets
* and moved to the leftpma and rightpma. The size is determined by
* the sum of the keys and values. the left and right pma's must be
* empty.
* The kv pairs in PMA are split into two (nearly) equal sized sets.
* The ones in the left half remain in PMA; the ones in the right half are put into NEWPMA.
* The size is determined by the sum of the sizes of the keys and values.
* The NEWPMA must be empty.
*
* origpma - the pma to be split
* leftpma - the pma assigned keys <= pivot key
* rightpma - the pma assigned keys > pivot key
* DISKOFF - the disk offset of the node containing the PMA to be split. (Needed for logging)
* PMA - the pma to be split.
* PMA_SIZE - a variable containing the size of the disk image of the PMA.
* RAND4SUM - the random number for fingerprinting
* FINGERPRINT - the current fingerprint of the PMA.
*
* NEWDISKOFF, NEWPMA, NEWPMASIZE, NEWRAND4SUM, NEWFINGERPRINT - The same information for the pma that will hold the pairs moved out of PMA.
*
* SPLITK filled in with the resulting pivot key.
* The original PMA gets keys <= pivot key
* The NEWPMA gets keys > pivot key
*/
int toku_pma_split(TOKUTXN, FILENUM,
DISKOFF /*origdiskoff*/, PMA /*origpma*/, unsigned int */*origpma_size*/, DBT */*splitk*/,
DISKOFF /*leftdiskoff*/, PMA /*leftpma*/, unsigned int */*leftpma_size*/, u_int32_t /*leftrand4sum*/, u_int32_t */*leftfingerprint*/,
DISKOFF /*rightdiskoff*/, PMA /*rightpma*/, unsigned int */*rightpma_size*/, u_int32_t /*rightrand4sum*/, u_int32_t */*rightfingerprint*/);
DISKOFF /*diskoff*/, PMA /*pma*/, unsigned int */*pma_size*/, u_int32_t /*rand4sum*/, u_int32_t */*fingerprint*/,
DBT */*splitk*/,
DISKOFF /*newdiskoff*/, PMA /*newpma*/, unsigned int */*newpma_size*/, u_int32_t /*newrand4sum*/, u_int32_t */*newfingerprint*/);
/*
* Insert several key value pairs into an empty pma.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment