Commit 0a074cbe authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

pma with compare fun works, and also uses DBTs for most interface work

git-svn-id: file:///svn/tokudb@31 c7de825b-a66e-492c-adef-691d508d4ae1
parent d22ef7c0
...@@ -28,6 +28,7 @@ struct pma { ...@@ -28,6 +28,7 @@ struct pma {
* The densitystep is 0.10. */ * The densitystep is 0.10. */
PMA_CURSOR cursors_head, cursors_tail; PMA_CURSOR cursors_head, cursors_tail;
int (*compare_fun)(DB*,DBT*,DBT*); int (*compare_fun)(DB*,DBT*,DBT*);
void *skey, *sval; /* used in dbts */
}; };
int pmainternal_count_region (struct pair *pairs, int lo, int hi); int pmainternal_count_region (struct pair *pairs, int lo, int hi);
...@@ -35,5 +36,5 @@ void pmainternal_calculate_parameters (PMA pma); ...@@ -35,5 +36,5 @@ void pmainternal_calculate_parameters (PMA pma);
int pmainternal_smooth_region (struct pair *pairs, int n, int idx); int pmainternal_smooth_region (struct pair *pairs, int n, int idx);
int pmainternal_printpairs (struct pair *pairs, int N); int pmainternal_printpairs (struct pair *pairs, int N);
int pmainternal_make_space_at (PMA pma, int idx); int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, bytevec key, int keylen); int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */ void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
...@@ -70,37 +70,38 @@ static void test_pma_find (void) { ...@@ -70,37 +70,38 @@ static void test_pma_find (void) {
int i; int i;
int r; int r;
const int N = 16; const int N = 16;
DBT k;
MALLOC(pma); MALLOC(pma);
MALLOC_N(N,pma->pairs); MALLOC_N(N,pma->pairs);
// All that is needed to test pma_find is N and pairs. // All that is needed to test pma_find is N and pairs.
pma->N = N; pma->N = N;
for (i=0; i<N; i++) pma->pairs[i].key=0; for (i=0; i<N; i++) pma->pairs[i].key=0;
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
r=pmainternal_find(pma, "hello", 5); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==0); assert(r==0);
pma->pairs[5].key="hello"; pma->pairs[5].key="hello";
pma->pairs[5].keylen=5; pma->pairs[5].keylen=5;
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
r=pmainternal_find(pma, "hello", 5); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(pma_index_limit(pma)==N); assert(pma_index_limit(pma)==N);
assert(r==5); assert(r==5);
r=pmainternal_find(pma, "there", 5); r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
assert(r==6); assert(r==6);
r=pmainternal_find(pma, "aaa", 3); r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
assert(r==0); assert(r==0);
pma->pairs[N-1].key="there"; pma->pairs[N-1].key="there";
pma->pairs[N-1].keylen=5; pma->pairs[N-1].keylen=5;
r=pmainternal_find(pma, "hello", 5); r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==5); assert(r==5);
r=pmainternal_find(pma, "there", 5); r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
assert(r==N-1); assert(r==N-1);
r=pmainternal_find(pma, "aaa", 3); r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
assert(r==0); assert(r==0);
r=pmainternal_find(pma, "hellob", 6); r=pmainternal_find(pma, fill_dbt(&k, "hellob", 6), 0);
assert(r==6); assert(r==6);
r=pmainternal_find(pma, "zzz", 3); r=pmainternal_find(pma, fill_dbt(&k, "zzz", 3), 0);
assert(r==N); assert(r==N);
toku_free(pma->pairs); toku_free(pma->pairs);
toku_free(pma); toku_free(pma);
...@@ -203,25 +204,26 @@ static void test_pma_random_pick (void) { ...@@ -203,25 +204,26 @@ static void test_pma_random_pick (void) {
int r = pma_create(&pma, default_compare_fun); int r = pma_create(&pma, default_compare_fun);
bytevec key,val; bytevec key,val;
ITEMLEN keylen,vallen; ITEMLEN keylen,vallen;
DBT k,v;
assert(r==0); assert(r==0);
r = pma_random_pick(pma, &key, &keylen, &val, &vallen); r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r = pma_insert(pma, "hello", 6, "there", 6); r = pma_insert(pma, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
r = pma_random_pick(pma, &key, &keylen, &val, &vallen); r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==0); assert(r==0);
assert(keylen==6); assert(vallen==6); assert(keylen==6); assert(vallen==6);
assert(strcmp(key,"hello")==0); assert(strcmp(key,"hello")==0);
assert(strcmp(val,"there")==0); assert(strcmp(val,"there")==0);
r = pma_delete(pma, "nothello", 9); r = pma_delete(pma, fill_dbt(&k, "nothello", 9), 0);
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r = pma_delete(pma, "hello", 6); r = pma_delete(pma, fill_dbt(&k, "hello", 6), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
r = pma_random_pick(pma, &key, &keylen, &val, &vallen); r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r = pma_insert(pma, "hello", 6, "there", 6); r = pma_insert(pma, fill_dbt(&k, "hello", 6), fill_dbt(&v, "there", 6), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
...@@ -231,20 +233,20 @@ static void test_pma_random_pick (void) { ...@@ -231,20 +233,20 @@ static void test_pma_random_pick (void) {
assert(strcmp(key,"hello")==0); assert(strcmp(key,"hello")==0);
assert(strcmp(val,"there")==0); assert(strcmp(val,"there")==0);
r = pma_insert(pma, "aaa", 4, "athere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aaa", 4), fill_dbt(&v, "athere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aab", 4, "bthere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aab", 4), fill_dbt(&v, "bthere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aac", 4, "cthere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aac", 4), fill_dbt(&v, "cthere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aad", 4, "dthere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aad", 4), fill_dbt(&v, "dthere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aae", 4, "ethere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aae", 4), fill_dbt(&v, "ethere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aaf", 4, "fthere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aaf", 4), fill_dbt(&v, "fthere", 7), 0); assert(r==BRT_OK);
r = pma_insert(pma, "aag", 4, "gthere", 7); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "aag", 4), fill_dbt(&v, "gthere", 7), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aaa", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aaa", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aab", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aab", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aac", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aac", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aad", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aad", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aae", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aae", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "aag", 4); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "aag", 4), 0); assert(r==BRT_OK);
r = pma_delete(pma, "hello", 6); assert(r==BRT_OK); r = pma_delete(pma, fill_dbt(&k, "hello", 6), 0); assert(r==BRT_OK);
r = pma_random_pick(pma, &key, &keylen, &val, &vallen); r = pma_random_pick(pma, &key, &keylen, &val, &vallen);
assert(r==0); assert(r==0);
...@@ -258,34 +260,37 @@ static void test_pma_random_pick (void) { ...@@ -258,34 +260,37 @@ static void test_pma_random_pick (void) {
static void test_find_insert (void) { static void test_find_insert (void) {
PMA pma; PMA pma;
int r; int r;
bytevec dv; DBT k,v;
ITEMLEN dl;
pma_create(&pma, default_compare_fun); pma_create(&pma, default_compare_fun);
r=pma_lookup(pma, "aaa", 3, &dv, &dl); r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r=pma_insert(pma, "aaa", 3, "aaadata", 7); r=pma_insert(pma, fill_dbt(&k, "aaa", 3), fill_dbt(&v, "aaadata", 7), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
dv=0; dl=0; ybt_init(&v);
r=pma_lookup(pma, "aaa", 3, &dv, &dl); r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==BRT_OK); assert(r==BRT_OK);
assert(keycompare(dv,dl,"aaadata", 7)==0); assert(v.size==7);
assert(keycompare(v.data,v.size,"aaadata", 7)==0);
//toku_free(v.data); v.data=0;
r=pma_insert(pma, "bbb", 4, "bbbdata", 8); r=pma_insert(pma, fill_dbt(&k, "bbb", 4), fill_dbt(&v, "bbbdata", 8), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
r=pma_lookup(pma, "aaa", 3, &dv, &dl); ybt_init(&v);
r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==BRT_OK); assert(r==BRT_OK);
assert(keycompare(dv,dl,"aaadata", 7)==0); assert(keycompare(v.data,v.size,"aaadata", 7)==0);
r=pma_lookup(pma, "bbb", 4, &dv, &dl); ybt_init(&v);
r=pma_lookup(pma, fill_dbt(&k, "bbb", 4), &v, 0);
assert(r==BRT_OK); assert(r==BRT_OK);
assert(keycompare(dv,dl,"bbbdata", 8)==0); assert(keycompare(v.data,v.size,"bbbdata", 8)==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
r=pma_insert(pma, "00000", 6, "d0", 3); r=pma_insert(pma, fill_dbt(&k, "00000", 6), fill_dbt(&v, "d0", 3), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
...@@ -301,7 +306,7 @@ static void test_find_insert (void) { ...@@ -301,7 +306,7 @@ static void test_find_insert (void) {
snprintf(string,10,"%05d",i); snprintf(string,10,"%05d",i);
snprintf(dstring,10,"d%d", i); snprintf(dstring,10,"d%d", i);
printf("Inserting %d: string=%s dstring=%s\n", i, string, dstring); printf("Inserting %d: string=%s dstring=%s\n", i, string, dstring);
r=pma_insert(pma, string, strlen(string)+1, dstring, strlen(dstring)+1); r=pma_insert(pma, fill_dbt(&k, string, strlen(string)+1), fill_dbt(&v, dstring, strlen(dstring)+1), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
} }
} }
...@@ -327,12 +332,13 @@ static void test_pma_iterate_internal (PMA pma, int expected_k, int expected_v) ...@@ -327,12 +332,13 @@ static void test_pma_iterate_internal (PMA pma, int expected_k, int expected_v)
static void test_pma_iterate (void) { static void test_pma_iterate (void) {
PMA pma; PMA pma;
int r; int r;
DBT k,v;
pma_create(&pma, default_compare_fun); pma_create(&pma, default_compare_fun);
r=pma_insert(pma, "42", 3, "-19", 4); r=pma_insert(pma, fill_dbt(&k, "42", 3), fill_dbt(&v, "-19", 4), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
test_pma_iterate_internal(pma, 42, -19); test_pma_iterate_internal(pma, 42, -19);
r=pma_insert(pma, "12", 3, "-100", 5); r=pma_insert(pma, fill_dbt(&k, "12", 3), fill_dbt(&v, "-100", 5), 0);
assert(r==BRT_OK); assert(r==BRT_OK);
test_pma_iterate_internal(pma, 42+12, -19-100); test_pma_iterate_internal(pma, 42+12, -19-100);
r=pma_free(&pma); assert(r==0); assert(pma==0); r=pma_free(&pma); assert(r==0); assert(pma==0);
...@@ -343,11 +349,12 @@ static void test_pma_iterate2 (void) { ...@@ -343,11 +349,12 @@ static void test_pma_iterate2 (void) {
int r; int r;
int sum=0; int sum=0;
int n_items=0; int n_items=0;
DBT k,v;
r=pma_create(&pma0, default_compare_fun); assert(r==0); r=pma_create(&pma0, default_compare_fun); assert(r==0);
r=pma_create(&pma1, default_compare_fun); assert(r==0); r=pma_create(&pma1, default_compare_fun); assert(r==0);
pma_insert(pma0, "a", 2, "aval", 5); pma_insert(pma0, fill_dbt(&k, "a", 2), fill_dbt(&v, "aval", 5), 0);
pma_insert(pma0, "b", 2, "bval", 5); pma_insert(pma0, fill_dbt(&k, "b", 2), fill_dbt(&v, "bval", 5), 0);
pma_insert(pma1, "x", 2, "xval", 5); pma_insert(pma1, fill_dbt(&k, "x", 2), fill_dbt(&v, "xval", 5), 0);
PMA_ITERATE(pma0,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)),dl, (n_items++,sum+=kl+dl)); PMA_ITERATE(pma0,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)),dl, (n_items++,sum+=kl+dl));
PMA_ITERATE(pma1,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)), dl, (n_items++,sum+=kl+dl)); PMA_ITERATE(pma1,kv __attribute__((__unused__)),kl,dv __attribute__((__unused__)), dl, (n_items++,sum+=kl+dl));
assert(sum==21); assert(sum==21);
...@@ -422,10 +429,11 @@ void test_pma_cursor_3 (void) { ...@@ -422,10 +429,11 @@ void test_pma_cursor_3 (void) {
PMA_CURSOR c=0; PMA_CURSOR c=0;
int r; int r;
DBT key,val; DBT key,val;
DBT k,v;
r=pma_create(&pma, default_compare_fun); assert(r==0); r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_insert(pma, "x", 2, "xx", 3); assert(r==BRT_OK); r=pma_insert(pma, fill_dbt(&k, "x", 2), fill_dbt(&v, "xx", 3), 0); assert(r==BRT_OK);
r=pma_insert(pma, "m", 2, "mm", 3); assert(r==BRT_OK); r=pma_insert(pma, fill_dbt(&k, "m", 2), fill_dbt(&v, "mm", 3), 0); assert(r==BRT_OK);
r=pma_insert(pma, "aa", 3, "a", 2); assert(r==BRT_OK); r=pma_insert(pma, fill_dbt(&k, "aa", 3), fill_dbt(&v,"a", 2), 0); assert(r==BRT_OK);
ybt_init(&key); key.flags=DB_DBT_REALLOC; ybt_init(&key); key.flags=DB_DBT_REALLOC;
ybt_init(&val); val.flags=DB_DBT_REALLOC; ybt_init(&val); val.flags=DB_DBT_REALLOC;
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0); r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
...@@ -478,7 +486,6 @@ int wrong_endian_compare_fun (DB *ignore __attribute__((__unused__)), ...@@ -478,7 +486,6 @@ int wrong_endian_compare_fun (DB *ignore __attribute__((__unused__)),
int siz = a->size; int siz = a->size;
assert(a->size==b->size); // This function requires that the keys be the same size. assert(a->size==b->size); // This function requires that the keys be the same size.
abort();
for (i=0; i<a->size; i++) { for (i=0; i<a->size; i++) {
if (ad[siz-1-i]<bd[siz-1-i]) return -1; if (ad[siz-1-i]<bd[siz-1-i]) return -1;
if (ad[siz-1-i]>bd[siz-1-i]) return +1; if (ad[siz-1-i]>bd[siz-1-i]) return +1;
...@@ -495,11 +502,12 @@ void test_pma_compare_fun (int wrong_endian_p) { ...@@ -495,11 +502,12 @@ void test_pma_compare_fun (int wrong_endian_p) {
char *right_endian_expected_keys[] = {"00", "01", "10", "11"}; char *right_endian_expected_keys[] = {"00", "01", "10", "11"};
char **expected_keys = wrong_endian_p ? wrong_endian_expected_keys : right_endian_expected_keys; char **expected_keys = wrong_endian_p ? wrong_endian_expected_keys : right_endian_expected_keys;
int i; int i;
DBT k,v;
r = pma_create(&pma, wrong_endian_p ? wrong_endian_compare_fun : default_compare_fun); assert(r==0); r = pma_create(&pma, wrong_endian_p ? wrong_endian_compare_fun : default_compare_fun); assert(r==0);
r = pma_insert(pma, "10", 3, "10v", 4); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "10", 3), fill_dbt(&v, "10v", 4), 0); assert(r==BRT_OK);
r = pma_insert(pma, "00", 3, "00v", 4); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "00", 3), fill_dbt(&v, "00v", 4), 0); assert(r==BRT_OK);
r = pma_insert(pma, "01", 3, "01v", 4); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "01", 3), fill_dbt(&v, "01v", 4), 0); assert(r==BRT_OK);
r = pma_insert(pma, "11", 3, "11v", 4); assert(r==BRT_OK); r = pma_insert(pma, fill_dbt(&k, "11", 3), fill_dbt(&v, "11v", 4), 0); assert(r==BRT_OK);
ybt_init(&key); key.flags=DB_DBT_REALLOC; ybt_init(&key); key.flags=DB_DBT_REALLOC;
ybt_init(&val); val.flags=DB_DBT_REALLOC; ybt_init(&val); val.flags=DB_DBT_REALLOC;
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0); r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
......
...@@ -92,7 +92,7 @@ void pma_show_stats (void) { ...@@ -92,7 +92,7 @@ void pma_show_stats (void) {
// For example: if the array is empty, that means we return 0. // For example: if the array is empty, that means we return 0.
// For example: if the array is full of small keys, that means we return pma_index_limit(pma), which is off the end of teh array. // For example: if the array is full of small keys, that means we return pma_index_limit(pma), which is off the end of teh array.
// For example: if the array is full of large keys, then we return 0. // For example: if the array is full of large keys, then we return 0.
int pmainternal_find (PMA pma, bytevec key, int keylen) { int pmainternal_find (PMA pma, DBT *k, DB *db) {
int lo=0, hi=pma_index_limit(pma); int lo=0, hi=pma_index_limit(pma);
/* lo and hi are the minimum and maximum values (inclusive) that we could possibly return. */ /* lo and hi are the minimum and maximum values (inclusive) that we could possibly return. */
pma_count_finds++; pma_count_finds++;
...@@ -102,7 +102,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) { ...@@ -102,7 +102,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) {
for (mid=(lo+hi)/2; mid<hi; mid++) { for (mid=(lo+hi)/2; mid<hi; mid++) {
if (pma->pairs[mid].key!=0) { if (pma->pairs[mid].key!=0) {
// Found one. // Found one.
int cmp = keycompare(key,keylen, pma->pairs[mid].key, pma->pairs[mid].keylen); DBT k2;
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid].key, pma->pairs[mid].keylen));
if (cmp==0) return mid; if (cmp==0) return mid;
else if (cmp<0) { else if (cmp<0) {
/* key is smaller than the midpoint, so look in the low half. */ /* key is smaller than the midpoint, so look in the low half. */
...@@ -131,7 +132,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) { ...@@ -131,7 +132,8 @@ int pmainternal_find (PMA pma, bytevec key, int keylen) {
/* If lo points at something, the something should not be smaller than key. */ /* If lo points at something, the something should not be smaller than key. */
if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo].key) { if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo].key) {
//printf("lo=%d\n", lo); //printf("lo=%d\n", lo);
assert(0 >= keycompare(key, keylen, pma->pairs[lo].key, pma->pairs[lo].keylen)); DBT k2;
assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo].key, pma->pairs[lo].keylen)));
} }
return lo; return lo;
} }
...@@ -283,6 +285,8 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,DBT*,DBT*)) { ...@@ -283,6 +285,8 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,DBT*,DBT*)) {
pmainternal_calculate_parameters(result); pmainternal_calculate_parameters(result);
result->cursors_head = result->cursors_tail = 0; result->cursors_head = result->cursors_tail = 0;
result->compare_fun = compare_fun; result->compare_fun = compare_fun;
result->skey=0;
result->sval = 0;
*pma = result; *pma = result;
assert((unsigned long)result->pairs[result->N].key==0xdeadbeefL); assert((unsigned long)result->pairs[result->N].key==0xdeadbeefL);
return 0; return 0;
...@@ -446,16 +450,13 @@ int pmainternal_make_space_at (PMA pma, int idx) { ...@@ -446,16 +450,13 @@ int pmainternal_make_space_at (PMA pma, int idx) {
} }
/* Exposes internals of the PMA by returning a pointer to the guts. enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
* Don't modify the returned data. Don't free it. */ DBT k2;
enum pma_errors pma_lookup (PMA pma, bytevec key, ITEMLEN keylen, bytevec*val, ITEMLEN *vallen) { int l = pmainternal_find(pma, k, db);
int l = pmainternal_find(pma, key, keylen);
assert(0<=l ); assert(l<=pma_index_limit(pma)); assert(0<=l ); assert(l<=pma_index_limit(pma));
if (l==pma_index_limit(pma)) return DB_NOTFOUND; if (l==pma_index_limit(pma)) return DB_NOTFOUND;
if (keycompare(key,keylen,pma->pairs[l].key,pma->pairs[l].keylen)==0) { if (pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) {
*val = pma->pairs[l].val; return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval);
*vallen = pma->pairs[l].vallen;
return BRT_OK;
} else { } else {
return DB_NOTFOUND; return DB_NOTFOUND;
} }
...@@ -481,15 +482,18 @@ int pma_free (PMA *pmap) { ...@@ -481,15 +482,18 @@ int pma_free (PMA *pmap) {
} }
toku_free(pma->pairs); toku_free(pma->pairs);
toku_free(pma); toku_free(pma);
if (pma->skey) toku_free(pma->skey);
if (pma->sval) toku_free(pma->sval);
*pmap=0; *pmap=0;
return 0; return 0;
} }
/* Copies keylen and datalen */ /* Copies keylen and datalen */
int pma_insert (PMA pma, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen) { int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
int idx = pmainternal_find(pma, key, keylen); int idx = pmainternal_find(pma, k, db);
if (idx < pma_index_limit(pma) && pma->pairs[idx].key) { if (idx < pma_index_limit(pma) && pma->pairs[idx].key) {
if (0==keycompare(key, keylen, pma->pairs[idx].key, pma->pairs[idx].keylen)) { DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx].key, pma->pairs[idx].keylen))) {
return BRT_ALREADY_THERE; /* It is already here. Return an error. */ return BRT_ALREADY_THERE; /* It is already here. Return an error. */
} }
} }
...@@ -497,40 +501,16 @@ int pma_insert (PMA pma, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN data ...@@ -497,40 +501,16 @@ int pma_insert (PMA pma, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN data
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */ idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
} }
assert(!pma->pairs[idx].key); assert(!pma->pairs[idx].key);
pma->pairs[idx].key = memdup(key, keylen); pma->pairs[idx].key = memdup(k->data, k->size);
pma->pairs[idx].keylen = keylen; pma->pairs[idx].keylen = k->size;
pma->pairs[idx].val = memdup(data, datalen); pma->pairs[idx].val = memdup(v->data, v->size);
pma->pairs[idx].vallen = datalen; pma->pairs[idx].vallen = v->size;
pma->n_pairs_present++; pma->n_pairs_present++;
return BRT_OK; return BRT_OK;
} }
#if 0 int pma_delete (PMA pma, DBT *k, DB *db) {
void smooth_after_delete (PMA pma, int idx) { int l = pmainternal_find(pma, k, db);
int size=pma->uplgN;
int lo=idx;
int hi=idx;
double density=0.1;
while (1) {
lo=idx-size/2;
hi=idx+size/2;
if (lo<0) { hi-=lo; lo=0; }
else if (hi>pma_index_limit(pma)) { lo-=(hi-pma_index_limit(pma)); hi=pma_index_limit(pma); }
else { ; /* nothing */ }
assert(density<0.25);
{
int count=pmainternal_count_region(pma->pairs, lo, hi);
if (count/(double)(hi-lo) >= density) break;
if (lo==0 && hi==pma_index_limit(pma)) {
/* The array needs to be shrunk */
}
#endif
int pma_delete (PMA pma, bytevec key, ITEMLEN keylen) {
int l = pmainternal_find(pma, key, keylen);
if (pma->pairs[l].key==0) { if (pma->pairs[l].key==0) {
printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND); printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
return DB_NOTFOUND; return DB_NOTFOUND;
......
...@@ -24,15 +24,17 @@ int pma_n_entries (PMA); ...@@ -24,15 +24,17 @@ int pma_n_entries (PMA);
/* The values returned should not be modified.by the caller. */ /* The values returned should not be modified.by the caller. */
/* Any cursors should be updated. */ /* Any cursors should be updated. */
/* Duplicates the key and keylen. */ /* Duplicates the key and keylen. */
enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); //enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen); int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */ /* This returns an error if the key is NOT present. */
int pma_delete (PMA, bytevec key, ITEMLEN keylen); int pma_delete (PMA, DBT *, DB*);
/* Exposes internals of the PMA by returning a pointer to the guts. /* Exposes internals of the PMA by returning a pointer to the guts.
* Don't modify the returned data. Don't free it. */ * Don't modify the returned data. Don't free it. */
enum pma_errors pma_lookup (PMA, bytevec key, ITEMLEN keylen, bytevec*data, ITEMLEN *datalen); enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
/* Move the cursor to the beginning or the end or to a key */ /* Move the cursor to the beginning or the end or to a key */
int pma_cursor (PMA, PMA_CURSOR *); int pma_cursor (PMA, PMA_CURSOR *);
......
...@@ -9,6 +9,14 @@ int ybt_init (DBT *ybt) { ...@@ -9,6 +9,14 @@ int ybt_init (DBT *ybt) {
return 0; return 0;
} }
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len) {
ybt_init(dbt);
dbt->size=len;
dbt->data=(char*)k;
return dbt;
}
int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) { int ybt_set_value (DBT *ybt, bytevec val, ITEMLEN vallen, void **staticptrp) {
if (ybt->flags==DB_DBT_MALLOC) { if (ybt->flags==DB_DBT_MALLOC) {
domalloc: domalloc:
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
int ybt_init (DBT *); int ybt_init (DBT *);
DBT *fill_dbt(DBT *dbt, bytevec k, ITEMLEN len);
int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp); int ybt_set_value (DBT *, bytevec val, ITEMLEN vallen, void **staticptrp);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment