Commit e804fe3d authored by Rich Prohaska's avatar Rich Prohaska

implement the rebalance and shrink after pma deletion

git-svn-id: file:///svn/tokudb@220 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5fea2602
#include "pma.h"
#include "pma.h"
struct pma_cursor {
PMA pma;
......@@ -13,12 +13,14 @@ struct pma {
int n_pairs_present; /* How many array elements are non-null. */
struct kv_pair **pairs;
int uplgN; /* The smallest power of two >= lg(N) */
double densitystep; /* Each doubling decreases the density by densitystep.
double udt_step; /* upper density threshold step */
/* Each doubling decreases the density by density step.
* For example if array_len=256 and uplgN=8 then there are 5 doublings.
* Regions of size 8 are full. Regions of size 16 are 90% full.
* Regions of size 32 are 80% full. Regions of size 64 are 70% full.
* Regions of size 128 are 60% full. Regions of size 256 are 50% full.
* The densitystep is 0.10. */
* The density step is 0.10. */
double ldt_step; /* lower density threshold step */
struct list cursors;
int (*compare_fun)(DB*,const DBT*,const DBT*);
void *skey, *sval; /* used in dbts */
......@@ -31,8 +33,38 @@ int pmainternal_printpairs (struct kv_pair *pairs[], int N);
int pmainternal_make_space_at (PMA pma, int idx);
int pmainternal_find (PMA pma, DBT *, DB*); // The DB is so the comparison fuction can be called.
void print_pma (PMA pma); /* useful for debugging, so keep the name short. I.e., not pmainternal_print_pma() */
int pma_resize_array(PMA pma, int asksize);
struct kv_pair_tag *pmainternal_extract_pairs(PMA pma, int lo, int hi);
/*
* resize the pma array to asksize. zero all array entries starting from startx.
*/
int __pma_resize_array(PMA pma, int asksize, int startx);
void pma_update_region(PMA pma, struct list *cursorset, struct kv_pair_tag *, int n);
/*
* extract pairs from the pma in the window delimited by lo and hi.
*/
struct kv_pair_tag *__pma_extract_pairs(PMA pma, int count, int lo, int hi);
/*
* update the cursors in a cursor set given a set of tagged pairs.
*/
void __pma_update_cursors(PMA pma, struct list *cursorset, struct kv_pair_tag *tpairs, int n);
/*
* update this pma's cursors given a set of tagged pairs.
*/
void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n);
/*
* a deletion occured at index "here" in the pma. rebalance the windows around "here". if
* necessary, shrink the pma.
*/
void __pma_delete_at(PMA pma, int here);
/* density thresholds */
#define PMA_LDT_HIGH 0.25
#define PMA_LDT_LOW 0.40
#define PMA_UDT_HIGH 1.00
#define PMA_UDT_LOW 0.50
/* minimum array size */
#define PMA_MIN_ARRAY_SIZE 4
......@@ -214,8 +214,8 @@ static void test_smooth_region (void) {
static void test_calculate_parameters (void) {
struct pma pma;
pma.N=4; pmainternal_calculate_parameters(&pma); assert(pma.uplgN==2); assert(pma.densitystep==0.5);
pma.N=8; pmainternal_calculate_parameters(&pma); assert(pma.uplgN==4); assert(pma.densitystep==0.5);
pma.N=4; pmainternal_calculate_parameters(&pma); assert(pma.uplgN==2); assert(pma.udt_step==0.5);
pma.N=8; pmainternal_calculate_parameters(&pma); assert(pma.uplgN==4); assert(pma.udt_step==0.5);
}
......@@ -855,9 +855,9 @@ void test_pma_split_cursor(void) {
/* insert some kv pairs */
for (i=1; i<=16; i += 1) {
DBT dbtk, dbtv;
char k[5]; int v;
char k[11]; int v;
sprintf(k, "%4.4d", i);
snprintf(k, sizeof k, "%.10d", i);
fill_dbt(&dbtk, &k, strlen(k)+1);
v = i;
fill_dbt(&dbtv, &v, sizeof v);
......@@ -946,6 +946,11 @@ void test_pma_split(void) {
test_pma_split_cursor(); memory_check_all_free();
}
/*
* test the pma_bulk_insert function by creating n kv pairs and bulk
* inserting them into an empty pma. verify that the pma contains all
* of the kv pairs.
*/
void test_pma_bulk_insert_n(int n) {
PMA pma;
int error;
......@@ -965,11 +970,11 @@ void test_pma_bulk_insert_n(int n) {
/* init n kv pairs */
for (i=0; i<n; i++) {
char kstring[5];
char kstring[11];
char *k; int klen;
int *v; int vlen;
sprintf(kstring, "%4.4d", i);
snprintf(kstring, sizeof kstring, "%.10d", i);
klen = strlen(kstring) + 1;
k = toku_malloc(klen);
assert(k);
......@@ -988,8 +993,17 @@ void test_pma_bulk_insert_n(int n) {
assert(error == 0);
/* verify */
print_pma(pma);
if (0) print_pma(pma);
assert(n == pma_n_entries(pma));
for (i=0; i<n; i++) {
DBT val;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
error = pma_lookup(pma, &keys[i], &val, 0);
assert(error == 0);
assert(vals[i].size == val.size);
assert(memcmp(vals[i].data, val.data, val.size) == 0);
toku_free(val.data);
}
/* cleanup */
for (i=0; i<n; i++) {
......@@ -1050,6 +1064,191 @@ void test_pma_insert_or_replace(void) {
assert(r==0);
}
/*
* test that the pma shrinks back to its minimum size.
*/
void test_pma_delete_shrink(int n) {
PMA pma;
int r;
int i;
printf("test_pma_delete_shrink:%d\n", n);
r = pma_create(&pma, default_compare_fun);
assert(r == 0);
/* insert */
for (i=0; i<n; i++) {
char k[11];
int v;
DBT key, val;
snprintf(k, sizeof k, "%.10d", i);
fill_dbt(&key, k, strlen(k)+1);
v = i;
fill_dbt(&val, &v, sizeof v);
r = pma_insert(pma, &key, &val, 0);
assert(r == 0);
}
/* delete */
for (i=0; i<n; i++) {
char k[11];
DBT key;
snprintf(k, sizeof k, "%.10d", i);
fill_dbt(&key, k, strlen(k)+1);
r = pma_delete(pma, &key, 0);
assert(r == 0);
}
assert(pma->N == PMA_MIN_ARRAY_SIZE);
r = pma_free(&pma);
assert(r == 0);
}
/*
* test that the pma shrinks to its minimum size after inserting
* random keys and then deleting them.
*/
void test_pma_delete_random(int n) {
PMA pma;
int r;
int i;
int keys[n];
printf("test_pma_delete_random:%d\n", n);
r = pma_create(&pma, default_compare_fun);
assert(r == 0);
for (i=0; i<n; i++) {
keys[i] = random();
}
/* insert */
for (i=0; i<n; i++) {
char k[11];
int v;
DBT key, val;
snprintf(k, sizeof k, "%.10d", keys[i]);
fill_dbt(&key, k, strlen(k)+1);
v = keys[i];
fill_dbt(&val, &v, sizeof v);
r = pma_insert(pma, &key, &val, 0);
assert(r == 0);
}
/* delete */
for (i=0; i<n; i++) {
char k[11];
DBT key;
snprintf(k, sizeof k, "%.10d", keys[i]);
fill_dbt(&key, k, strlen(k)+1);
r = pma_delete(pma, &key, 0);
assert(r == 0);
}
assert(pma->N == PMA_MIN_ARRAY_SIZE);
r = pma_free(&pma);
assert(r == 0);
}
void assert_cursor_equal(PMA_CURSOR pmacursor, int v) {
DBT key, val;
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
int r;
r = pma_cget_current(pmacursor, &key, &val);
assert(r == 0);
if (0) printf("key %s\n", (char*) key.data);
int thev;
assert(val.size == sizeof thev);
memcpy(&thev, val.data, val.size);
assert(thev == v);
toku_free(key.data);
toku_free(val.data);
}
void assert_cursor_nokey(PMA_CURSOR pmacursor) {
DBT key, val;
init_dbt(&key); key.flags = DB_DBT_MALLOC;
init_dbt(&val); val.flags = DB_DBT_MALLOC;
int r;
r = pma_cget_current(pmacursor, &key, &val);
assert(r != 0);
}
/*
* test that pma delete ops update pma cursors
* - insert n keys
* - point the cursor at the last key in the pma
* - delete keys sequentially. the cursor should be stuck at the
* last key until the last key is deleted.
*/
void test_pma_delete_cursor(int n) {
printf("test_delete_cursor:%d\n", n);
PMA pma;
int r;
r = pma_create(&pma, default_compare_fun);
assert(r == 0);
int i;
for (i=0; i<n; i++) {
char k[11];
int v;
DBT key, val;
snprintf(k, sizeof k, "%.10d", i);
fill_dbt(&key, k, strlen(k)+1);
v = i;
fill_dbt(&val, &v, sizeof v);
r = pma_insert(pma, &key, &val, 0);
assert(r == 0);
}
PMA_CURSOR pmacursor;
r = pma_cursor(pma, &pmacursor);
assert(r == 0);
r = pma_cursor_set_position_last(pmacursor);
assert(r == 0);
assert_cursor_equal(pmacursor, n-1);
for (i=0; i<n; i++) {
char k[11];
DBT key;
snprintf(k, sizeof k, "%.10d", i);
fill_dbt(&key, k, strlen(k)+1);
r = pma_delete(pma, &key, 0);
assert(r == 0);
if (i == n-1)
assert_cursor_nokey(pmacursor);
else
assert_cursor_equal(pmacursor, n-1);
}
assert(pma->N == PMA_MIN_ARRAY_SIZE);
r = pma_cursor_free(&pmacursor);
assert(r == 0);
r = pma_free(&pma);
assert(r == 0);
}
void test_pma_delete() {
test_pma_delete_shrink(256); memory_check_all_free();
test_pma_delete_random(256); memory_check_all_free();
test_pma_delete_cursor(32); memory_check_all_free();
}
void pma_tests (void) {
memory_check=1;
test_keycompare(); memory_check_all_free();
......@@ -1068,6 +1267,7 @@ void pma_tests (void) {
test_pma_split(); memory_check_all_free();
test_pma_bulk_insert(); memory_check_all_free();
test_pma_insert_or_replace(); memory_check_all_free();
test_pma_delete();
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
......
......@@ -37,7 +37,7 @@ bytevec pmanode_key (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
return pair->key;
return kv_pair_key(pair);
}
ITEMLEN pmanode_keylen (PMA pma, int i) {
......@@ -45,7 +45,7 @@ ITEMLEN pmanode_keylen (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
return pair->keylen;
return kv_pair_keylen(pair);
}
bytevec pmanode_val (PMA pma, int i) {
......@@ -53,7 +53,7 @@ bytevec pmanode_val (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
return pair->key + pair->keylen;
return kv_pair_val(pair);
}
ITEMLEN pmanode_vallen (PMA pma, int i) {
......@@ -61,21 +61,22 @@ ITEMLEN pmanode_vallen (PMA pma, int i) {
assert(0<=i); assert(i<pma_index_limit(pma));
pair = pma->pairs[i];
assert(pair);
return pair->vallen;
return kv_pair_vallen(pair);
}
/* Could pick the same one every time if we wanted. */
int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLEN *vallen) {
#if 1
int i;
/* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<pma_index_limit(pma); i++) {
if (pma->pairs[i]) {
*key = pmanode_key(pma,i);
*keylen = pmanode_keylen(pma,i);
*val = pmanode_val(pma,i);
*vallen = pmanode_vallen(pma,i);
struct kv_pair *pair = pma->pairs[i];
if (pair) {
*key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair);
*vallen = kv_pair_vallen(pair);
return 0;
}
}
......@@ -88,11 +89,12 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE
/* For now a simple implementation where we simply start at the beginning and look. */
for (i=0; i<l; i++) {
int ir=(i+r)%l;
if (pma->pairs[ir].key) {
*key = pmanode_key(pma,ir);
*keylen = pmanode_keylen(pma,ir);
*val = pmanode_val(pma,ir);
*vallen = pmanode_vallen(pma,ir);
struct kv_pair *pair = pma->pairs[ir];
if (pair) {
*key = kv_pair_key(pair);
*keylen = kv_pair_keylen(pair);
*val = kv_pair_val(pair);
*vallen = kv_pair_vallen(pair);
return 0;
}
}
......@@ -104,6 +106,7 @@ int pma_random_pick(PMA pma, bytevec *key, ITEMLEN *keylen, bytevec *val, ITEMLE
static int pma_count_finds=0;
static int pma_count_divides=0;
static int pma_count_scans=0;
void pma_show_stats (void) {
printf("%d finds, %d divides, %d scans\n", pma_count_finds, pma_count_divides, pma_count_scans);
}
......@@ -242,7 +245,7 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base
tmppairs[n_saved].oldtag = base + i;
tmppairs[n_saved++].pair = pairs[i];
}
pairs[i] = 0;
pairs[i] = 0;
}
if (idx==n) {
tmppairs[n_saved++].pair = 0;
......@@ -253,17 +256,8 @@ int pmainternal_smooth_region (struct kv_pair *pairs[], int n, int idx, int base
r=distribute_data (pairs, n,
tmppairs, n_saved, pma);
if (pma && !list_empty(&pma->cursors)) {
struct list cursors;
list_move(&cursors, &pma->cursors);
pma_update_region(pma, &cursors, tmppairs, n_present);
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&pma->cursors, list);
}
}
if (pma && !list_empty(&pma->cursors))
__pma_update_my_cursors(pma, tmppairs, n_present);
#ifdef USE_MALLOC_IN_SMOOTH
toku_free(tmppairs);
#endif
......@@ -281,9 +275,8 @@ int lg (int n) {
return result;
}
void pmainternal_calculate_parameters (PMA pma)
/* Calculate densitystep and uplgN, given N. */
{
/* Calculate densitysteps and uplgN, given N. */
void pmainternal_calculate_parameters (PMA pma) {
int N = pma_index_limit(pma);
int lgN = lg(N);
int n_divisions=0;
......@@ -295,7 +288,8 @@ void pmainternal_calculate_parameters (PMA pma)
pma->uplgN=N;
//printf("uplgN = %d n_divisions=%d\n", pma->uplgN, n_divisions);
assert(n_divisions>0);
pma->densitystep = 0.5/n_divisions;
pma->udt_step = (PMA_UDT_HIGH - PMA_UDT_LOW)/n_divisions;
pma->ldt_step = (PMA_LDT_HIGH - PMA_LDT_LOW)/n_divisions;
}
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
......@@ -317,14 +311,10 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
result->compare_fun = compare_fun;
result->skey = 0;
result->sval = 0;
result->N = 4;
#if 0 /* memory.c is broken */
result->N = PMA_MIN_ARRAY_SIZE;
result->pairs = 0;
#else
result->pairs = toku_malloc((1 + 4) * sizeof (struct kv_pair *));
#endif
error = pma_resize_array(result, 4);
error = __pma_resize_array(result, result->N, 0);
if (error) {
toku_free(result);
return -1;
......@@ -335,22 +325,30 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
return 0;
}
int pma_resize_array(PMA pma, int asksize) {
int i;
int n;
/* find the smallest power of 2 >= n */
n = 4;
/* find the smallest power of 2 >= n */
int __pma_array_size(PMA pma __attribute__((unused)), int asksize) {
int n = PMA_MIN_ARRAY_SIZE;
while (n < asksize)
n *= 2;
return n;
}
int __pma_resize_array(PMA pma, int asksize, int startz) {
int i;
int n;
n = __pma_array_size(pma, asksize);
// printf("pma_resize %d -> %d\n", pma->N, n);
pma->N = n;
pma->pairs = toku_realloc(pma->pairs, (1 + pma->N) * sizeof (struct kv_pair *));
if (pma->pairs == 0)
pma->pairs = toku_malloc((1 + pma->N) * sizeof (struct kv_pair *));
else
pma->pairs = toku_realloc(pma->pairs, (1 + pma->N) * sizeof (struct kv_pair *));
if (pma->pairs == 0)
return -1;
pma->pairs[pma->N] = (void *) 0xdeadbeef;
for (i=0; i<pma->N; i++) {
for (i=startz; i<pma->N; i++) {
pma->pairs[i] = 0;
}
pmainternal_calculate_parameters(pma);
......@@ -375,8 +373,7 @@ int pma_cursor_get_pma(PMA_CURSOR c, PMA *pmap) {
return 0;
}
int pma_cursor_set_position_last (PMA_CURSOR c)
{
int pma_cursor_set_position_last (PMA_CURSOR c) {
PMA pma = c->pma;
c->position=pma->N-1;
while (c->pma->pairs[c->position]==0) {
......@@ -399,8 +396,7 @@ int pma_cursor_set_position_prev (PMA_CURSOR c) {
return DB_NOTFOUND;
}
int pma_cursor_set_position_first (PMA_CURSOR c)
{
int pma_cursor_set_position_first (PMA_CURSOR c) {
PMA pma = c->pma;
c->position=0;
while (c->pma->pairs[c->position]==0) {
......@@ -410,8 +406,7 @@ int pma_cursor_set_position_first (PMA_CURSOR c)
return 0;
}
int pma_cursor_set_position_next (PMA_CURSOR c)
{
int pma_cursor_set_position_next (PMA_CURSOR c) {
PMA pma = c->pma;
int old_position=c->position;
c->position++;
......@@ -469,7 +464,7 @@ int pmainternal_make_space_at (PMA pma, int idx) {
int size=pma->uplgN;
int lo=idx;
int hi=idx;
double density=1.0;
double udt=PMA_UDT_HIGH;
while (1) {
/* set hi-lo equal size, make sure it is a supserset of (hi,lo). */
lo=idx-size/2;
......@@ -481,29 +476,37 @@ int pmainternal_make_space_at (PMA pma, int idx) {
//printf("lo=%d hi=%d\n", lo, hi);
assert(0<=lo); assert(lo<hi); assert(hi<=pma_index_limit(pma)); assert(hi-lo==size); // separate into separate assertions so that gcov doesn't see branches not taken.
assert(density>0.499); assert(density<=1);
if (density<0.5001) { assert(lo==0); assert(hi==pma_index_limit(pma)); }
assert(udt>0.499); assert(udt<=1);
if (udt<0.5001) { assert(lo==0); assert(hi==pma_index_limit(pma)); }
{
int count = (1+ /* Don't forget space for the new guy. */
pmainternal_count_region(pma->pairs, lo, hi));
if (count/(double)(hi-lo) <= density) break;
double density = (double) count / (double) (hi - lo);
if (density <= udt)
break;
if (lo==0 && hi==pma_index_limit(pma)) {
/* The array needs to be doubled in size. */
#if 0
int i;
#endif
assert(size==pma_index_limit(pma));
size*=2;
//printf("realloc %p to %d\n", pma->pairs, size*sizeof(*pma->pairs));
#if 0
pma->pairs = toku_realloc(pma->pairs, (1+size)*sizeof(struct kv_pair *));
for (i=hi; i<size; i++) pma->pairs[i]=0;
pma->pairs[size] = (void*)0xdeadbeefL;
pma->N=size;
pmainternal_calculate_parameters(pma);
#else
// printf("pma_make_space_realloc %d to %d hi %d\n", pma->N, size, hi);
__pma_resize_array(pma, size, hi);
#endif
hi=size;
//printf("doubled N\n");
break;
}
}
density-=pma->densitystep;
udt-=pma->udt_step;
size*=2;
}
//printf("%s:%d Smoothing from %d to %d to density %f\n", __FILE__, __LINE__, lo, hi, density);
......@@ -574,7 +577,9 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
}
int pma_delete (PMA pma, DBT *k, DB *db) {
int l = pmainternal_find(pma, k, db);
int l;
l = pmainternal_find(pma, k, db);
struct kv_pair *pair = pma->pairs[l];
if (pair==0) {
printf("%s:%d l=%d r=%d\n", __FILE__, __LINE__, l, DB_NOTFOUND);
......@@ -583,11 +588,79 @@ int pma_delete (PMA pma, DBT *k, DB *db) {
kv_pair_free(pair);
pma->pairs[l] = 0;
pma->n_pairs_present--;
// Need to rebalance
// smooth_after_delete(pma,l);
__pma_delete_at(pma, l);
return BRT_OK;
}
void __pma_delete_at(PMA pma, int here) {
int size;
int count;
struct kv_pair_tag *newpairs;
int lgN;
double ldt;
lgN = pma->uplgN;
size = lgN;
ldt = PMA_LDT_HIGH;
/* check the density of regions from lg(N) size to the entire array */
for (;;) {
int lo, hi;
double density;
/* select a region centered on here */
lo = here - size/2;
hi = here + size/2;
if (lo < 0) {
hi -= lo;
lo = 0;
if (hi > pma->N)
hi = pma->N;
} else if (hi > pma->N) {
lo -= hi - pma->N;
hi = pma->N;
if (lo < 0)
lo = 0;
}
assert(lo <= hi);
/* compute the density of the region */
count = pmainternal_count_region(pma->pairs, lo, hi);
density = (double) count / ((double) (hi - lo));
/* rebalance if the density exceeds the lower threadshold */
if (0) printf("check size %d h %d density %d/%d %f %d-%d ldt %f\n", size,
lgN, count, hi-lo, density, lo, hi, ldt);
if (density >= ldt) {
if (size == lgN)
return;
if (0) printf("delete_at_rebalance %d over %d %d\n", count, lo, hi);
newpairs = __pma_extract_pairs(pma, count, lo, hi);
distribute_data(pma->pairs + lo, hi - lo, newpairs, count, pma);
__pma_update_my_cursors(pma, newpairs, count);
toku_free(newpairs);
return;
}
ldt -= pma->ldt_step;
size *= 2;
if (0 == lo && pma->N == hi)
break;
}
/* shrink */
size = __pma_array_size(pma, count + count/4);
if (size == pma->N)
return;
if (0) printf("shrink %d from %d to %d\n", count, pma->N, size);
newpairs = __pma_extract_pairs(pma, count, 0, pma->N);
assert(newpairs);
__pma_resize_array(pma, size, 0);
distribute_data(pma->pairs, pma->N, newpairs, count, pma);
/* update the cursors */
__pma_update_my_cursors(pma, newpairs, count);
toku_free(newpairs);
}
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
int *replaced_v_size /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
) {
......@@ -625,7 +698,7 @@ void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), voi
}
}
void pma_update_cursors(PMA pma, struct list *cursor_set, int oldposition, int newposition) {
void __pma_update_cursors_position(PMA pma, struct list *cursor_set, int oldposition, int newposition) {
struct list *list, *nextlist;
struct pma_cursor *cursor;
......@@ -634,6 +707,7 @@ void pma_update_cursors(PMA pma, struct list *cursor_set, int oldposition, int n
nextlist = list->next; /* may be removed later */
cursor = list_struct(list, struct pma_cursor, next);
if (cursor->position == oldposition) {
if (0) printf("cursor %p %d -> %d\n", cursor, oldposition, newposition);
cursor->position = newposition;
cursor->pma = pma;
list_remove(list);
......@@ -643,33 +717,47 @@ void pma_update_cursors(PMA pma, struct list *cursor_set, int oldposition, int n
}
}
void pma_update_region(PMA pma, struct list *cursor_set, struct kv_pair_tag *pairs, int n) {
int i;
void __pma_update_cursors(PMA pma, struct list *cursor_set, struct kv_pair_tag *tpairs, int n) {
/* short cut */
if (list_empty(cursor_set))
return;
/* update all cursors to their new positions */
int i;
for (i=0; i<n; i++) {
if (pairs[i].pair && pairs[i].oldtag >= 0)
pma_update_cursors(pma, cursor_set, pairs[i].oldtag, pairs[i].newtag);
if (tpairs[i].pair && tpairs[i].oldtag >= 0)
__pma_update_cursors_position(pma, cursor_set, tpairs[i].oldtag, tpairs[i].newtag);
}
}
struct kv_pair_tag *pma_extract_pairs(PMA pma, int lo, int hi) {
int npairs;
void __pma_update_my_cursors(PMA pma, struct kv_pair_tag *tpairs, int n) {
if (list_empty(&pma->cursors))
return;
struct list cursors;
list_move(&cursors, &pma->cursors);
__pma_update_cursors(pma, &cursors, tpairs, n);
while (!list_empty(&cursors)) {
struct list *list = list_head(&cursors);
list_remove(list);
list_push(&pma->cursors, list);
}
}
struct kv_pair_tag *__pma_extract_pairs(PMA pma, int npairs, int lo, int hi) {
struct kv_pair_tag *pairs;
int i;
int lastpair;
npairs = pma_n_entries(pma);
pairs = toku_malloc(npairs * sizeof (struct kv_pair_tag));
if (pairs == 0)
return 0;
lastpair = 0;
for (i=lo; i<hi; i++) {
assert(0 <= i && i < pma->N);
if (pma->pairs[i] != 0) {
assert(pma->pairs[i] != (void*)0xdeadbeef);
pairs[lastpair].pair = pma->pairs[i];
pairs[lastpair].oldtag = i;
pma->pairs[i] = 0;
......@@ -681,8 +769,8 @@ struct kv_pair_tag *pma_extract_pairs(PMA pma, int lo, int hi) {
}
int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size) {
PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size) {
int error;
int npairs;
struct kv_pair_tag *pairs;
......@@ -701,7 +789,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
assert(pma_n_entries(rightpma) == 0);
/* TODO move pairs to the stack */
pairs = pma_extract_pairs(origpma, 0, origpma->N);
pairs = __pma_extract_pairs(origpma, npairs, 0, origpma->N);
assert(pairs);
origpma->n_pairs_present = 0;
......@@ -733,18 +821,18 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
/* put the first half of pairs into the left pma */
n = spliti;
error = pma_resize_array(leftpma, n + n/4);
error = __pma_resize_array(leftpma, n + n/4, 0);
assert(error == 0);
distribute_data(leftpma->pairs, pma_index_limit(leftpma), &pairs[0], n, leftpma);
pma_update_region(leftpma, &cursors, &pairs[0], spliti);
__pma_update_cursors(leftpma, &cursors, &pairs[0], spliti);
leftpma->n_pairs_present = spliti;
/* put the second half of pairs into the right pma */
n = npairs - spliti;
error = pma_resize_array(rightpma, n + n/4);
error = __pma_resize_array(rightpma, n + n/4, 0);
assert(error == 0);
distribute_data(rightpma->pairs, pma_index_limit(rightpma), &pairs[spliti], n, rightpma);
pma_update_region(rightpma, &cursors, &pairs[spliti], n);
__pma_update_cursors(rightpma, &cursors, &pairs[spliti], n);
rightpma->n_pairs_present = n;
toku_free(pairs);
......@@ -823,7 +911,7 @@ int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
}
}
error = pma_resize_array(pma, n_newpairs + n_newpairs/4);
error = __pma_resize_array(pma, n_newpairs + n_newpairs/4, 0);
if (error) {
__pma_bulk_cleanup(newpairs, n_newpairs);
toku_free(newpairs);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment