Commit 956652f1 authored by Rich Prohaska's avatar Rich Prohaska

update pma cursors whenever the kv pairs are moved in the pma index.

add pma_bulk_insert functionality.



git-svn-id: file:///svn/tokudb@89 c7de825b-a66e-492c-adef-691d508d4ae1
parent e0bf2ec7
......@@ -42,3 +42,8 @@ static inline void *kv_pair_val(struct kv_pair *pair) {
static inline int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
}
struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
struct list {
struct list *next, *prev;
};
static inline void list_init(struct list *head) {
head->next = head->prev = head;
}
static inline int list_empty(struct list *head) {
return head->next == head;
}
static inline struct list *list_head(struct list *head) {
return head->next;
}
static inline struct list *list_tail(struct list *head) {
return head->prev;
}
static inline void list_insert_between(struct list *a, struct list *list, struct list *b) {
list->next = a->next;
list->prev = b->prev;
a->next = b->prev = list;
}
static inline void list_push(struct list *head, struct list *list) {
list_insert_between(head->prev, list, head);
}
static inline void list_push_head(struct list *head, struct list *list) {
list_insert_between(head, list, head->next);
}
static inline void list_remove(struct list *list) {
list->next->prev = list->prev;
list->prev->next = list->next;
}
static inline struct list *list_pop(struct list *head) {
struct list *list = head->prev;
list_remove(list);
return list;
}
static inline struct list *list_pop_head(struct list *head) {
struct list *list = head->next;
list_remove(list);
return list;
}
static inline void list_move(struct list *newhead, struct list *oldhead) {
struct list *first = oldhead->next;
struct list *last = oldhead->prev;
assert(!list_empty(oldhead));
newhead->next = first;
newhead->prev = last;
last->next = first->prev = newhead;
list_init(oldhead);
}
#define list_struct(p, t, f) (t*)((char*)(p) - __builtin_offsetof(t, f))
......@@ -3,7 +3,7 @@
struct pma_cursor {
PMA pma;
int position; /* -1 if the position is undefined. */
PMA_CURSOR next,prev;
struct list next;
void *skey, *sval; /* used in dbts. */
};
......@@ -19,19 +19,20 @@ struct pma {
* Regions of size 32 are 80% full. Regions of size 64 are 70% full.
* Regions of size 128 are 60% full. Regions of size 256 are 50% full.
* The densitystep is 0.10. */
PMA_CURSOR cursors_head, cursors_tail;
struct list cursors;
int (*compare_fun)(DB*,const DBT*,const DBT*);
void *skey, *sval; /* used in dbts */
};
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
void pmainternal_calculate_parameters (PMA pma);
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx);
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base, PMA pma);
int pmainternal_printpairs (struct kv_pair *pairs[], int N);
int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
int pmainternal_init_array(PMA pma, int asksize);
struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi);
int pma_resize_array(PMA pma, int asksize);
struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi);
void pma_update_region(PMA pma, struct list *cursorset, struct kv_pair_tag *, int n);
......@@ -6,6 +6,7 @@
#include <stdlib.h>
#include <stdio.h>
#include <arpa/inet.h>
#include "list.h"
#include "kv-pair.h"
#include "pma-internal.h"
......@@ -151,7 +152,7 @@ void test_smooth_region_N (int N) {
}
}
pmainternal_printpairs(pairs, N); printf(" at %d becomes f", insertat);
r = pmainternal_smooth_region(pairs, N, insertat);
r = pmainternal_smooth_region(pairs, N, insertat, 0, 0);
pmainternal_printpairs(pairs, N); printf(" at %d\n", r);
assert(0<=r); assert(r<N);
assert(pairs[r]==0);
......@@ -192,7 +193,7 @@ void test_smooth_region6 (void) {
key = "B";
pairs[1] = kv_pair_malloc(key, strlen(key)+1, 0, 0);
int r = pmainternal_smooth_region(pairs, N, 2);
int r = pmainternal_smooth_region(pairs, N, 2, 0, 0);
printf("{ ");
for (i=0; i<N; i++)
printf("%s ", pairs[i] ? pairs[i]->key : "?");
......@@ -517,11 +518,101 @@ void test_pma_cursor_3 (void) {
}
void assert_cursor_val(PMA_CURSOR cursor, int v) {
DBT key, val;
int error;
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &key, &val);
assert(error == 0);
assert( v == *(int *)val.data);
toku_free(key.data);
toku_free(val.data);
}
/* make sure cursors are adjusted when the pma grows */
void test_pma_cursor_4() {
int error;
PMA pma;
PMA_CURSOR cursora, cursorb, cursorc;
int i;
error = pma_create(&pma, default_compare_fun);
assert(error == 0);
for (i=1; i<=4; i += 1) {
DBT dbtk, dbtv;
char k[5]; int v;
sprintf(k, "%4.4d", i);
fill_dbt(&dbtk, &k, strlen(k)+1);
v = i;
fill_dbt(&dbtv, &v, sizeof v);
error = pma_insert(pma, &dbtk, &dbtv, 0);
assert(error == BRT_OK);
}
assert(pma_n_entries(pma) == 4);
printf("a:"); print_pma(pma);
error = pma_cursor(pma, &cursora);
assert(error == 0);
error = pma_cursor_set_position_first(cursora);
assert(error == 0);
assert_cursor_val(cursora, 1);
error = pma_cursor(pma, &cursorb);
assert(error == 0);
error = pma_cursor_set_position_first(cursorb);
assert(error == 0);
assert_cursor_val(cursorb, 1);
error = pma_cursor_set_position_next(cursorb);
assert(error == 0);
assert_cursor_val(cursorb, 2);
error = pma_cursor(pma, &cursorc);
assert(error == 0);
error = pma_cursor_set_position_last(cursorc);
assert(error == 0);
assert_cursor_val(cursorc, 4);
for (i=5; i<=6; i += 1) {
DBT dbtk, dbtv;
char k[5]; int v;
sprintf(k, "%4.4d", i);
fill_dbt(&dbtk, &k, strlen(k)+1);
v = i;
fill_dbt(&dbtv, &v, sizeof v);
error = pma_insert(pma, &dbtk, &dbtv, 0);
assert(error == BRT_OK);
}
assert(pma_n_entries(pma) == 6);
printf("a:"); print_pma(pma);
assert_cursor_val(cursora, 1);
assert_cursor_val(cursorb, 2);
assert_cursor_val(cursorc, 4);
error = pma_cursor_free(&cursora);
assert(error == 0);
error = pma_cursor_free(&cursorb);
assert(error == 0);
error = pma_cursor_free(&cursorc);
assert(error == 0);
error = pma_free(&pma);
assert(error == 0);
}
void test_pma_cursor (void) {
test_pma_cursor_0();
test_pma_cursor_1();
test_pma_cursor_2();
test_pma_cursor_3();
test_pma_cursor_4();
}
int wrong_endian_compare_fun (DB *ignore __attribute__((__unused__)),
......@@ -580,13 +671,13 @@ void test_pma_compare_fun (int wrong_endian_p) {
r=pma_free(&pma); assert(r==0);
}
void test_pma_split(int n) {
void test_pma_split_n(int n) {
PMA pmaa, pmab, pmac;
int error;
int i;
int na, nb, nc;
printf("test_pma_split:%d\n", n);
printf("test_pma_split_n:%d\n", n);
error = pma_create(&pmaa, default_compare_fun);
assert(error == 0);
......@@ -607,7 +698,7 @@ void test_pma_split(int n) {
printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, &pmab, &pmac, 0, 0);
error = pma_split(pmaa, &pmab, &pmac);
assert(error == 0);
printf("a:"); print_pma(pmaa);
......@@ -657,7 +748,7 @@ void test_pma_split_varkey() {
printf("a:"); print_pma(pmaa);
error = pma_split(pmaa, &pmab, &pmac, 0, 0);
error = pma_split(pmaa, &pmab, &pmac);
assert(error == 0);
printf("a:"); print_pma(pmaa);
......@@ -678,8 +769,240 @@ void test_pma_split_varkey() {
assert(error == 0);
}
void print_cursor(const char *str, PMA_CURSOR cursor) {
DBT key, val;
int error;
printf("cursor %s: ", str);
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &key, &val);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
toku_free(val.data);
printf("\n");
}
void walk_cursor(const char *str, PMA_CURSOR cursor) {
DBT key, val;
int error;
printf("walk %s: ", str);
for (;;) {
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &key, &val);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
toku_free(val.data);
error = pma_cursor_set_position_next(cursor);
if (error != 0)
break;
}
printf("\n");
}
void walk_cursor_reverse(const char *str, PMA_CURSOR cursor) {
DBT key, val;
int error;
printf("walk %s: ", str);
for (;;) {
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_cget_current(cursor, &key, &val);
assert(error == 0);
printf("%s ", (char*)key.data);
toku_free(key.data);
toku_free(val.data);
error = pma_cursor_set_position_prev(cursor);
if (error != 0)
break;
}
printf("\n");
}
void test_pma_split_cursor() {
PMA pmaa, pmab, pmac;
PMA_CURSOR cursora, cursorb, cursorc;
int error;
int i;
int na, nb, nc;
printf("test_pma_split_cursor\n");
error = pma_create(&pmaa, default_compare_fun);
assert(error == 0);
/* insert some kv pairs */
for (i=1; i<=16; i += 1) {
DBT dbtk, dbtv;
char k[5]; int v;
sprintf(k, "%4.4d", i);
fill_dbt(&dbtk, &k, strlen(k)+1);
v = i;
fill_dbt(&dbtv, &v, sizeof v);
error = pma_insert(pmaa, &dbtk, &dbtv, 0);
assert(error == BRT_OK);
}
assert(pma_n_entries(pmaa) == 16);
printf("a:"); print_pma(pmaa);
error = pma_cursor(pmaa, &cursora);
assert(error == 0);
error = pma_cursor_set_position_first(cursora);
assert(error == 0);
// print_cursor("cursora", cursora);
assert_cursor_val(cursora, 1);
error = pma_cursor(pmaa, &cursorb);
assert(error == 0);
error = pma_cursor_set_position_first(cursorb);
assert(error == 0);
error = pma_cursor_set_position_next(cursorb);
assert(error == 0);
// print_cursor("cursorb", cursorb);
assert_cursor_val(cursorb, 2);
error = pma_cursor(pmaa, &cursorc);
assert(error == 0);
error = pma_cursor_set_position_last(cursorc);
assert(error == 0);
// print_cursor("cursorc", cursorc);
assert_cursor_val(cursorc, 16);
error = pma_split(pmaa, &pmab, &pmac);
assert(error == 0);
printf("a:"); print_pma(pmaa);
na = pma_n_entries(pmaa);
assert(na == 0);
printf("b:"); print_pma(pmab);
nb = pma_n_entries(pmab);
printf("c:"); print_pma(pmac);
nc = pma_n_entries(pmac);
assert(nb + nc == 16);
/* cursors open, should fail */
error = pma_free(&pmab);
assert(error != 0);
/* walk cursora */
assert_cursor_val(cursora, 1);
walk_cursor("cursora", cursora);
/* walk cursorb */
assert_cursor_val(cursorb, 2);
walk_cursor("cursorb", cursorb);
/* walk cursorc */
assert_cursor_val(cursorc, 16);
walk_cursor("cursorc", cursorc);
walk_cursor_reverse("cursorc reverse", cursorc);
error = pma_cursor_free(&cursora);
assert(error == 0);
error = pma_cursor_free(&cursorb);
assert(error == 0);
error = pma_cursor_free(&cursorc);
assert(error == 0);
error = pma_free(&pmaa);
assert(error == 0);
error = pma_free(&pmab);
assert(error == 0);
error = pma_free(&pmac);
assert(error == 0);
}
void test_pma_split() {
test_pma_split_n(0); memory_check_all_free();
test_pma_split_n(1); memory_check_all_free();
test_pma_split_n(2); memory_check_all_free();
test_pma_split_n(4); memory_check_all_free();
test_pma_split_n(8); memory_check_all_free();
test_pma_split_n(9); memory_check_all_free();
test_pma_split_varkey(); memory_check_all_free();
test_pma_split_cursor(); memory_check_all_free();
}
void test_pma_bulk_insert_n(int n) {
PMA pma;
int error;
int i;
DBT *keys, *vals;
printf("test_pma_bulk_insert_n: %d\n", n);
error = pma_create(&pma, default_compare_fun);
assert(error == 0);
/* init n kv pairs */
keys = toku_malloc(n * sizeof (DBT));
assert(keys);
vals = toku_malloc(n * sizeof (DBT));
assert(vals);
/* init n kv pairs */
for (i=0; i<n; i++) {
char kstring[5];
char *k; int klen;
int *v; int vlen;
sprintf(kstring, "%4.4d", i);
klen = strlen(kstring) + 1;
k = toku_malloc(klen);
assert(k);
strcpy(k, kstring);
fill_dbt(&keys[i], k, klen);
vlen = sizeof (int);
v = toku_malloc(vlen);
assert(v);
*v = i;
fill_dbt(&vals[i], v, vlen);
}
/* bulk insert n kv pairs */
error = pma_bulk_insert(pma, keys, vals, n);
assert(error == 0);
/* verify */
print_pma(pma);
/* cleanup */
for (i=0; i<n; i++) {
toku_free(keys[i].data);
toku_free(vals[i].data);
}
error = pma_free(&pma);
assert(error == 0);
toku_free(keys);
toku_free(vals);
}
void test_pma_bulk_insert() {
test_pma_bulk_insert_n(0); memory_check_all_free();
test_pma_bulk_insert_n(1); memory_check_all_free();
test_pma_bulk_insert_n(2); memory_check_all_free();
test_pma_bulk_insert_n(3); memory_check_all_free();
test_pma_bulk_insert_n(4); memory_check_all_free();
test_pma_bulk_insert_n(5); memory_check_all_free();
test_pma_bulk_insert_n(8); memory_check_all_free();
test_pma_bulk_insert_n(32); memory_check_all_free();
}
void pma_tests (void) {
memory_check=1;
goto skip;
test_pma_compare_fun(0); memory_check_all_free();
test_pma_compare_fun(1); memory_check_all_free();
test_pma_iterate();
......@@ -693,13 +1016,9 @@ void pma_tests (void) {
test_keycompare(); memory_check_all_free();
test_pma_random_pick(); memory_check_all_free();
test_pma_cursor(); memory_check_all_free();
test_pma_split(0); memory_check_all_free();
test_pma_split(1); memory_check_all_free();
test_pma_split(2); memory_check_all_free();
test_pma_split(4); memory_check_all_free();
test_pma_split(8); memory_check_all_free();
test_pma_split(9); memory_check_all_free();
test_pma_split_varkey(); memory_check_all_free();
test_pma_split(); memory_check_all_free();
skip:
test_pma_bulk_insert(); memory_check_all_free();
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
......
......@@ -12,6 +12,7 @@
/* Only needed for testing. */
#include <string.h>
#include "list.h"
#include "kv-pair.h"
#include "pma-internal.h"
......@@ -187,20 +188,22 @@ void print_pma (PMA pma) {
/* Smooth the data, and return the location of the null. */
int distribute_data (struct kv_pair *destpairs[], int dcount,
struct kv_pair *sourcepairs[], int scount) {
struct kv_pair_tag sourcepairs[], int scount, PMA pma) {
assert(scount<=dcount);
if (scount==0) {
return -1;
}
if (scount==1) {
destpairs[0]=sourcepairs[0];
destpairs[0]=sourcepairs[0].pair;
if (pma)
sourcepairs[0].newtag = destpairs - pma->pairs;
if (destpairs[0]==0) return 0;
else return -1;
} else {
int r1 = distribute_data(destpairs, dcount/2,
sourcepairs, scount/2);
sourcepairs, scount/2, pma);
int r2 = distribute_data(destpairs +dcount/2, dcount-dcount/2,
sourcepairs+scount/2, scount-scount/2);
sourcepairs+scount/2, scount-scount/2, pma);
assert(r1==-1 || r2==-1);
if (r1!=-1) return r1;
else if (r2!=-1) return r2+dcount/2;
......@@ -210,7 +213,7 @@ int distribute_data (struct kv_pair *destpairs[], int dcount,
/* spread the non-empty pairs around. There are n of them. Create an empty slot just before the IDXth
element, and return that slot's index in the smoothed array. */
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx) {
int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base, PMA pma) {
int i;
int n_present=0;
for (i=0; i<n; i++) {
......@@ -218,26 +221,41 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx) {
}
n_present++; // Save one for the blank guy.
{
struct kv_pair **MALLOC_N(n_present,tmppairs);
struct kv_pair_tag *tmppairs;
int n_saved=0;
int r;
tmppairs = toku_malloc(n_present * sizeof (struct kv_pair_tag));
for (i=0; i<n; i++) {
if (i==idx) {
tmppairs[n_saved++] = 0;
tmppairs[n_saved++].pair = 0;
}
if (pairs[i]) {
tmppairs[n_saved++] = pairs[i];
tmppairs[n_saved].oldtag = base + i;
tmppairs[n_saved++].pair = pairs[i];
}
pairs[i] = 0;
}
if (idx==n) {
tmppairs[n_saved++] = 0;
tmppairs[n_saved++].pair = 0;
}
//printf(" temp="); printpairs(tmppairs, n_saved);
assert(n_saved==n_present);
/* Now the tricky part. Distribute the data. */
r=distribute_data (pairs, n,
tmppairs, n_saved);
tmppairs, n_saved, pma);
if (pma && !list_empty(&pma->cursors)) {
struct list cursors;
list_move(&cursors, &pma->cursors);
pma_update_region(pma, &cursors, tmppairs, n_present);
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&pma->cursors, list);
}
}
toku_free(tmppairs);
return r;
}
......@@ -280,18 +298,25 @@ int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
}
int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
TAGMALLOC(PMA, result);
struct pma *result;
int error;
result = toku_malloc(sizeof (struct pma));
if (result==0) return -1;
result->n_pairs_present = 0;
result->pairs = 0;
result->cursors_head = result->cursors_tail = 0;
list_init(&result->cursors);
result->compare_fun = compare_fun;
result->skey=0;
result->skey = 0;
result->sval = 0;
result->N = 4;
#if 0 /* memory.c is broken */
result->pairs = 0;
#else
result->pairs = toku_malloc((1 + 4) * sizeof (struct kv_pair *));
#endif
error = pmainternal_init_array(result, 4);
error = pma_resize_array(result, 4);
if (error) {
toku_free(result);
return -1;
......@@ -302,22 +327,17 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
return 0;
}
int pmainternal_init_array(PMA pma, int asksize) {
int pma_resize_array(PMA pma, int asksize) {
int i;
int n;
if (pma->pairs) {
toku_free(pma->pairs);
pma->pairs = 0;
}
/* find the smallest power of 2 >= n */
n = 4;
while (n < asksize)
n *= 2;
pma->N = n;
pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
pma->pairs = toku_realloc(pma->pairs, (1 + pma->N) * sizeof (struct kv_pair *));
if (pma->pairs == 0)
return -1;
pma->pairs[pma->N] = (void *) 0xdeadbeef;
......@@ -331,20 +351,13 @@ int pmainternal_init_array(PMA pma, int asksize) {
int pma_cursor (PMA pma, PMA_CURSOR *cursp) {
PMA_CURSOR MALLOC(curs);
if (errno!=0) return errno;
assert(curs!=0);
if (errno!=0) return errno;
curs->position=-1; /* undefined */
if (pma->cursors_head) {
pma->cursors_head->prev = curs;
} else {
pma->cursors_tail = curs;
}
curs->next = pma->cursors_head;
curs->prev = 0;
curs->pma = pma;
curs->skey = 0;
curs->sval=0;
pma->cursors_head = curs;
list_push(&pma->cursors, &curs->next);
*cursp=curs;
return 0;
}
......@@ -360,6 +373,19 @@ int pma_cursor_set_position_last (PMA_CURSOR c)
return 0;
}
int pma_cursor_set_position_prev (PMA_CURSOR c) {
PMA pma = c->pma;
int old_position = c->position;
c->position--;
while (c->position >= 0) {
if (pma->pairs[c->position] != 0)
return 0;
c->position--;
}
c->position = old_position;
return DB_NOTFOUND;
}
int pma_cursor_set_position_first (PMA_CURSOR c)
{
PMA pma = c->pma;
......@@ -411,19 +437,7 @@ int pma_cget_first (PMA_CURSOR c, YBT *key, YBT *val) {
int pma_cursor_free (PMA_CURSOR *cursp) {
PMA_CURSOR curs=*cursp;
PMA pma = curs->pma;
if (curs->prev==0) {
assert(pma->cursors_head==curs);
pma->cursors_head = curs->next;
} else {
curs->prev->next = curs->next;
}
if (curs->next==0) {
assert(pma->cursors_tail==curs);
pma->cursors_tail = curs->prev;
} else {
curs->next->prev = curs->prev;
}
list_remove(&curs->next);
if (curs->skey) toku_free(curs->skey);
if (curs->sval) toku_free(curs->sval);
toku_free(curs);
......@@ -481,7 +495,8 @@ int pmainternal_make_space_at (PMA pma, int idx) {
}
//printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density);
{
int new_index = pmainternal_smooth_region(pma->pairs+lo, hi-lo, idx-lo);
int new_index = pmainternal_smooth_region(pma->pairs+lo, hi-lo, idx-lo, lo, pma);
return new_index+lo;
}
}
......@@ -505,7 +520,7 @@ enum pma_errors pma_lookup (PMA pma, DBT *k, DBT *v, DB *db) {
int pma_free (PMA *pmap) {
int i;
PMA pma=*pmap;
if (pma->cursors_head) return -1;
if (!list_empty(&pma->cursors)) return -1;
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i]) {
kv_pair_free(pma->pairs[i]);
......@@ -565,20 +580,53 @@ void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi
}
}
struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi) {
void pma_update_cursors(PMA pma, struct list *cursor_set, int oldposition, int newposition) {
struct list *list, *nextlist;
struct pma_cursor *cursor;
list = list_head(cursor_set);
while (list != cursor_set) {
nextlist = list->next; /* may be removed later */
cursor = list_struct(list, struct pma_cursor, next);
if (cursor->position == oldposition) {
cursor->position = newposition;
cursor->pma = pma;
list_remove(list);
list_push(&pma->cursors, list);
}
list = nextlist;
}
}
void pma_update_region(PMA pma, struct list *cursor_set, struct kv_pair_tag *pairs, int n) {
int i;
/* short cut */
if (list_empty(cursor_set))
return;
/* update all cursors to their new positions */
for (i=0; i<n; i++) {
if (pairs[i].pair && pairs[i].oldtag >= 0)
pma_update_cursors(pma, cursor_set, pairs[i].oldtag, pairs[i].newtag);
}
}
struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi) {
int npairs;
struct kv_pair **pairs;
struct kv_pair_tag *pairs;
int i;
int lastpair;
npairs = pma_n_entries(pma);
pairs = toku_malloc(npairs * sizeof (struct kv_pair *));
pairs = toku_malloc(npairs * sizeof (struct kv_pair_tag));
if (pairs == 0)
return 0;
lastpair = 0;
for (i=lo; i<hi; i++) {
if (pma->pairs[i] != 0) {
pairs[lastpair] = pma->pairs[i];
pairs[lastpair].pair = pma->pairs[i];
pairs[lastpair].oldtag = i;
pma->pairs[i] = 0;
lastpair += 1;
}
......@@ -587,19 +635,17 @@ struct kv_pair **pmainternal_extract_pairs(PMA pma, int lo, int hi) {
return pairs;
}
int pma_split(PMA old, PMA *newap, PMA *newbp,
PMA_CURSOR *cursors, int ncursors) {
int pma_split(PMA old, PMA *newap, PMA *newbp) {
PMA newa, newb;
int error;
int npairs;
struct kv_pair **pairs;
struct kv_pair_tag *pairs;
int sumlen;
int runlen;
int len;
int i;
int spliti;
assert(cursors == 0 && ncursors == 0);
struct list cursors;
/* create the new pma's */
error = pma_create(newap, old->compare_fun);
......@@ -623,31 +669,98 @@ int pma_split(PMA old, PMA *newap, PMA *newbp,
/* split the pairs in half by length (TODO: combine sum with extract) */
sumlen = 0;
for (i=0; i<npairs; i++)
sumlen += 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
sumlen += 4 + kv_pair_keylen(pairs[i].pair) + 4 + kv_pair_vallen(pairs[i].pair);
runlen = 0;
for (i=0; i < npairs; i++) {
len = 4 + kv_pair_keylen(pairs[i]) + 4 + kv_pair_vallen(pairs[i]);
len = 4 + kv_pair_keylen(pairs[i].pair) + 4 + kv_pair_vallen(pairs[i].pair);
if (runlen + len > sumlen/2)
break;
runlen += len;
}
spliti = i;
/* set the cursor set to be all of the cursors from the old pma */
list_init(&cursors);
if (!list_empty(&old->cursors))
list_move(&cursors, &old->cursors);
/* put the first half of pairs into newa */
error = pmainternal_init_array(newa, 2 * spliti);
error = pma_resize_array(newa, 2 * spliti);
assert(error == 0);
distribute_data(newa->pairs, pma_index_limit(newa), &pairs[0], spliti);
distribute_data(newa->pairs, pma_index_limit(newa), &pairs[0], spliti, newa);
pma_update_region(newa, &cursors, &pairs[0], spliti);
newa->n_pairs_present = spliti;
/* put the second half of pairs into newb */
error = pmainternal_init_array(newb, 2 * (npairs-spliti));
error = pma_resize_array(newb, 2 * (npairs-spliti));
assert(error == 0);
distribute_data(newb->pairs, pma_index_limit(newb), &pairs[spliti], npairs-spliti);
distribute_data(newb->pairs, pma_index_limit(newb), &pairs[spliti], npairs-spliti, newb);
pma_update_region(newb, &cursors, &pairs[spliti], npairs-spliti);
newb->n_pairs_present = npairs-spliti;
toku_free(pairs);
/* bind the remaining cursors to pma b */
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&newa->cursors, list);
}
return 0;
}
int pma_bulk_insert_pairs(PMA pma, struct kv_pair_tag *newpairs, int n_newpairs) {
int error;
if (!list_empty(&pma->cursors))
return -1;
if (pma_n_entries(pma) > 0)
return -2;
error = pma_resize_array(pma, 2 * n_newpairs);
if (error)
return error;
distribute_data(pma->pairs, pma_index_limit(pma), newpairs, n_newpairs, pma);
pma->n_pairs_present = n_newpairs;
return 0;
}
void __pma_bulk_cleanup(struct kv_pair_tag *pairs, int n) {
int i;
for (i=0; i<n; i++)
if (pairs[i].pair)
kv_pair_free(pairs[i].pair);
}
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
struct kv_pair_tag *newpairs;
int i;
int error;
newpairs = toku_malloc(n_newpairs * sizeof (struct kv_pair_tag));
if (newpairs == 0) {
error = -1; return error;
}
for (i=0; i<n_newpairs; i++) {
newpairs[i].pair = kv_pair_malloc(keys[i].data, keys[i].size,
vals[i].data, vals[i].size);
if (newpairs[i].pair == 0) {
__pma_bulk_cleanup(newpairs, i);
toku_free(newpairs);
error = -2; return error;
}
}
error = pma_bulk_insert_pairs(pma, newpairs, n_newpairs);
if (error)
__pma_bulk_cleanup(newpairs, n_newpairs);
toku_free(newpairs);
return error;
}
......@@ -36,7 +36,27 @@ int pma_delete (PMA, DBT *, DB*);
* Don't modify the returned data. Don't free it. */
enum pma_errors pma_lookup (PMA, DBT*, DBT*, DB*);
int pma_split(PMA old, PMA *newa, PMA *newb, PMA_CURSOR *cursors, int ncursors);
/*
* split a pma into 2 pma's. the new pma's are designated the
* left and right pma's. the left and right pma's have roughly the same
* key and value space.
*
* old - the old pma
* newa - the new pma on the left
* newb - the new pma on the right
*/
int pma_split(PMA old, PMA *newa, PMA *newb);
/*
* insert several key value pairs into an empty pma
*
* pma - the pma that the key value pairs will be inserted into.
* must be empty with no cursors.
* keys - an array of pointers and lengths of the keys
* vals - an array of pointers and lengths of the values
* n_newpairs - the number of key value pairs
*/
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs);
/* Move the cursor to the beginning or the end or to a key */
int pma_cursor (PMA, PMA_CURSOR *);
......@@ -45,6 +65,7 @@ int pma_cursor_free (PMA_CURSOR*);
int pma_cursor_set_position_last (PMA_CURSOR c);
int pma_cursor_set_position_first (PMA_CURSOR c);
int pma_cursor_set_position_next (PMA_CURSOR c); /* Requires the cursor is init'd. Returns DB_NOTFOUND if we fall off the end. */
int pma_cursor_set_position_prev (PMA_CURSOR c);
int pma_cget_current (PMA_CURSOR c, DBT *key, DBT *val);
/* Return PMA_NOTFOUND if the pma is empty. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment