Commit c69bbcf7 authored by Rich Prohaska's avatar Rich Prohaska

pma uses a memory pool

git-svn-id: file:///svn/tokudb@436 c7de825b-a66e-492c-adef-691d508d4ae1
parent a2dcdeee
......@@ -64,13 +64,13 @@ check-fanout:
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
key.o: brttypes.h key.h
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h list.h kv-pair.h brttypes.h ybt.h yerror.h
pma-test: pma.o memory.o key.o ybt.o log.o
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h ../include/db.h
ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: pma.h yerror.h brt.h ../include/db.h memory.h hashtable.h brttypes.h brt-internal.h
brt.o: brt.h ../include/db.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h hashtable.h
......@@ -86,7 +86,7 @@ brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o hashtable.o brt-seria
brt-bigtest.o: brt.h ../include/db.h
log-test: log.o memory.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o mempool.o
cachetable-test.o: cachetable.h memory.h
cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
......@@ -94,7 +94,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o primes.o
cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o primes.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o mempool.o
benchmark-test.o: brt.h ../include/db.h
clean:
......
......@@ -241,7 +241,7 @@ int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesiz
} else {
int n_in_buf = rbuf_int(&rc);
result->u.l.n_bytes_in_buffer = 0;
int r=pma_create(&result->u.l.buffer, default_compare_fun);
int r=pma_create(&result->u.l.buffer, default_compare_fun, nodesize);
if (r!=0) {
if (0) { died_21: pma_free(&result->u.l.buffer); }
goto died1;
......
......@@ -64,6 +64,7 @@ void brtnode_free (BRTNODE *nodep) {
long brtnode_size(BRTNODE node) {
long size;
assert(node->tag == TYP_BRTNODE);
if (node->height > 0)
size = node->u.n.n_bytes_in_hashtables;
else
......@@ -258,7 +259,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, diskoff nodename, int height)
}
n->u.n.n_bytes_in_hashtables = 0;
} else {
int r = pma_create(&n->u.l.buffer, t->compare_fun);
int r = pma_create(&n->u.l.buffer, t->compare_fun, n->nodesize);
static int rcount=0;
assert(r==0);
//printf("%s:%d n PMA= %p (rcount=%d)\n", __FILE__, __LINE__, n->u.l.buffer, rcount);
......@@ -1345,6 +1346,7 @@ int brt_lookup_node (BRT brt, diskoff off, DBT *k, DBT *v, DB *db, BRTNODE paren
return r;
BRTNODE node = node_v;
assert(node->tag == TYP_BRTNODE);
int childnum;
//printf("%s:%d pin %p height=%d children=%d\n", __FILE__, __LINE__, node_v, node->height, node->u.n.n_children);
......
#include <stdio.h>
#include <assert.h>
#include "mempool.h"
void mempool_init(struct mempool *mp, void *base, int size) {
// printf("mempool_init %p %p %d\n", mp, base, size);
assert(base != 0 && size > 0);
mp->base = base;
mp->size = size;
mp->free_offset = 0;
mp->frag_size = 0;
}
void mempool_fini(struct mempool *mp __attribute__((unused))) {
// printf("mempool_fini %p %p %d %d\n", mp, mp->base, mp->size, mp->frag_size);
}
void mempool_get_base_size(struct mempool *mp, void **base_ptr, int *size_ptr) {
*base_ptr = mp->base;
*size_ptr = mp->size;
}
int mempool_get_frag_size(struct mempool *mp) {
return mp->frag_size;
}
void *mempool_malloc(struct mempool *mp, int size, int alignment) {
assert(mp->free_offset < mp->size);
void *vp;
int offset = (mp->free_offset + (alignment-1)) & ~(alignment-1);
if (offset + size > mp->size) {
vp = 0;
} else {
vp = mp->base + offset;
mp->free_offset = offset + size;
}
assert(mp->free_offset < mp->size);
assert(((long)vp & (alignment-1)) == 0);
assert(vp == 0 || (mp->base <= vp && vp + size < mp->base + mp->size));
return vp;
}
void mempool_mfree(struct mempool *mp, void *vp, int size) {
assert(size > 0 && mp->base <= vp && vp + size < mp->base + mp->size);
mp->frag_size += size;
assert(mp->frag_size <= mp->size);
}
/* a memory pool is a contiguous region of memory that supports single
allocations from the pool. these allocated regions are never recycled.
when the memory pool no longer has free space, the allocated chunks
must be relocated by the application to a new memory pool. */
struct mempool {
void *base; /* the base address of the memory */
int free_offset; /* the offset of the memory pool free space */
int size; /* the size of the memory */
int frag_size; /* the size of the fragmented memory */
};
/* initialize the memory pool with the base address and size of a
contiguous chunk of memory */
void mempool_init(struct mempool *mp, void *base, int size);
/* finalize the memory pool */
void mempool_fini(struct mempool *mp);
/* get the base address and size of the memory pool */
void mempool_get_base_size(struct mempool *mp, void **base_ptr, int *size_ptr);
/* get the amount of fragmented space in the memory pool */
int mempool_get_frag_size(struct mempool *mp);
/* allocate a chunk of memory from the memory pool suitably aligned */
void *mempool_malloc(struct mempool *mp, int size, int alignment);
/* free a previously allocated chunk of memory. the free only updates
a count of the amount of free space in the memory pool. the memory
pool does not keep track of the locations of the free chunks */
void mempool_mfree(struct mempool *mp, void *vp, int size);
#include "pma.h"
#include "pma.h"
#include "mempool.h"
struct pma_cursor {
PMA pma;
......@@ -24,6 +25,7 @@ struct pma {
struct list cursors;
int (*compare_fun)(DB*,const DBT*,const DBT*);
void *skey, *sval; /* used in dbts */
struct mempool kvspace;
};
int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi);
......
......@@ -27,7 +27,7 @@ static void test_make_space_at (void) {
key = "B";
key_B = kv_pair_malloc(key, strlen(key)+1, 0, 0);
r=pma_create(&pma, default_compare_fun);
r=pma_create(&pma, default_compare_fun, 0);
assert(r==0);
assert(pma_n_entries(pma)==0);
r=pmainternal_make_space_at(pma, 2);
......@@ -255,7 +255,7 @@ static void test_count_region (void) {
static void test_pma_random_pick (void) {
PMA pma;
int r = pma_create(&pma, default_compare_fun);
int r = pma_create(&pma, default_compare_fun, 0);
bytevec key,val;
ITEMLEN keylen,vallen;
DBT k,v;
......@@ -315,7 +315,7 @@ static void test_find_insert (void) {
PMA pma;
int r;
DBT k,v;
pma_create(&pma, default_compare_fun);
pma_create(&pma, default_compare_fun, 0);
r=pma_lookup(pma, fill_dbt(&k, "aaa", 3), &v, 0);
assert(r==DB_NOTFOUND);
......@@ -350,7 +350,7 @@ static void test_find_insert (void) {
assert((unsigned long)pma->pairs[pma_index_limit(pma)]==0xdeadbeefL);
r=pma_free(&pma); assert(r==0); assert(pma==0);
pma_create(&pma, default_compare_fun); assert(pma!=0);
pma_create(&pma, default_compare_fun, 0); assert(pma!=0);
{
int i;
......@@ -387,7 +387,7 @@ static void test_pma_iterate (void) {
PMA pma;
int r;
DBT k,v;
pma_create(&pma, default_compare_fun);
pma_create(&pma, default_compare_fun, 0);
r=pma_insert(pma, fill_dbt(&k, "42", 3), fill_dbt(&v, "-19", 4), NULL_ARGS);
assert(r==BRT_OK);
test_pma_iterate_internal(pma, 42, -19);
......@@ -404,8 +404,8 @@ static void test_pma_iterate2 (void) {
int sum=0;
int n_items=0;
DBT k,v;
r=pma_create(&pma0, default_compare_fun); assert(r==0);
r=pma_create(&pma1, default_compare_fun); assert(r==0);
r=pma_create(&pma0, default_compare_fun, 0); assert(r==0);
r=pma_create(&pma1, default_compare_fun, 0); assert(r==0);
pma_insert(pma0, fill_dbt(&k, "a", 2), fill_dbt(&v, "aval", 5), NULL_ARGS);
pma_insert(pma0, fill_dbt(&k, "b", 2), fill_dbt(&v, "bval", 5), NULL_ARGS);
pma_insert(pma1, fill_dbt(&k, "x", 2), fill_dbt(&v, "xval", 5), NULL_ARGS);
......@@ -422,7 +422,7 @@ void test_pma_cursor_0 (void) {
PMA pma;
PMA_CURSOR c=0;
int r;
r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_create(&pma, default_compare_fun, 0); assert(r==0);
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
printf("%s:%d\n", __FILE__, __LINE__);
r=pma_free(&pma); assert(r!=0); /* didn't deallocate the cursor. */
......@@ -440,7 +440,7 @@ void test_pma_cursor_1 (void) {
int r;
int order;
for (order=0; order<6; order++) {
r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_create(&pma, default_compare_fun, 0); assert(r==0);
r=pma_cursor(pma, &c0); assert(r==0); assert(c0!=0);
r=pma_cursor(pma, &c1); assert(r==0); assert(c1!=0);
r=pma_cursor(pma, &c2); assert(r==0); assert(c2!=0);
......@@ -471,7 +471,7 @@ void test_pma_cursor_2 (void) {
DBT key,val;
init_dbt(&key); key.flags=DB_DBT_REALLOC;
init_dbt(&val); val.flags=DB_DBT_REALLOC;
r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_create(&pma, default_compare_fun, 0); assert(r==0);
r=pma_cursor(pma, &c); assert(r==0); assert(c!=0);
r=pma_cursor_set_position_last(c); assert(r==DB_NOTFOUND);
r=pma_cursor_free(&c); assert(r==0);
......@@ -484,7 +484,7 @@ void test_pma_cursor_3 (void) {
int r;
DBT key,val;
DBT k,v;
r=pma_create(&pma, default_compare_fun); assert(r==0);
r=pma_create(&pma, default_compare_fun, 0); assert(r==0);
r=pma_insert(pma, fill_dbt(&k, "x", 2), fill_dbt(&v, "xx", 3), NULL_ARGS); assert(r==BRT_OK);
r=pma_insert(pma, fill_dbt(&k, "m", 2), fill_dbt(&v, "mm", 3), NULL_ARGS); assert(r==BRT_OK);
r=pma_insert(pma, fill_dbt(&k, "aa", 3), fill_dbt(&v,"a", 2), NULL_ARGS); assert(r==BRT_OK);
......@@ -546,7 +546,7 @@ void test_pma_cursor_4 (void) {
int i;
printf("test_pma_cursor_4\n");
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
for (i=1; i<=4; i += 1) {
......@@ -621,7 +621,7 @@ void test_pma_cursor_delete(int n) {
PMA pma;
int error;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
/* insert 1 -> 42 */
......@@ -730,7 +730,7 @@ void test_pma_compare_fun (int wrong_endian_p) {
char **expected_keys = wrong_endian_p ? wrong_endian_expected_keys : right_endian_expected_keys;
int i;
DBT k,v;
r = pma_create(&pma, wrong_endian_p ? wrong_endian_compare_fun : default_compare_fun); assert(r==0);
r = pma_create(&pma, wrong_endian_p ? wrong_endian_compare_fun : default_compare_fun, 0); assert(r==0);
r = pma_insert(pma, fill_dbt(&k, "10", 3), fill_dbt(&v, "10v", 4), NULL_ARGS); assert(r==BRT_OK);
r = pma_insert(pma, fill_dbt(&k, "00", 3), fill_dbt(&v, "00v", 4), NULL_ARGS); assert(r==BRT_OK);
r = pma_insert(pma, fill_dbt(&k, "01", 3), fill_dbt(&v, "01v", 4), NULL_ARGS); assert(r==BRT_OK);
......@@ -769,11 +769,11 @@ void test_pma_split_n(int n) {
printf("test_pma_split_n:%d\n", n);
error = pma_create(&pmaa, default_compare_fun);
error = pma_create(&pmaa, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
error = pma_create(&pmab, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
error = pma_create(&pmac, default_compare_fun, 0);
assert(error == 0);
/* insert some kv pairs */
......@@ -823,11 +823,11 @@ void test_pma_split_varkey(void) {
printf("test_pma_split_varkey\n");
error = pma_create(&pmaa, default_compare_fun);
error = pma_create(&pmaa, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
error = pma_create(&pmab, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
error = pma_create(&pmac, default_compare_fun, 0);
assert(error == 0);
/* insert some kv pairs */
......@@ -933,11 +933,11 @@ void test_pma_split_cursor(void) {
printf("test_pma_split_cursor\n");
error = pma_create(&pmaa, default_compare_fun);
error = pma_create(&pmaa, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmab, default_compare_fun);
error = pma_create(&pmab, default_compare_fun, 0);
assert(error == 0);
error = pma_create(&pmac, default_compare_fun);
error = pma_create(&pmac, default_compare_fun, 0);
assert(error == 0);
/* insert some kv pairs */
......@@ -1047,7 +1047,7 @@ void test_pma_bulk_insert_n(int n) {
printf("test_pma_bulk_insert_n: %d\n", n);
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
/* init n kv pairs */
......@@ -1122,7 +1122,7 @@ void test_pma_insert_or_replace(void) {
int r;
DBT dbtk, dbtv;
int n_diff=-2;
r = pma_create(&pma, default_compare_fun);
r = pma_create(&pma, default_compare_fun, 0);
assert(r==0);
r = pma_insert_or_replace(pma, fill_dbt(&dbtk, "aaa", 4), fill_dbt(&dbtv, "zzz", 4), &n_diff, NULL_ARGS);
assert(r==0); assert(n_diff==-1);
......@@ -1162,7 +1162,7 @@ void test_pma_delete_shrink(int n) {
printf("test_pma_delete_shrink:%d\n", n);
r = pma_create(&pma, default_compare_fun);
r = pma_create(&pma, default_compare_fun, n*(8 + 11 + sizeof (int)));
assert(r == 0);
/* insert */
......@@ -1207,7 +1207,7 @@ void test_pma_delete_random(int n) {
printf("test_pma_delete_random:%d\n", n);
r = pma_create(&pma, default_compare_fun);
r = pma_create(&pma, default_compare_fun, n * (8 + 11 + sizeof (int)));
assert(r == 0);
for (i=0; i<n; i++) {
......@@ -1282,7 +1282,7 @@ void test_pma_delete_cursor(int n) {
PMA pma;
int r;
r = pma_create(&pma, default_compare_fun);
r = pma_create(&pma, default_compare_fun, 0);
assert(r == 0);
int i;
......@@ -1347,7 +1347,7 @@ void test_pma_delete_insert() {
PMA pma;
int error;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
PMA_CURSOR pmacursor;
......@@ -1400,7 +1400,7 @@ void test_pma_double_delete() {
PMA pma;
int error;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
PMA_CURSOR pmacursor;
......@@ -1445,7 +1445,7 @@ void test_pma_cursor_first_delete_last() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
DBT key, val;
......@@ -1493,7 +1493,7 @@ void test_pma_cursor_last_delete_first() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
DBT key, val;
......@@ -1551,7 +1551,7 @@ void test_pma_already_there() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
DBT key, val;
......@@ -1575,7 +1575,7 @@ void test_pma_cursor_set_key() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
DBT key, val;
......@@ -1630,7 +1630,7 @@ void test_pma_cursor_set_range() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
error = pma_create(&pma, default_compare_fun, 0);
assert(error == 0);
DBT key, val;
......@@ -1687,7 +1687,9 @@ void test_pma_cursor_delete_under() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
const int n = 1000;
error = pma_create(&pma, default_compare_fun, n * (8 + sizeof (int) + sizeof (int)));
assert(error == 0);
PMA_CURSOR cursor;
......@@ -1700,8 +1702,6 @@ void test_pma_cursor_delete_under() {
error = pma_cursor_delete_under(cursor, &kvsize);
assert(error == DB_NOTFOUND);
const int n = 1000;
DBT key, val;
int k, v;
......@@ -1758,15 +1758,15 @@ void test_pma_cursor_set_both() {
int error;
PMA pma;
error = pma_create(&pma, default_compare_fun);
const int n = 1000;
error = pma_create(&pma, default_compare_fun, n * (8 + sizeof (int) + sizeof (int)));
assert(error == 0);
PMA_CURSOR cursor;
error = pma_cursor(pma, &cursor);
assert(error == 0);
const int n = 1000;
DBT key, val;
int k, v;
......
......@@ -19,6 +19,58 @@
/* get KEY_VALUE_OVERHEAD */
#include "brt-internal.h"
/* allocate a kv pair from the pma kv memory pool */
static struct kv_pair *kv_pair_malloc_mempool(void *key, int keylen, void *val, int vallen, struct mempool *mp) {
struct kv_pair *kv = mempool_malloc(mp, sizeof (struct kv_pair) + keylen + vallen, 4);
if (kv)
kv_pair_init(kv, key, keylen, val, vallen);
return kv;
}
/* compress all of the kv pairs to the left edge of the memory pool and
update the pma index with the new kv pair locations */
static int pma_compress_kvspace(PMA pma) {
if (mempool_get_frag_size(&pma->kvspace) == 0)
return -1;
void *mp = toku_malloc(pma->kvspace.size);
if (mp == 0)
return -2;
struct mempool new_kvspace;
mempool_init(&new_kvspace, mp, pma->kvspace.size);
int i;
for (i=0; i<pma->N; i++) {
struct kv_pair *kv = pma->pairs[i];
if (kv_pair_inuse(kv)) {
kv = kv_pair_ptr(kv);
struct kv_pair *newkv = mempool_malloc(&new_kvspace, kv_pair_size(kv), 4);
assert(newkv);
memcpy(newkv, kv, kv_pair_size(kv));
if (kv_pair_deleted(pma->pairs[i]))
kv_pair_set_deleted(newkv);
pma->pairs[i] = newkv;
}
}
toku_free(pma->kvspace.base);
pma->kvspace = new_kvspace;
return 0;
}
/* malloc space for a kv pair from the pma memory pool and initialize it.
if the allocation fails, try to compress the memory pool and try again. */
static struct kv_pair *pma_malloc_kv_pair(PMA pma, void *k, int ksize, void *v, int vsize) {
struct kv_pair *kv = kv_pair_malloc_mempool(k, ksize, v, vsize, &pma->kvspace);
if (kv == 0) {
if (0 == pma_compress_kvspace(pma))
kv = kv_pair_malloc_mempool(k, ksize, v, vsize, &pma->kvspace);
}
return kv;
}
static void pma_mfree_kv_pair(PMA pma, struct kv_pair *kv) {
kv = kv_pair_ptr(kv);
mempool_mfree(&pma->kvspace, kv, kv_pair_size(kv));
}
int pma_n_entries (PMA pma) {
return pma->n_pairs_present;
}
......@@ -303,7 +355,7 @@ int pmainternal_count_region (struct kv_pair *pairs[], int lo, int hi) {
return n;
}
int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
int pma_create(PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*), int maxsize) {
int error;
TAGMALLOC(PMA, result);
if (result==0) return -1;
......@@ -315,12 +367,16 @@ int pma_create (PMA *pma, int (*compare_fun)(DB*,const DBT*,const DBT*)) {
result->sval = 0;
result->N = PMA_MIN_ARRAY_SIZE;
result->pairs = 0;
error = __pma_resize_array(result, result->N, 0);
if (error) {
toku_free(result);
return -1;
}
if (maxsize == 0)
maxsize = 4*1024;
maxsize = maxsize + maxsize/4;
void *mpbase = toku_malloc(maxsize); assert(mpbase);
mempool_init(&result->kvspace, mpbase, maxsize);
*pma = result;
assert((unsigned long)result->pairs[result->N]==0xdeadbeefL);
......@@ -633,13 +689,15 @@ int pma_free (PMA *pmap) {
for (i=0; i < pma->N; i++) {
struct kv_pair *kv = pma->pairs[i];
if (kv_pair_inuse(kv)) {
kv_pair_free(kv_pair_ptr(kv));
pma_mfree_kv_pair(pma, kv);
pma->pairs[i] = 0;
pma->n_pairs_present--;
}
}
}
assert(pma->n_pairs_present == 0);
mempool_fini(&pma->kvspace);
void *mpbase; int mpsize; mempool_get_base_size(&pma->kvspace, &mpbase, &mpsize); toku_free(mpbase);
toku_free(pma->pairs);
if (pma->skey) toku_free(pma->skey);
if (pma->sval) toku_free(pma->sval);
......@@ -656,8 +714,9 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, diskoff diskoff) {
struct kv_pair *kv = kv_pair_ptr(pma->pairs[idx]);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
int r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
int r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
......@@ -667,7 +726,7 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, diskoff diskoff) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
}
assert(!kv_pair_inuse(pma->pairs[idx]));
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
return tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 1, pma->pairs[idx]);
......@@ -697,7 +756,7 @@ void __pma_delete_finish(PMA pma, int here) {
struct kv_pair *kv = pma->pairs[here];
if (!kv_pair_inuse(kv))
return;
kv_pair_free(kv_pair_ptr(kv));
pma_mfree_kv_pair(pma, kv);
pma->pairs[here] = 0;
pma->n_pairs_present--;
__pma_delete_at(pma, here);
......@@ -789,8 +848,14 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
r=tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, kv);
if (r!=0) return r;
}
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
mempool_mfree(&pma->kvspace, kv, kv_pair_size(kv));
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
}
r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
return r;
}
}
......@@ -799,7 +864,7 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
}
assert(!kv_pair_inuse(pma->pairs[idx]));
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
*replaced_v_size = -1;
......@@ -904,6 +969,18 @@ struct kv_pair_tag *__pma_extract_pairs(PMA pma, int npairs, int lo, int hi) {
return pairs;
}
static void __pma_relocate_kvpairs(PMA pma) {
int i;
for (i=0; i<pma->N; i++) {
struct kv_pair *kv = pma->pairs[i];
if (kv) {
pma->pairs[i] = kv_pair_malloc_mempool(kv_pair_key(kv), kv_pair_keylen(kv), kv_pair_val(kv),
kv_pair_vallen(kv), &pma->kvspace);
assert(pma->pairs[i]);
}
}
}
int pma_split(PMA origpma, unsigned int *origpma_size,
PMA leftpma, unsigned int *leftpma_size,
PMA rightpma, unsigned int *rightpma_size) {
......@@ -960,6 +1037,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
error = __pma_resize_array(leftpma, n + n/4, 0);
assert(error == 0);
distribute_data(leftpma->pairs, pma_index_limit(leftpma), &pairs[0], n, leftpma);
__pma_relocate_kvpairs(leftpma);
__pma_update_cursors(leftpma, &cursors, &pairs[0], spliti);
leftpma->n_pairs_present = spliti;
......@@ -968,6 +1046,7 @@ int pma_split(PMA origpma, unsigned int *origpma_size,
error = __pma_resize_array(rightpma, n + n/4, 0);
assert(error == 0);
distribute_data(rightpma->pairs, pma_index_limit(rightpma), &pairs[spliti], n, rightpma);
__pma_relocate_kvpairs(rightpma);
__pma_update_cursors(rightpma, &cursors, &pairs[spliti], n);
rightpma->n_pairs_present = n;
......@@ -1011,12 +1090,12 @@ int pma_get_last(PMA pma, DBT *key, DBT *val) {
return 0;
}
void __pma_bulk_cleanup(struct kv_pair_tag *pairs, int n) {
void __pma_bulk_cleanup(struct pma *pma, struct kv_pair_tag *pairs, int n) {
int i;
for (i=0; i<n; i++)
if (pairs[i].pair)
kv_pair_free(pairs[i].pair);
pma_mfree_kv_pair(pma, pairs[i].pair);
}
int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
......@@ -1038,10 +1117,10 @@ int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
}
for (i=0; i<n_newpairs; i++) {
newpairs[i].pair = kv_pair_malloc(keys[i].data, keys[i].size,
vals[i].data, vals[i].size);
newpairs[i].pair = kv_pair_malloc_mempool(keys[i].data, keys[i].size,
vals[i].data, vals[i].size, &pma->kvspace);
if (newpairs[i].pair == 0) {
__pma_bulk_cleanup(newpairs, i);
__pma_bulk_cleanup(pma, newpairs, i);
toku_free(newpairs);
error = -4; return error;
}
......@@ -1049,7 +1128,7 @@ int pma_bulk_insert(PMA pma, DBT *keys, DBT *vals, int n_newpairs) {
error = __pma_resize_array(pma, n_newpairs + n_newpairs/4, 0);
if (error) {
__pma_bulk_cleanup(newpairs, n_newpairs);
__pma_bulk_cleanup(pma, newpairs, n_newpairs);
toku_free(newpairs);
error = -5; return error;
}
......
......@@ -14,7 +14,7 @@ typedef struct pma *PMA;
typedef struct pma_cursor *PMA_CURSOR;
/* All functions return 0 on success. */
int pma_create (PMA *, int (*compare_fun)(DB*,const DBT*,const DBT*));
int pma_create(PMA *, int (*compare_fun)(DB*,const DBT*,const DBT*), int maxsize);
/* returns 0 if OK.
* You must have freed all the cursors, otherwise returns nonzero and does nothing. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment