Commit e3c9ea02 authored by Rich Prohaska's avatar Rich Prohaska

change the pma index from an array of pairs to an array of pointers

to the key and value.  this change was made to address the slow 
insert performance on an amd64 machine.



git-svn-id: file:///svn/tokudb@87 c7de825b-a66e-492c-adef-691d508d4ae1
parent 681e1323
/*
* the key value pair contains a key and a value in a contiguous space. the
* key is right after the length fields and the value is right after the key.
*/
struct kv_pair {
int keylen;
int vallen;
char key[];
};
static inline void kv_pair_init(struct kv_pair *pair, void *key, int keylen, void *val, int vallen) {
pair->keylen = keylen;
memcpy(pair->key, key, keylen);
pair->vallen = vallen;
memcpy(pair->key + keylen, val, vallen);
}
static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, int vallen) {
struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair)
kv_pair_init(pair, key, keylen, val, vallen);
return pair;
}
static inline void kv_pair_free(struct kv_pair *pair) {
toku_free(pair);
}
static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key;
}
static inline int kv_pair_keylen(struct kv_pair *pair) {
return pair->keylen;
}
static inline void *kv_pair_val(struct kv_pair *pair) {
return pair->key + pair->keylen;
}
static inline int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
}
#include "pma.h" #include "pma.h"
struct pair {
bytevec key; /* NULL for empty slots */
int keylen;
bytevec val;
int vallen;
};
struct pma_cursor { struct pma_cursor {
PMA pma; PMA pma;
int position; /* -1 if the position is undefined. */ int position; /* -1 if the position is undefined. */
...@@ -18,7 +11,7 @@ struct pma { ...@@ -18,7 +11,7 @@ struct pma {
enum typ_tag tag; enum typ_tag tag;
int N; /* How long is the array? Always a power of two >= 4. */ int N; /* How long is the array? Always a power of two >= 4. */
int n_pairs_present; /* How many array elements are non-null. */ int n_pairs_present; /* How many array elements are non-null. */
struct pair *pairs; struct kv_pair **pairs;
int uplgN; /* The smallest power of two >= lg(N) */ int uplgN; /* The smallest power of two >= lg(N) */
double densitystep; /* Each doubling decreases the density by densitystep. double densitystep; /* Each doubling decreases the density by densitystep.
* For example if array_len=256 and uplgN=8 then there are 5 doublings. * For example if array_len=256 and uplgN=8 then there are 5 doublings.
...@@ -31,12 +24,14 @@ struct pma { ...@@ -31,12 +24,14 @@ struct pma {
void *skey, *sval; /* used in dbts */ void *skey, *sval; /* used in dbts */
}; };
int pmainternal_count_region (struct pair *pairs, int lo, int hi); int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void pmainternal_calculate_parameters (PMA pma); void pmainternal_calculate_parameters (PMA pma);
int pmainternal_smooth_region (struct pair *pairs, int n, int idx); int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx);
int pmainternal_printpairs (struct pair *pairs, int N); int pmainternal_printpairs (struct kv_pair *pairs[], int N);
int pmainternal_make_space_at (PMA pma, int idx); int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called. int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */ void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
int pmainternal_init_array(PMA pma, int asksize); int pmainternal_init_array(PMA pma, int asksize);
struct pair *pmainternal_extract_pairs(PMA pma); struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi);
#include "pma-internal.h"
#include "../include/ydb-constants.h" #include "../include/ydb-constants.h"
#include "memory.h" #include "memory.h"
#include "key.h" #include "key.h"
...@@ -7,46 +6,57 @@ ...@@ -7,46 +6,57 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include "kv-pair.h"
#include "pma-internal.h"
static void test_make_space_at (void) { static void test_make_space_at (void) {
PMA pma; PMA pma;
int r=pma_create(&pma, default_compare_fun); char *key;
int r;
struct kv_pair *key_A, *key_B;
key = "A";
key_A = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "B";
key_B = kv_pair_malloc(key, strlen(key)+1, 0, 0);
r=pma_create(&pma, default_compare_fun);
assert(r==0); assert(r==0);
assert(pma_n_entries(pma)==0); assert(pma_n_entries(pma)==0);
r=pmainternal_make_space_at(pma, 2); r=pmainternal_make_space_at(pma, 2);
assert(pma_index_limit(pma)==4); assert(pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
print_pma(pma); print_pma(pma);
pma->pairs[2].key="A"; pma->pairs[2] = key_A;
pma->n_pairs_present++; pma->n_pairs_present++;
r=pmainternal_make_space_at(pma,2); r=pmainternal_make_space_at(pma,2);
printf("Requested space at 2, got space at %d\n", r); printf("Requested space at 2, got space at %d\n", r);
print_pma(pma); print_pma(pma);
assert(pma->pairs[r].key==0); assert(pma->pairs[r]==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
assert(pma_index_limit(pma)==4); assert(pma_index_limit(pma)==4);
pma->pairs[0].key="A"; pma->pairs[0] = key_A;
pma->pairs[1].key="B"; pma->pairs[1] = key_B;
pma->pairs[2].key=0; pma->pairs[2] = 0;
pma->pairs[3].key=0; pma->pairs[3] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
print_pma(pma); print_pma(pma);
r=pmainternal_make_space_at(pma,0); r=pmainternal_make_space_at(pma,0);
printf("Requested space at 0, got space at %d\n", r); printf("Requested space at 0, got space at %d\n", r);
print_pma(pma); print_pma(pma);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); // make sure it doesn't go off the end. assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL); // make sure it doesn't go off the end.
assert(pma_index_limit(pma)==8); assert(pma_index_limit(pma)==8);
pma->pairs[0].key = "A"; pma->pairs[0] = key_A;
pma->pairs[1].key = 0; pma->pairs[1] = 0;
pma->pairs[2].key = 0; pma->pairs[2] = 0;
pma->pairs[3].key = 0; pma->pairs[3] = 0;
pma->pairs[4].key = "B"; pma->pairs[4] = key_B;
pma->pairs[5].key = 0; pma->pairs[5] = 0;
pma->pairs[6].key = 0; pma->pairs[6] = 0;
pma->pairs[7].key = 0; pma->pairs[7] = 0;
pma->n_pairs_present=2; pma->n_pairs_present=2;
print_pma(pma); print_pma(pma);
r=pmainternal_make_space_at(pma,5); r=pmainternal_make_space_at(pma,5);
...@@ -55,15 +65,16 @@ static void test_make_space_at (void) { ...@@ -55,15 +65,16 @@ static void test_make_space_at (void) {
{ {
int i; int i;
for (i=0; i<pma_index_limit(pma); i++) { for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) { if (pma->pairs[i]) {
assert(i<r); assert(i<r);
pma->pairs[i] = 0;
} }
pma->pairs[i].key=0; // zero it so that we don't mess things up on free
pma->pairs[i].val=0;
} }
} }
r=pma_free(&pma); assert(r==0); r=pma_free(&pma); assert(r==0);
assert(pma==0); assert(pma==0);
kv_pair_free(key_A);
kv_pair_free(key_B);
} }
static void test_pma_find (void) { static void test_pma_find (void) {
...@@ -76,14 +87,13 @@ static void test_pma_find (void) { ...@@ -76,14 +87,13 @@ static void test_pma_find (void) {
MALLOC_N(N,pma->pairs); MALLOC_N(N,pma->pairs);
// All that is needed to test pma_find is N and pairs. // All that is needed to test pma_find is N and pairs.
pma->N = N; pma->N = N;
for (i=0; i<N; i++) pma->pairs[i].key=0; for (i=0; i<N; i++) pma->pairs[i]=0;
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
pma->compare_fun = default_compare_fun; pma->compare_fun = default_compare_fun;
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==0); assert(r==0);
pma->pairs[5].key="hello"; pma->pairs[5] = kv_pair_malloc("hello", 5, 0, 0);
pma->pairs[5].keylen=5;
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
...@@ -93,8 +103,7 @@ static void test_pma_find (void) { ...@@ -93,8 +103,7 @@ static void test_pma_find (void) {
r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0); r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
assert(r==0); assert(r==0);
pma->pairs[N-1].key="there"; pma->pairs[N-1] = kv_pair_malloc("there", 5, 0, 0);
pma->pairs[N-1].keylen=5;
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==5); assert(r==5);
r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0); r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
...@@ -105,13 +114,17 @@ static void test_pma_find (void) { ...@@ -105,13 +114,17 @@ static void test_pma_find (void) {
assert(r==6); assert(r==6);
r=pmainternal_find(pma, fill_dbt(&k, "zzz", 3), 0); r=pmainternal_find(pma, fill_dbt(&k, "zzz", 3), 0);
assert(r==N); assert(r==N);
for (i=0; i<N; i++)
if (pma->pairs[i])
kv_pair_free(pma->pairs[i]);
toku_free(pma->pairs); toku_free(pma->pairs);
toku_free(pma); toku_free(pma);
} }
void test_smooth_region_N (int N) { void test_smooth_region_N (int N) {
struct pair pairs[N]; struct kv_pair *pairs[N];
char *strings[100]; struct kv_pair *strings[N];
char string[N]; char string[N];
int i; int i;
int len; int len;
...@@ -121,7 +134,7 @@ void test_smooth_region_N (int N) { ...@@ -121,7 +134,7 @@ void test_smooth_region_N (int N) {
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
snprintf(string, 10, "%0*d", len, i); snprintf(string, 10, "%0*d", len, i);
strings[i] = strdup(string); strings[i] = kv_pair_malloc(string, len+1, 0, 0);
} }
assert(N<30); assert(N<30);
...@@ -132,16 +145,16 @@ void test_smooth_region_N (int N) { ...@@ -132,16 +145,16 @@ void test_smooth_region_N (int N) {
int r; int r;
for (j=0; j<N; j++) { for (j=0; j<N; j++) {
if ((1<<j)&i) { if ((1<<j)&i) {
pairs[j].key = strings[j]; pairs[j] = strings[j];
} else { } else {
pairs[j].key = 0; pairs[j] = 0;
} }
} }
pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat); pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat);
r = pmainternal_smooth_region(pairs, N, insertat); r = pmainternal_smooth_region(pairs, N, insertat);
pmainternal_printpairs(pairs, N); printf(" at %d\n", r); pmainternal_printpairs(pairs, N); printf(" at %d\n", r);
assert(0<=r); assert(r<N); assert(0<=r); assert(r<N);
assert(pairs[r].key==0); assert(pairs[r]==0);
/* Now verify that things are in the right place: /* Now verify that things are in the right place:
* everything before r should be smaller than keys[insertat]. * everything before r should be smaller than keys[insertat].
* everything after is bigger. * everything after is bigger.
...@@ -149,8 +162,8 @@ void test_smooth_region_N (int N) { ...@@ -149,8 +162,8 @@ void test_smooth_region_N (int N) {
{ {
int cleari = i; int cleari = i;
for (j=0; j<N; j++) { for (j=0; j<N; j++) {
if (pairs[j].key) { if (pairs[j]) {
int whichkey = atoi(pairs[j].key); int whichkey = atoi(pairs[j]->key);
assert(cleari&(1<<whichkey)); assert(cleari&(1<<whichkey));
cleari &= ~(1<<whichkey); cleari &= ~(1<<whichkey);
if (whichkey<insertat) assert(j<r); if (whichkey<insertat) assert(j<r);
...@@ -162,18 +175,32 @@ void test_smooth_region_N (int N) { ...@@ -162,18 +175,32 @@ void test_smooth_region_N (int N) {
} }
} }
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
free(strings[i]); kv_pair_free(strings[i]);
} }
} }
void test_smooth_region6 (void) { void test_smooth_region6 (void) {
enum {N=7}; enum {N=7};
struct pair pairs[N] = {{.key="A"},{.key="B"},{.key=0},{.key=0},{.key=0},{.key=0},{.key=0}}; struct kv_pair *pairs[N];
char *key;
int i;
for (i=0; i<N; i++)
pairs[i] = 0;
key = "A";
pairs[0] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "B";
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r = pmainternal_smooth_region(pairs, N, 2); int r = pmainternal_smooth_region(pairs, N, 2);
printf("{%s %s %s %s %s %s %s} %d\n", printf("{ ");
(char*)pairs[0].key, (char*)pairs[1].key, (char*)pairs[2].key, (char*)pairs[3].key, (char*)pairs[4].key, (char*)pairs[5].key, (char*)pairs[6].key, for (i=0; i<N; i++)
r); printf("%s ", pairs[i] ? pairs[i]->key : "?");
printf("} %d\n", r);
for (i=0; i<7; i++)
if (pairs[i])
kv_pair_free(pairs[i]);
} }
...@@ -191,19 +218,31 @@ static void test_calculate_parameters (void) { ...@@ -191,19 +218,31 @@ static void test_calculate_parameters (void) {
} }
static void test_count_region (void) { static void test_count_region (void) {
struct pair pairs[4]={{.key=0},{.key=0},{.key=0},{.key=0}}; const int N = 4;
struct kv_pair *pairs[N];
int i;
char *key;
for (i=0; i<N; i++)
pairs[i] = 0;
assert(pmainternal_count_region(pairs,0,4)==0); assert(pmainternal_count_region(pairs,0,4)==0);
assert(pmainternal_count_region(pairs,2,4)==0); assert(pmainternal_count_region(pairs,2,4)==0);
assert(pmainternal_count_region(pairs,0,2)==0); assert(pmainternal_count_region(pairs,0,2)==0);
pairs[2].key="A"; key = "A";
pairs[2] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
assert(pmainternal_count_region(pairs,0,4)==1); assert(pmainternal_count_region(pairs,0,4)==1);
assert(pmainternal_count_region(pairs,2,4)==1); assert(pmainternal_count_region(pairs,2,4)==1);
assert(pmainternal_count_region(pairs,0,2)==0); assert(pmainternal_count_region(pairs,0,2)==0);
assert(pmainternal_count_region(pairs,2,2)==0); assert(pmainternal_count_region(pairs,2,2)==0);
assert(pmainternal_count_region(pairs,2,3)==1); assert(pmainternal_count_region(pairs,2,3)==1);
pairs[3].key="B"; key = "B";
pairs[0].key="a"; pairs[3] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "a";
pairs[0] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
assert(pmainternal_count_region(pairs,0,4)==3); assert(pmainternal_count_region(pairs,0,4)==3);
for (i=0; i<N; i++)
if (pairs[i])
kv_pair_free(pairs[i]);
} }
static void test_pma_random_pick (void) { static void test_pma_random_pick (void) {
...@@ -295,12 +334,12 @@ static void test_find_insert (void) { ...@@ -295,12 +334,12 @@ static void test_find_insert (void) {
assert(r==BRT_OK); assert(r==BRT_OK);
assert(keycompare(v.data,v.size,"bbbdata", 8)==0); assert(keycompare(v.data,v.size,"bbbdata", 8)==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
r=pma_insert(pma, fill_dbt(&k, "00000", 6), fill_dbt(&v, "d0", 3), 0); r=pma_insert(pma, fill_dbt(&k, "00000", 6), fill_dbt(&v, "d0", 3), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
r=pma_free(&pma); assert(r==0); assert(pma==0); r=pma_free(&pma); assert(r==0); assert(pma==0);
pma_create(&pma, default_compare_fun); assert(pma!=0); pma_create(&pma, default_compare_fun); assert(pma!=0);
...@@ -545,6 +584,7 @@ void test_pma_split(int n) { ...@@ -545,6 +584,7 @@ void test_pma_split(int n) {
PMA pmaa, pmab, pmac; PMA pmaa, pmab, pmac;
int error; int error;
int i; int i;
int na, nb, nc;
printf("test_pma_split:%d\n", n); printf("test_pma_split:%d\n", n);
...@@ -571,8 +611,14 @@ void test_pma_split(int n) { ...@@ -571,8 +611,14 @@ void test_pma_split(int n) {
assert(error == 0); assert(error == 0);
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
na = pma_n_entries(pmaa);
printf("b:"); print_pma(pmab); printf("b:"); print_pma(pmab);
nb = pma_n_entries(pmab);
printf("c:"); print_pma(pmac); printf("c:"); print_pma(pmac);
nc = pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
error = pma_free(&pmaa); error = pma_free(&pmaa);
assert(error == 0); assert(error == 0);
...@@ -588,6 +634,7 @@ void test_pma_split_varkey() { ...@@ -588,6 +634,7 @@ void test_pma_split_varkey() {
PMA pmaa, pmab, pmac; PMA pmaa, pmab, pmac;
int error; int error;
int i; int i;
int n, na, nb, nc;
printf("test_pma_split_varkey\n"); printf("test_pma_split_varkey\n");
...@@ -606,6 +653,7 @@ void test_pma_split_varkey() { ...@@ -606,6 +653,7 @@ void test_pma_split_varkey() {
error = pma_insert(pmaa, &dbtk, &dbtv, 0); error = pma_insert(pmaa, &dbtk, &dbtv, 0);
assert(error == BRT_OK); assert(error == BRT_OK);
} }
n = i;
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
...@@ -613,8 +661,14 @@ void test_pma_split_varkey() { ...@@ -613,8 +661,14 @@ void test_pma_split_varkey() {
assert(error == 0); assert(error == 0);
printf("a:"); print_pma(pmaa); printf("a:"); print_pma(pmaa);
na = pma_n_entries(pmaa);
printf("b:"); print_pma(pmab); printf("b:"); print_pma(pmab);
nb = pma_n_entries(pmab);
printf("c:"); print_pma(pmac); printf("c:"); print_pma(pmac);
nc = pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
error = pma_free(&pmaa); error = pma_free(&pmaa);
assert(error == 0); assert(error == 0);
......
...@@ -3,17 +3,18 @@ ...@@ -3,17 +3,18 @@
Only the pointers are kept. Only the pointers are kept.
*/ */
#include "pma-internal.h"
#include "key.h" #include "key.h"
#include "memory.h" #include "memory.h"
#include "myassert.h" #include "myassert.h"
#include "../include/ydb-constants.h" #include "../include/ydb-constants.h"
#include <stdio.h> #include <stdio.h>
#include <errno.h> #include <errno.h>
/* Only needed for testing. */ /* Only needed for testing. */
#include <string.h> #include <string.h>
#include "kv-pair.h"
#include "pma-internal.h"
int pma_n_entries (PMA pma) { int pma_n_entries (PMA pma) {
return pma->n_pairs_present; return pma->n_pairs_present;
...@@ -22,34 +23,52 @@ int pma_n_entries (PMA pma) { ...@@ -22,34 +23,52 @@ int pma_n_entries (PMA pma) {
int pma_index_limit (PMA pma) { int pma_index_limit (PMA pma) {
return pma->N; return pma->N;
} }
int pmanode_valid (PMA pma, int i) { int pmanode_valid (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma)); assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].key!=0; return pma->pairs[i] != 0;
} }
bytevec pmanode_key (PMA pma, int i) { bytevec pmanode_key (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma)); assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].key; pair = pma->pairs[i];
assert(pair);
return pair->key;
} }
ITEMLEN pmanode_keylen (PMA pma, int i) { ITEMLEN pmanode_keylen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma)); assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].keylen; pair = pma->pairs[i];
assert(pair);
return pair->keylen;
} }
bytevec pmanode_val (PMA pma, int i) { bytevec pmanode_val (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma)); assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].val; pair = pma->pairs[i];
assert(pair);
return pair->key + pair->keylen;
} }
ITEMLEN pmanode_vallen (PMA pma, int i) { ITEMLEN pmanode_vallen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma)); assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].vallen; pair = pma->pairs[i];
assert(pair);
return pair->vallen;
} }
/* Could pick the same one every time if we wanted. */ /* Could pick the same one every time if we wanted. */
int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLEN *vallen) { int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLEN *vallen) {
#if 1 #if 1
int i; int i;
/* For now a simple implementation where we simply start at the beginning and look. */ /* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<pma_index_limit(pma); i++) { for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) { if (pma->pairs[i]) {
*key = pmanode_key(pma,i); *key = pmanode_key(pma,i);
*keylen = pmanode_keylen(pma,i); *keylen = pmanode_keylen(pma,i);
*val = pmanode_val(pma,i); *val = pmanode_val(pma,i);
...@@ -100,10 +119,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) { ...@@ -100,10 +119,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
int mid; int mid;
// Scan forward looking for a non-null value. // Scan forward looking for a non-null value.
for (mid=(lo+hi)/2; mid<hi; mid++) { for (mid=(lo+hi)/2; mid<hi; mid++) {
if (pma->pairs[mid].key!=0) { if (pma->pairs[mid]!=0) {
// Found one. // Found one.
DBT k2; DBT k2;
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid].key, pma->pairs[mid].keylen)); int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid]->key, pma->pairs[mid]->keylen));
if (cmp==0) return mid; if (cmp==0) return mid;
else if (cmp<0) { else if (cmp<0) {
/* key is smaller than the midpoint, so look in the low half. */ /* key is smaller than the midpoint, so look in the low half. */
...@@ -130,10 +149,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) { ...@@ -130,10 +149,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
assert(lo==hi); assert(lo==hi);
assert(hi <= pma_index_limit(pma)); assert(hi <= pma_index_limit(pma));
/* If lo points at something, the something should not be smaller than key. */ /* If lo points at something, the something should not be smaller than key. */
if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo].key) { if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo]) {
//printf("lo=%d\n", lo); //printf("lo=%d\n", lo);
DBT k2; DBT k2;
assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo].key, pma->pairs[lo].keylen))); assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo]->key, pma->pairs[lo]->keylen)));
} }
return lo; return lo;
} }
...@@ -142,17 +161,17 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) { ...@@ -142,17 +161,17 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
//int max (int i, int j) { if (i<j) return j; else return i; } //int max (int i, int j) { if (i<j) return j; else return i; }
//double lg (int n) { return log((double)n)/log(2.0); } //double lg (int n) { return log((double)n)/log(2.0); }
int pmainternal_printpairs (struct pair *pairs, int N) { int pmainternal_printpairs (struct kv_pair *pairs[], int N) {
int count=0; int count=0;
int i; int i;
printf("{"); printf("{");
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
if (i!=0) printf(" "); if (i!=0) printf(" ");
if (pairs[i].key) { if (pairs[i]) {
printf("%s", (char*)pairs[i].key); printf("%s", (char*)pairs[i]->key);
count++; count++;
} }
else printf("_"); else printf("_");
} }
printf("}"); printf("}");
return count; return count;
...@@ -167,15 +186,15 @@ void print_pma (PMA pma) { ...@@ -167,15 +186,15 @@ void print_pma (PMA pma) {
} }
/* Smooth the data, and return the location of the null. */ /* Smooth the data, and return the location of the null. */
int distribute_data (struct pair *destpairs, int dcount, int distribute_data (struct kv_pair *destpairs[], int dcount,
struct pair *sourcepairs, int scount) { struct kv_pair *sourcepairs[], int scount) {
assert(scount<=dcount); assert(scount<=dcount);
if (scount==0) { if (scount==0) {
return -1; return -1;
} }
if (scount==1) { if (scount==1) {
*destpairs=*sourcepairs; destpairs[0]=sourcepairs[0];
if (destpairs->key==0) return 0; if (destpairs[0]==0) return 0;
else return -1; else return -1;
} else { } else {
int r1 = distribute_data(destpairs, dcount/2, int r1 = distribute_data(destpairs, dcount/2,
...@@ -191,31 +210,28 @@ int distribute_data (struct pair *destpairs, int dcount, ...@@ -191,31 +210,28 @@ int distribute_data (struct pair *destpairs, int dcount,
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth /* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */ element, and return that slot's index in the smoothed array. */
int pmainternal_smooth_region (struct pair *pairs, int n, int idx) { int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx) {
int i; int i;
int n_present=0; int n_present=0;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
if (pairs[i].key) n_present++; if (pairs[i]) n_present++;
} }
n_present++; // Save one for the blank guy. n_present++; // Save one for the blank guy.
{ {
struct pair *MALLOC_N(n_present,tmppairs); struct kv_pair **MALLOC_N(n_present,tmppairs);
int n_saved=0; int n_saved=0;
int r; int r;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
if (i==idx) { if (i==idx) {
tmppairs[n_saved++].key = 0; tmppairs[n_saved++] = 0;
} }
if (pairs[i].key) { if (pairs[i]) {
tmppairs[n_saved++] = pairs[i]; tmppairs[n_saved++] = pairs[i];
} }
pairs[i].key = 0; pairs[i] = 0;
pairs[i].keylen = 0;
pairs[i].val = 0;
pairs[i].vallen = 0;
} }
if (idx==n) { if (idx==n) {
tmppairs[n_saved++].key = 0; tmppairs[n_saved++] = 0;
} }
//printf(" temp="); printpairs(tmppairs, n_saved); //printf(" temp="); printpairs(tmppairs, n_saved);
assert(n_saved==n_present); assert(n_saved==n_present);
...@@ -254,10 +270,10 @@ void pmainternal_calculate_parameters (PMA pma) ...@@ -254,10 +270,10 @@ void pmainternal_calculate_parameters (PMA pma)
pma->densitystep = 0.5/n_divisions; pma->densitystep = 0.5/n_divisions;
} }
int pmainternal_count_region (struct pair *pairs, int lo, int hi) { int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
int n=0; int n=0;
while (lo<hi) { while (lo<hi) {
if (pairs[lo].key) n++; if (pairs[lo]) n++;
lo++; lo++;
} }
return n; return n;
...@@ -282,7 +298,7 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) { ...@@ -282,7 +298,7 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
} }
*pma = result; *pma = result;
assert((unsigned long)result->pairs[result->N].key==0xdeadbeefL); assert((unsigned long)result->pairs[result->N]==0xdeadbeefL);
return 0; return 0;
} }
...@@ -301,16 +317,13 @@ int pmainternal_init_array(PMA pma, int asksize) { ...@@ -301,16 +317,13 @@ int pmainternal_init_array(PMA pma, int asksize) {
n *= 2; n *= 2;
pma->N = n; pma->N = n;
MALLOC_N(1+pma->N, pma->pairs); pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
if (pma->pairs == 0) if (pma->pairs == 0)
return -1; return -1;
pma->pairs[pma->N].key = (void *) 0xdeadbeef; pma->pairs[pma->N] = (void *) 0xdeadbeef;
for (i=0; i<pma->N; i++) { for (i=0; i<pma->N; i++) {
pma->pairs[i].key = 0; pma->pairs[i] = 0;
pma->pairs[i].keylen = 0;
pma->pairs[i].val = 0;
pma->pairs[i].vallen = 0;
} }
pmainternal_calculate_parameters(pma); pmainternal_calculate_parameters(pma);
return 0; return 0;
...@@ -340,7 +353,7 @@ int pma_cursor_set_position_last (PMA_CURSOR c) ...@@ -340,7 +353,7 @@ int pma_cursor_set_position_last (PMA_CURSOR c)
{ {
PMA pma = c->pma; PMA pma = c->pma;
c->position=pma->N-1; c->position=pma->N-1;
while (c->pma->pairs[c->position].key==0) { while (c->pma->pairs[c->position]==0) {
if (c->position>0) c->position--; if (c->position>0) c->position--;
else return DB_NOTFOUND; else return DB_NOTFOUND;
} }
...@@ -351,7 +364,7 @@ int pma_cursor_set_position_first (PMA_CURSOR c) ...@@ -351,7 +364,7 @@ int pma_cursor_set_position_first (PMA_CURSOR c)
{ {
PMA pma = c->pma; PMA pma = c->pma;
c->position=0; c->position=0;
while (c->pma->pairs[c->position].key==0) { while (c->pma->pairs[c->position]==0) {
if (c->position+1<pma->N) c->position++; if (c->position+1<pma->N) c->position++;
else return DB_NOTFOUND; else return DB_NOTFOUND;
} }
...@@ -364,7 +377,7 @@ int pma_cursor_set_position_next (PMA_CURSOR c) ...@@ -364,7 +377,7 @@ int pma_cursor_set_position_next (PMA_CURSOR c)
int old_position=c->position; int old_position=c->position;
c->position++; c->position++;
while (c->position<pma->N) { while (c->position<pma->N) {
if (c->pma->pairs[c->position].key!=0) return 0; if (c->pma->pairs[c->position]!=0) return 0;
c->position++; c->position++;
} }
c->position=old_position; c->position=old_position;
...@@ -373,9 +386,10 @@ int pma_cursor_set_position_next (PMA_CURSOR c) ...@@ -373,9 +386,10 @@ int pma_cursor_set_position_next (PMA_CURSOR c)
int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val) { int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma; PMA pma = c->pma;
if (pma->pairs[c->position].key==0) return BRT_KEYEMPTY; struct kv_pair *pair = pma->pairs[c->position];
ybt_set_value(key, pma->pairs[c->position].key, pma->pairs[c->position].keylen, &c->skey); if (pair==0) return BRT_KEYEMPTY;
ybt_set_value(val, pma->pairs[c->position].val, pma->pairs[c->position].vallen, &c->sval); ybt_set_value(key, pair->key, pair->keylen, &c->skey);
ybt_set_value(val, pair->key + pair->keylen, pair->vallen, &c->sval);
return 0; return 0;
} }
...@@ -452,9 +466,9 @@ int pmainternal_make_space_at (PMA pma, int idx) { ...@@ -452,9 +466,9 @@ int pmainternal_make_space_at (PMA pma, int idx) {
assert(size==pma_index_limit(pma)); assert(size==pma_index_limit(pma));
size*=2; size*=2;
//printf("realloc %p to %d\n", pma->pairs, size*sizeof(*pma->pairs)); //printf("realloc %p to %d\n", pma->pairs, size*sizeof(*pma->pairs));
pma->pairs = toku_realloc(pma->pairs, (1+size)*sizeof(*pma->pairs)); pma->pairs = toku_realloc(pma->pairs, (1+size)*sizeof(struct kv_pair *));
for (i=hi; i<size; i++) pma->pairs[i].key=0; for (i=hi; i<size; i++) pma->pairs[i]=0;
pma->pairs[size].key = (void*)0xdeadbeefL; pma->pairs[size] = (void*)0xdeadbeefL;
pma->N=size; pma->N=size;
pmainternal_calculate_parameters(pma); pmainternal_calculate_parameters(pma);
hi=size; hi=size;
...@@ -472,23 +486,20 @@ int pmainternal_make_space_at (PMA pma, int idx) { ...@@ -472,23 +486,20 @@ int pmainternal_make_space_at (PMA pma, int idx) {
} }
} }
enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) { enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
DBT k2; DBT k2;
struct kv_pair *pair;
int l = pmainternal_find(pma, k, db); int l = pmainternal_find(pma, k, db);
assert(0<=l ); assert(l<=pma_index_limit(pma)); assert(0<=l ); assert(l<=pma_index_limit(pma));
if (l==pma_index_limit(pma)) return DB_NOTFOUND; if (l==pma_index_limit(pma)) return DB_NOTFOUND;
if (pma->pairs[l].key!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) { pair = pma->pairs[l];
return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval); if (pair!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) {
return ybt_set_value(v, pair->key + pair->keylen, pair->vallen, &pma->sval);
} else { } else {
return DB_NOTFOUND; return DB_NOTFOUND;
} }
} }
void maybe_free (const void *p) {
if (p) toku_free((void*)p);
}
/* returns 0 if OK. /* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */ * You must have freed all the cursors, otherwise returns nonzero and does nothing. */
int pma_free (PMA *pmap) { int pma_free (PMA *pmap) {
...@@ -496,12 +507,10 @@ int pma_free (PMA *pmap) { ...@@ -496,12 +507,10 @@ int pma_free (PMA *pmap) {
PMA pma=*pmap; PMA pma=*pmap;
if (pma->cursors_head) return -1; if (pma->cursors_head) return -1;
for (i=0; i<pma_index_limit(pma); i++) { for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) { if (pma->pairs[i]) {
maybe_free(pma->pairs[i].key); kv_pair_free(pma->pairs[i]);
maybe_free(pma->pairs[i].val); pma->pairs[i] = 0;
pma->pairs[i].key=0; }
pma->pairs[i].val=0;
}
} }
toku_free(pma->pairs); toku_free(pma->pairs);
if (pma->skey) toku_free(pma->skey); if (pma->skey) toku_free(pma->skey);
...@@ -514,37 +523,31 @@ int pma_free (PMA *pmap) { ...@@ -514,37 +523,31 @@ int pma_free (PMA *pmap) {
/* Copies keylen and datalen */ /* Copies keylen and datalen */
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) { int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
int idx = pmainternal_find(pma, k, db); int idx = pmainternal_find(pma, k, db);
if (idx < pma_index_limit(pma) && pma->pairs[idx].key) { if (idx < pma_index_limit(pma) && pma->pairs[idx]) {
DBT k2; DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx].key, pma->pairs[idx].keylen))) { if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx]->key, pma->pairs[idx]->keylen))) {
return BRT_ALREADY_THERE; /* It is already here. Return an error. */ return BRT_ALREADY_THERE; /* It is already here. Return an error. */
} }
} }
if (pma->pairs[idx].key) { if (pma->pairs[idx]) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */ idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
} }
assert(!pma->pairs[idx].key); assert(!pma->pairs[idx]);
pma->pairs[idx].key = memdup(k->data, k->size); pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
pma->pairs[idx].keylen = k->size; assert(pma->pairs[idx]);
pma->pairs[idx].val = memdup(v->data, v->size);
pma->pairs[idx].vallen = v->size;
pma->n_pairs_present++; pma->n_pairs_present++;
return BRT_OK; return BRT_OK;
} }
int pma_delete (PMA pma, DBT *k, DB *db) { int pma_delete (PMA pma, DBT *k, DB *db) {
int l = pmainternal_find(pma, k, db); int l = pmainternal_find(pma, k, db);
if (pma->pairs[l].key==0) { struct kv_pair *pair = pma->pairs[l];
if (pair==0) {
printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND); printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
return DB_NOTFOUND; return DB_NOTFOUND;
} }
assert(pma->pairs[l].val!=0); kv_pair_free(pair);
toku_free((void*)pma->pairs[l].key); pma->pairs[l] = 0;
toku_free((void*)pma->pairs[l].val);
pma->pairs[l].key = 0;
pma->pairs[l].val = 0;
pma->pairs[l].keylen = 0;
pma->pairs[l].vallen = 0;
pma->n_pairs_present--; pma->n_pairs_present--;
// Need to rebalance // Need to rebalance
// smooth_after_delete(pma,l); // smooth_after_delete(pma,l);
...@@ -554,28 +557,29 @@ int pma_delete (PMA pma, DBT *k, DB *db) { ...@@ -554,28 +557,29 @@ int pma_delete (PMA pma, DBT *k, DB *db) {
void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) { void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) {
int i; int i;
for (i=0; i<pma_index_limit(pma); i++) { for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) { struct kv_pair *pair = pma->pairs[i];
f(pma->pairs[i].key, pma->pairs[i].keylen, if (pair) {
pma->pairs[i].val, pma->pairs[i].vallen, f(pair->key, pair->keylen,
v); pair->key + pair->keylen, pair->vallen, v);
} }
} }
} }
struct pair *pmainternal_extract_pairs(PMA pma) { struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi) {
int npairs; int npairs;
struct pair *pairs; struct kv_pair **pairs;
int i; int i;
int lastpair; int lastpair;
npairs = pma_n_entries(pma); npairs = pma_n_entries(pma);
pairs = toku_malloc(npairs * sizeof (struct pair)); pairs = toku_malloc(npairs * sizeof (struct kv_pair *));
if (pairs == 0)
return 0;
lastpair = 0; lastpair = 0;
for (i=0; i<pma_index_limit(pma); i++) { for (i=lo; i<hi; i++) {
if (pma->pairs[i].key != 0) { if (pma->pairs[i] != 0) {
pairs[lastpair] = pma->pairs[i]; pairs[lastpair] = pma->pairs[i];
pma->pairs[i].key = 0; pma->pairs[i] = 0;
pma->pairs[i].val = 0;
lastpair += 1; lastpair += 1;
} }
} }
...@@ -583,11 +587,12 @@ struct pair *pmainternal_extract_pairs(PMA pma) { ...@@ -583,11 +587,12 @@ struct pair *pmainternal_extract_pairs(PMA pma) {
return pairs; return pairs;
} }
int pma_split(PMA old, PMA *newa, PMA *newb, int pma_split(PMA old, PMA *newap, PMA *newbp,
PMA_CURSOR *cursors, int ncursors) { PMA_CURSOR *cursors, int ncursors) {
PMA newa, newb;
int error; int error;
int npairs; int npairs;
struct pair *pairs; struct kv_pair **pairs;
int sumlen; int sumlen;
int runlen; int runlen;
int len; int len;
...@@ -597,28 +602,32 @@ int pma_split(PMA old, PMA *newa, PMA *newb, ...@@ -597,28 +602,32 @@ int pma_split(PMA old, PMA *newa, PMA *newb,
assert(cursors == 0 && ncursors == 0); assert(cursors == 0 && ncursors == 0);
/* create the new pma's */ /* create the new pma's */
error = pma_create(newa, old->compare_fun); error = pma_create(newap, old->compare_fun);
if (error != 0) if (error != 0)
return error; return error;
error = pma_create(newb, old->compare_fun); error = pma_create(newbp, old->compare_fun);
if (error != 0) { if (error != 0) {
pma_free(newb); pma_free(newap);
return error; return error;
} }
newa = *newap;
newb = *newbp;
/* extract the pairs */ /* extract the pairs */
npairs = pma_n_entries(old); npairs = pma_n_entries(old);
pairs = pmainternal_extract_pairs(old); pairs = pmainternal_extract_pairs(old, 0, old->N);
assert(pairs);
old->n_pairs_present = 0; old->n_pairs_present = 0;
/* split the pairs in half by length (TODO: combine sum with extract) */ /* split the pairs in half by length (TODO: combine sum with extract) */
sumlen = 0; sumlen = 0;
for (i=0; i<npairs; i++) for (i=0; i<npairs; i++)
sumlen += 4 + pairs[i].keylen + 4 + pairs[i].vallen; sumlen += 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
runlen = 0; runlen = 0;
for (i=0; i < npairs; i++) { for (i=0; i < npairs; i++) {
len = 4 + pairs[i].keylen + 4 + pairs[i].vallen; len = 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
if (runlen + len > sumlen/2) if (runlen + len > sumlen/2)
break; break;
runlen += len; runlen += len;
...@@ -626,18 +635,19 @@ int pma_split(PMA old, PMA *newa, PMA *newb, ...@@ -626,18 +635,19 @@ int pma_split(PMA old, PMA *newa, PMA *newb,
spliti = i; spliti = i;
/* put the first half of pairs into newa */ /* put the first half of pairs into newa */
error = pmainternal_init_array(*newa, 2 * spliti); error = pmainternal_init_array(newa, 2 * spliti);
assert(error == 0); assert(error == 0);
distribute_data((*newa)->pairs, pma_index_limit(*newa), &pairs[0], spliti); distribute_data(newa->pairs, pma_index_limit(newa), &pairs[0], spliti);
(*newa)->n_pairs_present = spliti; newa->n_pairs_present = spliti;
/* put the second half of pairs into newb */ /* put the second half of pairs into newb */
error = pmainternal_init_array(*newb, 2 * (npairs-spliti)); error = pmainternal_init_array(newb, 2 * (npairs-spliti));
assert(error == 0); assert(error == 0);
distribute_data((*newb)->pairs, pma_index_limit(*newb), &pairs[spliti], npairs-spliti); distribute_data(newb->pairs, pma_index_limit(newb), &pairs[spliti], npairs-spliti);
(*newb)->n_pairs_present = npairs-spliti; newb->n_pairs_present = npairs-spliti;
toku_free(pairs); toku_free(pairs);
return 0; return 0;
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment