Commit e0bf2ec7 authored by Rich Prohaska's avatar Rich Prohaska

change the pma index from an array of pairs to an array of pointers

to the key and value.  this change was made to address the slow 
insert performance on an amd64 machine.



git-svn-id: file:///svn/tokudb@87 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9c781e32
/*
* the key value pair contains a key and a value in a contiguous space. the
* key is right after the length fields and the value is right after the key.
*/
struct kv_pair {
int keylen;
int vallen;
char key[];
};
static inline void kv_pair_init(struct kv_pair *pair, void *key, int keylen, void *val, int vallen) {
pair->keylen = keylen;
memcpy(pair->key, key, keylen);
pair->vallen = vallen;
memcpy(pair->key + keylen, val, vallen);
}
static inline struct kv_pair *kv_pair_malloc(void *key, int keylen, void *val, int vallen) {
struct kv_pair *pair = toku_malloc(sizeof (struct kv_pair) + keylen + vallen);
if (pair)
kv_pair_init(pair, key, keylen, val, vallen);
return pair;
}
static inline void kv_pair_free(struct kv_pair *pair) {
toku_free(pair);
}
static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key;
}
static inline int kv_pair_keylen(struct kv_pair *pair) {
return pair->keylen;
}
static inline void *kv_pair_val(struct kv_pair *pair) {
return pair->key + pair->keylen;
}
static inline int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
}
#include "pma.h"
struct pair {
bytevec key; /* NULL for empty slots */
int keylen;
bytevec val;
int vallen;
};
struct pma_cursor {
PMA pma;
int position; /* -1 if the position is undefined. */
......@@ -18,7 +11,7 @@ struct pma {
enum typ_tag tag;
int N; /* How long is the array? Always a power of two >= 4. */
int n_pairs_present; /* How many array elements are non-null. */
struct pair *pairs;
struct kv_pair **pairs;
int uplgN; /* The smallest power of two >= lg(N) */
double densitystep; /* Each doubling decreases the density by densitystep.
* For example if array_len=256 and uplgN=8 then there are 5 doublings.
......@@ -31,12 +24,14 @@ struct pma {
void *skey, *sval; /* used in dbts */
};
int pmainternal_count_region (struct pair *pairs, int lo, int hi);
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void pmainternal_calculate_parameters (PMA pma);
int pmainternal_smooth_region (struct pair *pairs, int n, int idx);
int pmainternal_printpairs (struct pair *pairs, int N);
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx);
int pmainternal_printpairs (struct kv_pair *pairs[], int N);
int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
int pmainternal_init_array(PMA pma, int asksize);
struct pair *pmainternal_extract_pairs(PMA pma);
struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi);
#include "pma-internal.h"
#include "../include/ydb-constants.h"
#include "memory.h"
#include "key.h"
......@@ -7,46 +6,57 @@
#include <stdlib.h>
#include <stdio.h>
#include <arpa/inet.h>
#include "kv-pair.h"
#include "pma-internal.h"
static void test_make_space_at (void) {
PMA pma;
int r=pma_create(&pma, default_compare_fun);
char *key;
int r;
struct kv_pair *key_A, *key_B;
key = "A";
key_A = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "B";
key_B = kv_pair_malloc(key, strlen(key)+1, 0, 0);
r=pma_create(&pma, default_compare_fun);
assert(r==0);
assert(pma_n_entries(pma)==0);
r=pmainternal_make_space_at(pma, 2);
assert(pma_index_limit(pma)==4);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
print_pma(pma);
pma->pairs[2].key="A";
pma->pairs[2] = key_A;
pma->n_pairs_present++;
r=pmainternal_make_space_at(pma,2);
printf("Requested space at 2, got space at %d\n", r);
print_pma(pma);
assert(pma->pairs[r].key==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
assert(pma->pairs[r]==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
assert(pma_index_limit(pma)==4);
pma->pairs[0].key="A";
pma->pairs[1].key="B";
pma->pairs[2].key=0;
pma->pairs[3].key=0;
pma->pairs[0] = key_A;
pma->pairs[1] = key_B;
pma->pairs[2] = 0;
pma->pairs[3] = 0;
pma->n_pairs_present=2;
print_pma(pma);
r=pmainternal_make_space_at(pma,0);
printf("Requested space at 0, got space at %d\n", r);
print_pma(pma);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL); // make sure it doesn't go off the end.
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL); // make sure it doesn't go off the end.
assert(pma_index_limit(pma)==8);
pma->pairs[0].key = "A";
pma->pairs[1].key = 0;
pma->pairs[2].key = 0;
pma->pairs[3].key = 0;
pma->pairs[4].key = "B";
pma->pairs[5].key = 0;
pma->pairs[6].key = 0;
pma->pairs[7].key = 0;
pma->pairs[0] = key_A;
pma->pairs[1] = 0;
pma->pairs[2] = 0;
pma->pairs[3] = 0;
pma->pairs[4] = key_B;
pma->pairs[5] = 0;
pma->pairs[6] = 0;
pma->pairs[7] = 0;
pma->n_pairs_present=2;
print_pma(pma);
r=pmainternal_make_space_at(pma,5);
......@@ -55,15 +65,16 @@ static void test_make_space_at (void) {
{
int i;
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) {
if (pma->pairs[i]) {
assert(i<r);
pma->pairs[i] = 0;
}
pma->pairs[i].key=0; // zero it so that we don't mess things up on free
pma->pairs[i].val=0;
}
}
r=pma_free(&pma); assert(r==0);
assert(pma==0);
kv_pair_free(key_A);
kv_pair_free(key_B);
}
static void test_pma_find (void) {
......@@ -76,14 +87,13 @@ static void test_pma_find (void) {
MALLOC_N(N,pma->pairs);
// All that is needed to test pma_find is N and pairs.
pma->N = N;
for (i=0; i<N; i++) pma->pairs[i].key=0;
for (i=0; i<N; i++) pma->pairs[i]=0;
assert(pma_index_limit(pma)==N);
pma->compare_fun = default_compare_fun;
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==0);
pma->pairs[5].key="hello";
pma->pairs[5].keylen=5;
pma->pairs[5] = kv_pair_malloc("hello", 5, 0, 0);
assert(pma_index_limit(pma)==N);
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(pma_index_limit(pma)==N);
......@@ -93,8 +103,7 @@ static void test_pma_find (void) {
r=pmainternal_find(pma, fill_dbt(&k, "aaa", 3), 0);
assert(r==0);
pma->pairs[N-1].key="there";
pma->pairs[N-1].keylen=5;
pma->pairs[N-1] = kv_pair_malloc("there", 5, 0, 0);
r=pmainternal_find(pma, fill_dbt(&k, "hello", 5), 0);
assert(r==5);
r=pmainternal_find(pma, fill_dbt(&k, "there", 5), 0);
......@@ -105,13 +114,17 @@ static void test_pma_find (void) {
assert(r==6);
r=pmainternal_find(pma, fill_dbt(&k, "zzz", 3), 0);
assert(r==N);
for (i=0; i<N; i++)
if (pma->pairs[i])
kv_pair_free(pma->pairs[i]);
toku_free(pma->pairs);
toku_free(pma);
}
void test_smooth_region_N (int N) {
struct pair pairs[N];
char *strings[100];
struct kv_pair *pairs[N];
struct kv_pair *strings[N];
char string[N];
int i;
int len;
......@@ -121,7 +134,7 @@ void test_smooth_region_N (int N) {
for (i=0; i<N; i++) {
snprintf(string, 10, "%0*d", len, i);
strings[i] = strdup(string);
strings[i] = kv_pair_malloc(string, len+1, 0, 0);
}
assert(N<30);
......@@ -132,16 +145,16 @@ void test_smooth_region_N (int N) {
int r;
for (j=0; j<N; j++) {
if ((1<<j)&i) {
pairs[j].key = strings[j];
pairs[j] = strings[j];
} else {
pairs[j].key = 0;
pairs[j] = 0;
}
}
pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat);
r = pmainternal_smooth_region(pairs, N, insertat);
pmainternal_printpairs(pairs, N); printf(" at %d\n", r);
assert(0<=r); assert(r<N);
assert(pairs[r].key==0);
assert(pairs[r]==0);
/* Now verify that things are in the right place:
* everything before r should be smaller than keys[insertat].
* everything after is bigger.
......@@ -149,8 +162,8 @@ void test_smooth_region_N (int N) {
{
int cleari = i;
for (j=0; j<N; j++) {
if (pairs[j].key) {
int whichkey = atoi(pairs[j].key);
if (pairs[j]) {
int whichkey = atoi(pairs[j]->key);
assert(cleari&(1<<whichkey));
cleari &= ~(1<<whichkey);
if (whichkey<insertat) assert(j<r);
......@@ -162,18 +175,32 @@ void test_smooth_region_N (int N) {
}
}
for (i=0; i<N; i++) {
free(strings[i]);
kv_pair_free(strings[i]);
}
}
void test_smooth_region6 (void) {
enum {N=7};
struct pair pairs[N] = {{.key="A"},{.key="B"},{.key=0},{.key=0},{.key=0},{.key=0},{.key=0}};
struct kv_pair *pairs[N];
char *key;
int i;
for (i=0; i<N; i++)
pairs[i] = 0;
key = "A";
pairs[0] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "B";
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r = pmainternal_smooth_region(pairs, N, 2);
printf("{%s %s %s %s %s %s %s} %d\n",
(char*)pairs[0].key, (char*)pairs[1].key, (char*)pairs[2].key, (char*)pairs[3].key, (char*)pairs[4].key, (char*)pairs[5].key, (char*)pairs[6].key,
r);
printf("{ ");
for (i=0; i<N; i++)
printf("%s ", pairs[i] ? pairs[i]->key : "?");
printf("} %d\n", r);
for (i=0; i<7; i++)
if (pairs[i])
kv_pair_free(pairs[i]);
}
......@@ -191,19 +218,31 @@ static void test_calculate_parameters (void) {
}
static void test_count_region (void) {
struct pair pairs[4]={{.key=0},{.key=0},{.key=0},{.key=0}};
const int N = 4;
struct kv_pair *pairs[N];
int i;
char *key;
for (i=0; i<N; i++)
pairs[i] = 0;
assert(pmainternal_count_region(pairs,0,4)==0);
assert(pmainternal_count_region(pairs,2,4)==0);
assert(pmainternal_count_region(pairs,0,2)==0);
pairs[2].key="A";
key = "A";
pairs[2] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
assert(pmainternal_count_region(pairs,0,4)==1);
assert(pmainternal_count_region(pairs,2,4)==1);
assert(pmainternal_count_region(pairs,0,2)==0);
assert(pmainternal_count_region(pairs,2,2)==0);
assert(pmainternal_count_region(pairs,2,3)==1);
pairs[3].key="B";
pairs[0].key="a";
key = "B";
pairs[3] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
key = "a";
pairs[0] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
assert(pmainternal_count_region(pairs,0,4)==3);
for (i=0; i<N; i++)
if (pairs[i])
kv_pair_free(pairs[i]);
}
static void test_pma_random_pick (void) {
......@@ -295,12 +334,12 @@ static void test_find_insert (void) {
assert(r==BRT_OK);
assert(keycompare(v.data,v.size,"bbbdata", 8)==0);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
r=pma_insert(pma, fill_dbt(&k, "00000", 6), fill_dbt(&v, "d0", 3), 0);
assert(r==BRT_OK);
assert((unsigned long)pma->pairs[pma_index_limit(pma)].key==0xdeadbeefL);
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
r=pma_free(&pma); assert(r==0); assert(pma==0);
pma_create(&pma, default_compare_fun); assert(pma!=0);
......@@ -545,6 +584,7 @@ void test_pma_split(int n) {
PMA pmaa, pmab, pmac;
int error;
int i;
int na, nb, nc;
printf("test_pma_split:%d\n", n);
......@@ -571,8 +611,14 @@ void test_pma_split(int n) {
assert(error == 0);
printf("a:"); print_pma(pmaa);
na = pma_n_entries(pmaa);
printf("b:"); print_pma(pmab);
nb = pma_n_entries(pmab);
printf("c:"); print_pma(pmac);
nc = pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
error = pma_free(&pmaa);
assert(error == 0);
......@@ -588,6 +634,7 @@ void test_pma_split_varkey() {
PMA pmaa, pmab, pmac;
int error;
int i;
int n, na, nb, nc;
printf("test_pma_split_varkey\n");
......@@ -606,6 +653,7 @@ void test_pma_split_varkey() {
error = pma_insert(pmaa, &dbtk, &dbtv, 0);
assert(error == BRT_OK);
}
n = i;
printf("a:"); print_pma(pmaa);
......@@ -613,8 +661,14 @@ void test_pma_split_varkey() {
assert(error == 0);
printf("a:"); print_pma(pmaa);
na = pma_n_entries(pmaa);
printf("b:"); print_pma(pmab);
nb = pma_n_entries(pmab);
printf("c:"); print_pma(pmac);
nc = pma_n_entries(pmac);
assert(na == 0);
assert(nb + nc == n);
error = pma_free(&pmaa);
assert(error == 0);
......
......@@ -3,17 +3,18 @@
Only the pointers are kept.
*/
#include "pma-internal.h"
#include "key.h"
#include "memory.h"
#include "myassert.h"
#include "../include/ydb-constants.h"
#include <stdio.h>
#include <errno.h>
/* Only needed for testing. */
#include <string.h>
#include "kv-pair.h"
#include "pma-internal.h"
int pma_n_entries (PMA pma) {
return pma->n_pairs_present;
......@@ -22,34 +23,52 @@ int pma_n_entries (PMA pma) {
int pma_index_limit (PMA pma) {
return pma->N;
}
int pmanode_valid (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].key!=0;
return pma->pairs[i] != 0;
}
bytevec pmanode_key (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].key;
pair = pma->pairs[i];
assert(pair);
return pair->key;
}
ITEMLEN pmanode_keylen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].keylen;
pair = pma->pairs[i];
assert(pair);
return pair->keylen;
}
bytevec pmanode_val (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].val;
pair = pma->pairs[i];
assert(pair);
return pair->key + pair->keylen;
}
ITEMLEN pmanode_vallen (PMA pma, int i) {
struct kv_pair *pair;
assert(0<=i); assert(i<pma_index_limit(pma));
return pma->pairs[i].vallen;
pair = pma->pairs[i];
assert(pair);
return pair->vallen;
}
/* Could pick the same one every time if we wanted. */
int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLEN *vallen) {
#if 1
int i;
/* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) {
if (pma->pairs[i]) {
*key = pmanode_key(pma,i);
*keylen = pmanode_keylen(pma,i);
*val = pmanode_val(pma,i);
......@@ -100,10 +119,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
int mid;
// Scan forward looking for a non-null value.
for (mid=(lo+hi)/2; mid<hi; mid++) {
if (pma->pairs[mid].key!=0) {
if (pma->pairs[mid]!=0) {
// Found one.
DBT k2;
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid].key, pma->pairs[mid].keylen));
int cmp = pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[mid]->key, pma->pairs[mid]->keylen));
if (cmp==0) return mid;
else if (cmp<0) {
/* key is smaller than the midpoint, so look in the low half. */
......@@ -130,10 +149,10 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
assert(lo==hi);
assert(hi <= pma_index_limit(pma));
/* If lo points at something, the something should not be smaller than key. */
if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo].key) {
if (lo>0 && lo < pma_index_limit(pma) && pma->pairs[lo]) {
//printf("lo=%d\n", lo);
DBT k2;
assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo].key, pma->pairs[lo].keylen)));
assert(0 >= pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[lo]->key, pma->pairs[lo]->keylen)));
}
return lo;
}
......@@ -142,17 +161,17 @@ int pmainternal_find (PMA pma, DBT *k, DB *db) {
//int max (int i, int j) { if (i<j) return j; else return i; }
//double lg (int n) { return log((double)n)/log(2.0); }
int pmainternal_printpairs (struct pair *pairs, int N) {
int pmainternal_printpairs (struct kv_pair *pairs[], int N) {
int count=0;
int i;
printf("{");
for (i=0; i<N; i++) {
if (i!=0) printf(" ");
if (pairs[i].key) {
printf("%s", (char*)pairs[i].key);
count++;
}
else printf("_");
if (i!=0) printf(" ");
if (pairs[i]) {
printf("%s", (char*)pairs[i]->key);
count++;
}
else printf("_");
}
printf("}");
return count;
......@@ -167,15 +186,15 @@ void print_pma (PMA pma) {
}
/* Smooth the data, and return the location of the null. */
int distribute_data (struct pair *destpairs, int dcount,
struct pair *sourcepairs, int scount) {
int distribute_data (struct kv_pair *destpairs[], int dcount,
struct kv_pair *sourcepairs[], int scount) {
assert(scount<=dcount);
if (scount==0) {
return -1;
}
if (scount==1) {
*destpairs=*sourcepairs;
if (destpairs->key==0) return 0;
destpairs[0]=sourcepairs[0];
if (destpairs[0]==0) return 0;
else return -1;
} else {
int r1 = distribute_data(destpairs, dcount/2,
......@@ -191,31 +210,28 @@ int distribute_data (struct pair *destpairs, int dcount,
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */
int pmainternal_smooth_region (struct pair *pairs, int n, int idx) {
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx) {
int i;
int n_present=0;
for (i=0; i<n; i++) {
if (pairs[i].key) n_present++;
if (pairs[i]) n_present++;
}
n_present++; // Save one for the blank guy.
{
struct pair *MALLOC_N(n_present,tmppairs);
struct kv_pair **MALLOC_N(n_present,tmppairs);
int n_saved=0;
int r;
for (i=0; i<n; i++) {
if (i==idx) {
tmppairs[n_saved++].key = 0;
tmppairs[n_saved++] = 0;
}
if (pairs[i].key) {
if (pairs[i]) {
tmppairs[n_saved++] = pairs[i];
}
pairs[i].key = 0;
pairs[i].keylen = 0;
pairs[i].val = 0;
pairs[i].vallen = 0;
pairs[i] = 0;
}
if (idx==n) {
tmppairs[n_saved++].key = 0;
tmppairs[n_saved++] = 0;
}
//printf(" temp="); printpairs(tmppairs, n_saved);
assert(n_saved==n_present);
......@@ -254,10 +270,10 @@ void pmainternal_calculate_parameters (PMA pma)
pma->densitystep = 0.5/n_divisions;
}
int pmainternal_count_region (struct pair *pairs, int lo, int hi) {
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
int n=0;
while (lo<hi) {
if (pairs[lo].key) n++;
if (pairs[lo]) n++;
lo++;
}
return n;
......@@ -282,7 +298,7 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
}
*pma = result;
assert((unsigned long)result->pairs[result->N].key==0xdeadbeefL);
assert((unsigned long)result->pairs[result->N]==0xdeadbeefL);
return 0;
}
......@@ -301,16 +317,13 @@ int pmainternal_init_array(PMA pma, int asksize) {
n *= 2;
pma->N = n;
MALLOC_N(1+pma->N, pma->pairs);
pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
if (pma->pairs == 0)
return -1;
pma->pairs[pma->N].key = (void *) 0xdeadbeef;
pma->pairs[pma->N] = (void *) 0xdeadbeef;
for (i=0; i<pma->N; i++) {
pma->pairs[i].key = 0;
pma->pairs[i].keylen = 0;
pma->pairs[i].val = 0;
pma->pairs[i].vallen = 0;
pma->pairs[i] = 0;
}
pmainternal_calculate_parameters(pma);
return 0;
......@@ -340,7 +353,7 @@ int pma_cursor_set_position_last (PMA_CURSOR c)
{
PMA pma = c->pma;
c->position=pma->N-1;
while (c->pma->pairs[c->position].key==0) {
while (c->pma->pairs[c->position]==0) {
if (c->position>0) c->position--;
else return DB_NOTFOUND;
}
......@@ -351,7 +364,7 @@ int pma_cursor_set_position_first (PMA_CURSOR c)
{
PMA pma = c->pma;
c->position=0;
while (c->pma->pairs[c->position].key==0) {
while (c->pma->pairs[c->position]==0) {
if (c->position+1<pma->N) c->position++;
else return DB_NOTFOUND;
}
......@@ -364,7 +377,7 @@ int pma_cursor_set_position_next (PMA_CURSOR c)
int old_position=c->position;
c->position++;
while (c->position<pma->N) {
if (c->pma->pairs[c->position].key!=0) return 0;
if (c->pma->pairs[c->position]!=0) return 0;
c->position++;
}
c->position=old_position;
......@@ -373,9 +386,10 @@ int pma_cursor_set_position_next (PMA_CURSOR c)
int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma;
if (pma->pairs[c->position].key==0) return BRT_KEYEMPTY;
ybt_set_value(key, pma->pairs[c->position].key, pma->pairs[c->position].keylen, &c->skey);
ybt_set_value(val, pma->pairs[c->position].val, pma->pairs[c->position].vallen, &c->sval);
struct kv_pair *pair = pma->pairs[c->position];
if (pair==0) return BRT_KEYEMPTY;
ybt_set_value(key, pair->key, pair->keylen, &c->skey);
ybt_set_value(val, pair->key + pair->keylen, pair->vallen, &c->sval);
return 0;
}
......@@ -452,9 +466,9 @@ int pmainternal_make_space_at (PMA pma, int idx) {
assert(size==pma_index_limit(pma));
size*=2;
//printf("realloc %p to %d\n", pma->pairs, size*sizeof(*pma->pairs));
pma->pairs = toku_realloc(pma->pairs, (1+size)*sizeof(*pma->pairs));
for (i=hi; i<size; i++) pma->pairs[i].key=0;
pma->pairs[size].key = (void*)0xdeadbeefL;
pma->pairs = toku_realloc(pma->pairs, (1+size)*sizeof(struct kv_pair *));
for (i=hi; i<size; i++) pma->pairs[i]=0;
pma->pairs[size] = (void*)0xdeadbeefL;
pma->N=size;
pmainternal_calculate_parameters(pma);
hi=size;
......@@ -472,23 +486,20 @@ int pmainternal_make_space_at (PMA pma, int idx) {
}
}
enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
DBT k2;
struct kv_pair *pair;
int l = pmainternal_find(pma, k, db);
assert(0<=l ); assert(l<=pma_index_limit(pma));
if (l==pma_index_limit(pma)) return DB_NOTFOUND;
if (pma->pairs[l].key!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[l].key,pma->pairs[l].keylen))==0) {
return ybt_set_value(v, pma->pairs[l].val, pma->pairs[l].vallen, &pma->sval);
pair = pma->pairs[l];
if (pair!=0 && pma->compare_fun(db, k, fill_dbt(&k2, pair->key, pair->keylen))==0) {
return ybt_set_value(v, pair->key + pair->keylen, pair->vallen, &pma->sval);
} else {
return DB_NOTFOUND;
return DB_NOTFOUND;
}
}
void maybe_free (const void *p) {
if (p) toku_free((void*)p);
}
/* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */
int pma_free (PMA *pmap) {
......@@ -496,12 +507,10 @@ int pma_free (PMA *pmap) {
PMA pma=*pmap;
if (pma->cursors_head) return -1;
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) {
maybe_free(pma->pairs[i].key);
maybe_free(pma->pairs[i].val);
pma->pairs[i].key=0;
pma->pairs[i].val=0;
}
if (pma->pairs[i]) {
kv_pair_free(pma->pairs[i]);
pma->pairs[i] = 0;
}
}
toku_free(pma->pairs);
if (pma->skey) toku_free(pma->skey);
......@@ -514,37 +523,31 @@ int pma_free (PMA *pmap) {
/* Copies keylen and datalen */
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
int idx = pmainternal_find(pma, k, db);
if (idx < pma_index_limit(pma) && pma->pairs[idx].key) {
DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx].key, pma->pairs[idx].keylen))) {
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
if (idx < pma_index_limit(pma) && pma->pairs[idx]) {
DBT k2;
if (0==pma->compare_fun(db, k, fill_dbt(&k2, pma->pairs[idx]->key, pma->pairs[idx]->keylen))) {
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
}
if (pma->pairs[idx].key) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
if (pma->pairs[idx]) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
}
assert(!pma->pairs[idx].key);
pma->pairs[idx].key = memdup(k->data, k->size);
pma->pairs[idx].keylen = k->size;
pma->pairs[idx].val = memdup(v->data, v->size);
pma->pairs[idx].vallen = v->size;
assert(!pma->pairs[idx]);
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
return BRT_OK;
}
int pma_delete (PMA pma, DBT *k, DB *db) {
int l = pmainternal_find(pma, k, db);
if (pma->pairs[l].key==0) {
struct kv_pair *pair = pma->pairs[l];
if (pair==0) {
printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
return DB_NOTFOUND;
}
assert(pma->pairs[l].val!=0);
toku_free((void*)pma->pairs[l].key);
toku_free((void*)pma->pairs[l].val);
pma->pairs[l].key = 0;
pma->pairs[l].val = 0;
pma->pairs[l].keylen = 0;
pma->pairs[l].vallen = 0;
kv_pair_free(pair);
pma->pairs[l] = 0;
pma->n_pairs_present--;
// Need to rebalance
// smooth_after_delete(pma,l);
......@@ -554,28 +557,29 @@ int pma_delete (PMA pma, DBT *k, DB *db) {
void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) {
int i;
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key) {
f(pma->pairs[i].key, pma->pairs[i].keylen,
pma->pairs[i].val, pma->pairs[i].vallen,
v);
}
struct kv_pair *pair = pma->pairs[i];
if (pair) {
f(pair->key, pair->keylen,
pair->key + pair->keylen, pair->vallen, v);
}
}
}
struct pair *pmainternal_extract_pairs(PMA pma) {
struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi) {
int npairs;
struct pair *pairs;
struct kv_pair **pairs;
int i;
int lastpair;
npairs = pma_n_entries(pma);
pairs = toku_malloc(npairs * sizeof (struct pair));
pairs = toku_malloc(npairs * sizeof (struct kv_pair *));
if (pairs == 0)
return 0;
lastpair = 0;
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i].key != 0) {
for (i=lo; i<hi; i++) {
if (pma->pairs[i] != 0) {
pairs[lastpair] = pma->pairs[i];
pma->pairs[i].key = 0;
pma->pairs[i].val = 0;
pma->pairs[i] = 0;
lastpair += 1;
}
}
......@@ -583,11 +587,12 @@ struct pair *pmainternal_extract_pairs(PMA pma) {
return pairs;
}
int pma_split(PMA old, PMA *newa, PMA *newb,
int pma_split(PMA old, PMA *newap, PMA *newbp,
PMA_CURSOR *cursors, int ncursors) {
PMA newa, newb;
int error;
int npairs;
struct pair *pairs;
struct kv_pair **pairs;
int sumlen;
int runlen;
int len;
......@@ -597,28 +602,32 @@ int pma_split(PMA old, PMA *newa, PMA *newb,
assert(cursors == 0 && ncursors == 0);
/* create the new pma's */
error = pma_create(newa, old->compare_fun);
error = pma_create(newap, old->compare_fun);
if (error != 0)
return error;
error = pma_create(newb, old->compare_fun);
error = pma_create(newbp, old->compare_fun);
if (error != 0) {
pma_free(newb);
pma_free(newap);
return error;
}
newa = *newap;
newb = *newbp;
/* extract the pairs */
npairs = pma_n_entries(old);
pairs = pmainternal_extract_pairs(old);
pairs = pmainternal_extract_pairs(old, 0, old->N);
assert(pairs);
old->n_pairs_present = 0;
/* split the pairs in half by length (TODO: combine sum with extract) */
sumlen = 0;
for (i=0; i<npairs; i++)
sumlen += 4 + pairs[i].keylen + 4 + pairs[i].vallen;
sumlen += 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
runlen = 0;
for (i=0; i < npairs; i++) {
len = 4 + pairs[i].keylen + 4 + pairs[i].vallen;
len = 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
if (runlen + len > sumlen/2)
break;
runlen += len;
......@@ -626,18 +635,19 @@ int pma_split(PMA old, PMA *newa, PMA *newb,
spliti = i;
/* put the first half of pairs into newa */
error = pmainternal_init_array(*newa, 2 * spliti);
error = pmainternal_init_array(newa, 2 * spliti);
assert(error == 0);
distribute_data((*newa)->pairs, pma_index_limit(*newa), &pairs[0], spliti);
(*newa)->n_pairs_present = spliti;
distribute_data(newa->pairs, pma_index_limit(newa), &pairs[0], spliti);
newa->n_pairs_present = spliti;
/* put the second half of pairs into newb */
error = pmainternal_init_array(*newb, 2 * (npairs-spliti));
error = pmainternal_init_array(newb, 2 * (npairs-spliti));
assert(error == 0);
distribute_data((*newb)->pairs, pma_index_limit(*newb), &pairs[spliti], npairs-spliti);
(*newb)->n_pairs_present = npairs-spliti;
distribute_data(newb->pairs, pma_index_limit(newb), &pairs[spliti], npairs-spliti);
newb->n_pairs_present = npairs-spliti;
toku_free(pairs);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment