Commit a4a9ef7b authored by Rich Prohaska

Use pma_split in brtleaf_split to split leaf nodes



git-svn-id: file:///svn/tokudb@103 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5cc3518b
...@@ -263,13 +263,15 @@ void delete_node (BRT t, BRTNODE node) { ...@@ -263,13 +263,15 @@ void delete_node (BRT t, BRTNODE node) {
cachetable_remove(t->cf, node->thisnodename, 0); /* Don't write it back to disk. */ cachetable_remove(t->cf, node->thisnodename, 0); /* Don't write it back to disk. */
} }
#define USE_PMA_SPLIT 1
#if ! USE_PMA_SPLIT
static void insert_to_buffer_in_leaf (BRTNODE node, DBT *k, DBT *v, DB *db) { static void insert_to_buffer_in_leaf (BRTNODE node, DBT *k, DBT *v, DB *db) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size; unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size;
int r = pma_insert(node->u.l.buffer, k, v, db); int r = pma_insert(node->u.l.buffer, k, v, db);
assert(r==0); assert(r==0);
node->u.l.n_bytes_in_buffer += n_bytes_added; node->u.l.n_bytes_in_buffer += n_bytes_added;
} }
#endif
static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v) { static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v) {
unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size; unsigned int n_bytes_added = KEY_VALUE_OVERHEAD + k->size + v->size;
...@@ -282,7 +284,6 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v ...@@ -282,7 +284,6 @@ static int insert_to_hash_in_nonleaf (BRTNODE node, int childnum, DBT *k, DBT *v
int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, void *app_private, DB *db) { int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *splitk, void *app_private, DB *db) {
int did_split=0;
BRTNODE A,B; BRTNODE A,B;
assert(node->height==0); assert(node->height==0);
assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */ assert(t->h->nodesize>=node->nodesize); /* otherwise we might be in trouble because the nodesize shrank. */
...@@ -296,6 +297,25 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -296,6 +297,25 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
//printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename); //printf("%s:%d A is at %lld\n", __FILE__, __LINE__, A->thisnodename);
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize); //printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
#if USE_PMA_SPLIT
{
int r;
r = pma_split(node->u.l.buffer, &node->u.l.n_bytes_in_buffer,
A->u.l.buffer, &A->u.l.n_bytes_in_buffer,
B->u.l.buffer, &B->u.l.n_bytes_in_buffer);
assert(r == 0);
r = pma_get_last(A->u.l.buffer, splitk, 0);
assert(r == 0);
/* unused */
app_private = app_private;
db = db;
}
#else
{
int did_split = 0;
PMA_ITERATE(node->u.l.buffer, key, keylen, val, vallen, PMA_ITERATE(node->u.l.buffer, key, keylen, val, vallen,
({ ({
DBT k,v; DBT k,v;
...@@ -309,12 +329,14 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl ...@@ -309,12 +329,14 @@ int brtleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *nodeb, DBT *spl
insert_to_buffer_in_leaf(B, fill_dbt_ap(&k, key, keylen, app_private), fill_dbt(&v, val, vallen), db); insert_to_buffer_in_leaf(B, fill_dbt_ap(&k, key, keylen, app_private), fill_dbt(&v, val, vallen), db);
} }
})); }));
assert(did_split==1);
}
#endif
assert(node->height>0 || node->u.l.buffer!=0); assert(node->height>0 || node->u.l.buffer!=0);
/* Remove it from the cache table, and free its storage. */ /* Remove it from the cache table, and free its storage. */
//printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer); //printf("%s:%d old pma = %p\n", __FILE__, __LINE__, node->u.l.buffer);
delete_node(t, node); delete_node(t, node);
assert(did_split==1);
*nodea = A; *nodea = A;
*nodeb = B; *nodeb = B;
assert(serialize_brtnode_size(A)<A->nodesize); assert(serialize_brtnode_size(A)<A->nodesize);
......
...@@ -538,6 +538,7 @@ void test_pma_cursor_4() { ...@@ -538,6 +538,7 @@ void test_pma_cursor_4() {
PMA_CURSOR cursora, cursorb, cursorc; PMA_CURSOR cursora, cursorb, cursorc;
int i; int i;
printf("test_pma_cursor_4\n");
error = pma_create(&pma, default_compare_fun); error = pma_create(&pma, default_compare_fun);
assert(error == 0); assert(error == 0);
...@@ -577,7 +578,7 @@ void test_pma_cursor_4() { ...@@ -577,7 +578,7 @@ void test_pma_cursor_4() {
assert(error == 0); assert(error == 0);
assert_cursor_val(cursorc, 4); assert_cursor_val(cursorc, 4);
for (i=5; i<=6; i += 1) { for (i=5; i<=8; i += 1) {
DBT dbtk, dbtv; DBT dbtk, dbtv;
char k[5]; int v; char k[5]; int v;
...@@ -589,7 +590,7 @@ void test_pma_cursor_4() { ...@@ -589,7 +590,7 @@ void test_pma_cursor_4() {
error = pma_insert(pma, &dbtk, &dbtv, 0); error = pma_insert(pma, &dbtk, &dbtv, 0);
assert(error == BRT_OK); assert(error == BRT_OK);
} }
assert(pma_n_entries(pma) == 6); assert(pma_n_entries(pma) == 8);
printf("a:"); print_pma(pma); printf("a:"); print_pma(pma);
assert_cursor_val(cursora, 1); assert_cursor_val(cursora, 1);
...@@ -681,6 +682,10 @@ void test_pma_split_n(int n) { ...@@ -681,6 +682,10 @@ void test_pma_split_n(int n) {
error = pma_create(&pmaa, default_compare_fun); error = pma_create(&pmaa, default_compare_fun);
assert(error == 0); assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
assert(error == 0);
/* insert some kv pairs */ /* insert some kv pairs */
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
...@@ -698,7 +703,7 @@ void test_pma_split_n(int n) { ...@@ -698,7 +703,7 @@ void test_pma_split_n(int n) {
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, &pmab, &pmac); error = pma_split(pmaa, 0, pmab, 0, pmac, 0);
assert(error == 0); assert(error == 0);
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
...@@ -731,6 +736,10 @@ void test_pma_split_varkey() { ...@@ -731,6 +736,10 @@ void test_pma_split_varkey() {
error = pma_create(&pmaa, default_compare_fun); error = pma_create(&pmaa, default_compare_fun);
assert(error == 0); assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
assert(error == 0);
/* insert some kv pairs */ /* insert some kv pairs */
for (i=0; keys[i]; i++) { for (i=0; keys[i]; i++) {
...@@ -748,7 +757,7 @@ void test_pma_split_varkey() { ...@@ -748,7 +757,7 @@ void test_pma_split_varkey() {
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, &pmab, &pmac); error = pma_split(pmaa, 0, pmab, 0, pmac, 0);
assert(error == 0); assert(error == 0);
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
...@@ -837,6 +846,10 @@ void test_pma_split_cursor() { ...@@ -837,6 +846,10 @@ void test_pma_split_cursor() {
error = pma_create(&pmaa, default_compare_fun); error = pma_create(&pmaa, default_compare_fun);
assert(error == 0); assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
assert(error == 0);
/* insert some kv pairs */ /* insert some kv pairs */
for (i=1; i<=16; i += 1) { for (i=1; i<=16; i += 1) {
...@@ -877,7 +890,7 @@ void test_pma_split_cursor() { ...@@ -877,7 +890,7 @@ void test_pma_split_cursor() {
// print_cursor("cursorc", cursorc); // print_cursor("cursorc", cursorc);
assert_cursor_val(cursorc, 16); assert_cursor_val(cursorc, 16);
error = pma_split(pmaa, &pmab, &pmac); error = pma_split(pmaa, 0, pmab, 0, pmac, 0);
assert(error == 0); assert(error == 0);
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
...@@ -975,6 +988,7 @@ void test_pma_bulk_insert_n(int n) { ...@@ -975,6 +988,7 @@ void test_pma_bulk_insert_n(int n) {
/* verify */ /* verify */
print_pma(pma); print_pma(pma);
assert(n == pma_n_entries(pma));
/* cleanup */ /* cleanup */
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#include "kv-pair.h" #include "kv-pair.h"
#include "pma-internal.h" #include "pma-internal.h"
/* TODO get this from a include file */
#define KEY_VALUE_OVERHEAD 8
int pma_n_entries (PMA pma) { int pma_n_entries (PMA pma) {
return pma->n_pairs_present; return pma->n_pairs_present;
...@@ -616,7 +618,7 @@ void pma_update_region(PMA pma, struct list *cursor_set, struct kv_pair_tag *pai ...@@ -616,7 +618,7 @@ void pma_update_region(PMA pma, struct list *cursor_set, struct kv_pair_tag *pai
} }
} }
struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi) { struct kv_pair_tag *pma_extract_pairs(PMA pma, int lo, int hi) {
int npairs; int npairs;
struct kv_pair_tag *pairs; struct kv_pair_tag *pairs;
int i; int i;
...@@ -639,94 +641,109 @@ struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi) { ...@@ -639,94 +641,109 @@ struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi) {
return pairs; return pairs;
} }
int pma_split(PMA old, PMA *newap, PMA *newbp) { int pma_split(PMA origpma, unsigned int *origpma_size,
PMA newa, newb; PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size) {
int error; int error;
int npairs; int npairs;
struct kv_pair_tag *pairs; struct kv_pair_tag *pairs;
int sumlen; int sumlen;
int runlen; int runlen;
int len;
int i; int i;
int n;
int spliti; int spliti;
struct list cursors; struct list cursors;
/* create the new pma's */
error = pma_create(newap, old->compare_fun);
if (error != 0)
return error;
error = pma_create(newbp, old->compare_fun);
if (error != 0) {
pma_free(newap);
return error;
}
newa = *newap;
newb = *newbp;
/* extract the pairs */ /* extract the pairs */
npairs = pma_n_entries(old); npairs = pma_n_entries(origpma);
pairs = pmainternal_extract_pairs(old, 0, old->N); if (npairs == 0)
return 0;
assert(pma_n_entries(leftpma) == 0);
assert(pma_n_entries(rightpma) == 0);
/* TODO move pairs to the stack */
pairs = pma_extract_pairs(origpma, 0, origpma->N);
assert(pairs); assert(pairs);
old->n_pairs_present = 0; origpma->n_pairs_present = 0;
/* split the pairs in half by length (TODO: combine sum with extract) */ /* debug check the kv length sum */
sumlen = 0; sumlen = 0;
for (i=0; i<npairs; i++) for (i=0; i<npairs; i++)
sumlen += 4 + kv_pair_keylen(pairs[i].pair) + 4 + kv_pair_vallen(pairs[i].pair); sumlen += kv_pair_keylen(pairs[i].pair) + kv_pair_vallen(pairs[i].pair) + KEY_VALUE_OVERHEAD;
if (origpma_size)
assert(*(int *)origpma_size == sumlen);
runlen = 0; runlen = 0;
for (i=0; i < npairs; i++) { for (i=0; i<npairs;) {
len = 4 + kv_pair_keylen(pairs[i].pair) + 4 + kv_pair_vallen(pairs[i].pair); runlen += kv_pair_keylen(pairs[i].pair) + kv_pair_vallen(pairs[i].pair) + KEY_VALUE_OVERHEAD;
if (runlen + len > sumlen/2) i++;
if (2*runlen >= sumlen)
break; break;
runlen += len;
} }
spliti = i; spliti = i;
if (leftpma_size)
*leftpma_size = runlen;
if (rightpma_size)
*rightpma_size = sumlen - runlen;
/* set the cursor set to be all of the cursors from the old pma */ /* set the cursor set to be all of the cursors from the original pma */
list_init(&cursors); list_init(&cursors);
if (!list_empty(&old->cursors)) if (!list_empty(&origpma->cursors))
list_move(&cursors, &old->cursors); list_move(&cursors, &origpma->cursors);
/* put the first half of pairs into newa */ /* put the first half of pairs into the left pma */
error = pma_resize_array(newa, 2 * spliti); n = spliti;
error = pma_resize_array(leftpma, n + n/4);
assert(error == 0); assert(error == 0);
distribute_data(newa->pairs, pma_index_limit(newa), &pairs[0], spliti, newa); distribute_data(leftpma->pairs, pma_index_limit(leftpma), &pairs[0], n, leftpma);
pma_update_region(newa, &cursors, &pairs[0], spliti); pma_update_region(leftpma, &cursors, &pairs[0], spliti);
newa->n_pairs_present = spliti; leftpma->n_pairs_present = spliti;
/* put the second half of pairs into newb */ /* put the second half of pairs into the right pma */
error = pma_resize_array(newb, 2 * (npairs-spliti)); n = npairs - spliti;
error = pma_resize_array(rightpma, n + n/4);
assert(error == 0); assert(error == 0);
distribute_data(newb->pairs, pma_index_limit(newb), &pairs[spliti], npairs-spliti, newb); distribute_data(rightpma->pairs, pma_index_limit(rightpma), &pairs[spliti], n, rightpma);
pma_update_region(newb, &cursors, &pairs[spliti], npairs-spliti); pma_update_region(rightpma, &cursors, &pairs[spliti], n);
newb->n_pairs_present = npairs-spliti; rightpma->n_pairs_present = n;
toku_free(pairs); toku_free(pairs);
/* bind the remaining cursors to pma b */ /* bind the remaining cursors to the left pma*/
while (!list_empty(&cursors)) { while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors); struct list *list = list_head(&cursors);
list_remove(list); list_remove(list);
list_push(&newa->cursors, list); list_push(&leftpma->cursors, list);
} }
return 0; return 0;
} }
int pma_bulk_insert_pairs(PMA pma, struct kv_pair_tag *newpairs, int n_newpairs) { int pma_get_last(PMA pma, DBT *key, DBT *val) {
int error; int position;
struct kv_pair *pair;
void *v; int vlen;
if (!list_empty(&pma->cursors)) position = pma->N - 1;
return -1; while ((pair = pma->pairs[position]) == 0) {
if (pma_n_entries(pma) > 0) if (position > 0)
return -2; position--;
error = pma_resize_array(pma, 2 * n_newpairs); else
if (error) return DB_NOTFOUND;
return error; }
distribute_data(pma->pairs, pma_index_limit(pma), newpairs, n_newpairs, pma);
pma->n_pairs_present = n_newpairs; if (key) {
v = kv_pair_key(pair);
vlen = kv_pair_keylen(pair);
fill_dbt(key, memdup(v, vlen), vlen);
}
if (val) {
v = kv_pair_val(pair);
vlen = kv_pair_vallen(pair);
fill_dbt(val, memdup(v, vlen), vlen);
}
return 0; return 0;
} }
...@@ -738,16 +755,23 @@ void __pma_bulk_cleanup(struct kv_pair_tag *pairs, int n) { ...@@ -738,16 +755,23 @@ void __pma_bulk_cleanup(struct kv_pair_tag *pairs, int n) {
if (pairs[i].pair) if (pairs[i].pair)
kv_pair_free(pairs[i].pair); kv_pair_free(pairs[i].pair);
} }
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) { int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
struct kv_pair_tag *newpairs; struct kv_pair_tag *newpairs;
int i; int i;
int error; int error;
if (n_newpairs == 0)
return 0;
if (!list_empty(&pma->cursors))
return -1;
if (pma_n_entries(pma) > 0)
return -2;
/* TODO put newpairs on the stack */
newpairs = toku_malloc(n_newpairs * sizeof (struct kv_pair_tag)); newpairs = toku_malloc(n_newpairs * sizeof (struct kv_pair_tag));
if (newpairs == 0) { if (newpairs == 0) {
error = -1; return error; error = -3; return error;
} }
for (i=0; i<n_newpairs; i++) { for (i=0; i<n_newpairs; i++) {
...@@ -756,15 +780,20 @@ int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) { ...@@ -756,15 +780,20 @@ int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
if (newpairs[i].pair == 0) { if (newpairs[i].pair == 0) {
__pma_bulk_cleanup(newpairs, i); __pma_bulk_cleanup(newpairs, i);
toku_free(newpairs); toku_free(newpairs);
error = -2; return error; error = -4; return error;
} }
} }
error = pma_bulk_insert_pairs(pma, newpairs, n_newpairs); error = pma_resize_array(pma, n_newpairs + n_newpairs/4);
if (error) if (error) {
__pma_bulk_cleanup(newpairs, n_newpairs); __pma_bulk_cleanup(newpairs, n_newpairs);
toku_free(newpairs);
error = -5; return error;
}
distribute_data(pma->pairs, pma_index_limit(pma), newpairs, n_newpairs, pma);
pma->n_pairs_present = n_newpairs;
toku_free(newpairs); toku_free(newpairs);
return error; return 0;
} }
...@@ -37,23 +37,27 @@ int pma_delete (PMA, DBT *, DB*); ...@@ -37,23 +37,27 @@ int pma_delete (PMA, DBT *, DB*);
enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*); enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
/* /*
* split a pma into 2 pma's. the new pma's are designated the * The kv pairs in the original pma are split into 2 equal sized sets
* left and right pma's. the left and right pma's have roughly the same * and moved to the leftpma and rightpma. The size is determined by
* key and value space. * the sum of the keys and values. the left and right pma's must be
* empty.
* *
* old - the old pma * origpma - the pma to be split
* newa - the new pma on the left * leftpma - the pma assigned keys <= pivot key
* newb - the new pma on the right * rightpma - the pma assigned keys > pivot key
*/ */
int pma_split(PMA old, PMA *newa, PMA *newb); int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size);
/* /*
* insert several key value pairs into an empty pma * Insert several key value pairs into an empty pma. The keys are
* assumed to be sorted.
* *
* pma - the pma that the key value pairs will be inserted into. * pma - the pma that the key value pairs will be inserted into.
* must be empty with no cursors. * must be empty with no cursors.
* keys - an array of pointers and lengths of the keys * keys - an array of keys
* vals - an array of pointers and lengths of the values * vals - an array of values
* n_newpairs - the number of key value pairs * n_newpairs - the number of key value pairs
*/ */
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs); int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs);
...@@ -68,6 +72,11 @@ int pma_cursor_set_position_next (PMA_CURSOR c); /* Requires the cursor is init' ...@@ -68,6 +72,11 @@ int pma_cursor_set_position_next (PMA_CURSOR c); /* Requires the cursor is init'
int pma_cursor_set_position_prev (PMA_CURSOR c); int pma_cursor_set_position_prev (PMA_CURSOR c);
int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val); int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val);
/*
* Get the last key and value in the pma
*/
int pma_get_last(PMA pma, DBT *key, DBT *val);
/* Return PMA_NOTFOUND if the pma is empty. */ /* Return PMA_NOTFOUND if the pma is empty. */
#if 0 #if 0
int pma_cget_first (PMA_CURSOR, YBT */*key*/, YBT */*val*/); int pma_cget_first (PMA_CURSOR, YBT */*key*/, YBT */*val*/);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment