Commit 0b880e86 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Merge onto the main line again.

{{{
svn merge -r3571:3582 https://svn.tokutek.com/tokudb/tokudb.724
}}}


git-svn-id: file:///svn/tokudb@3583 c7de825b-a66e-492c-adef-691d508d4ae1
parent 33b25277
......@@ -63,11 +63,13 @@ REGRESSION_TESTS = \
cachetable-test2 \
fifo-test \
list-test \
log-test \
log-test2 \
log-test3 \
log-test4 \
log-test5 \
log-test6 \
omt-test \
test-assert \
test-brt-delete-both \
test-brt-overflow \
......@@ -77,8 +79,6 @@ REGRESSION_TESTS = \
test_oexcl \
test_toku_malloc_plain_free \
ybt-test \
log-test \
omt-test \
# This line intentially kept commented so I can have a \ on the end of the previous line
# Add in the binaries that must be run in various ways.
......
......@@ -220,4 +220,8 @@ int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger);
int toku_omt_compress_kvspace (OMT omt, struct mempool *memp);
void *mempool_malloc_from_omt(OMT omt, struct mempool *mp, size_t size);
void toku_verify_all_in_mempool(BRTNODE node);
#define BRT_LAYOUT_VERSION 5
#endif
......@@ -27,7 +27,7 @@ static void test_serialize(void) {
sn.thisnodename = sn.nodesize*20;
sn.disk_lsn.lsn = 789;
sn.log_lsn.lsn = 123456;
sn.layout_version = 5;
sn.layout_version = BRT_LAYOUT_VERSION;
sn.height = 1;
sn.rand4fingerprint = randval;
sn.local_fingerprint = 0;
......@@ -57,7 +57,7 @@ static void test_serialize(void) {
assert(dn->thisnodename==nodesize*20);
assert(dn->disk_lsn.lsn==123456);
assert(dn->layout_version ==5);
assert(dn->layout_version ==BRT_LAYOUT_VERSION);
assert(dn->height == 1);
assert(dn->rand4fingerprint==randval);
assert(dn->u.n.n_children==2);
......
......@@ -236,8 +236,8 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode) {
rc.buf=toku_malloc(datasize);
//printf("%s:%d errno=%d\n", __FILE__, __LINE__, errno);
if (rc.buf==0) {
if (0) { died1: toku_free(rc.buf); }
r=errno;
if (0) { died1: toku_free(rc.buf); }
goto died0;
}
rc.size=datasize;
......@@ -264,7 +264,7 @@ int toku_deserialize_brtnode_from (int fd, DISKOFF off, BRTNODE *brtnode) {
}
}
result->layout_version = rbuf_int(&rc);
if (result->layout_version!=5) {
if (result->layout_version!=BRT_LAYOUT_VERSION) {
r=DB_BADFORMAT;
goto died1;
}
......
......@@ -94,7 +94,7 @@ static int verify_in_mempool(LEAFENTRY le, u_int32_t UU(idx), void *vmp) {
assert(toku_mempool_inrange(mp, le, leafentry_memsize(le)));
return 0;
}
static void verify_all_in_mempool(BRTNODE node) {
void toku_verify_all_in_mempool(BRTNODE node) {
if (node->height==0) {
toku_omt_iterate(node->u.l.buffer, verify_in_mempool, &node->u.l.buffer_mempool);
}
......@@ -292,7 +292,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 5;
n->layout_version = BRT_LAYOUT_VERSION;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
......@@ -375,10 +375,11 @@ static int brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE nod
//printf("%s:%d B is at %lld nodesize=%d\n", __FILE__, __LINE__, B->thisnodename, B->nodesize);
assert(node->height>0 || node->u.l.buffer!=0);
verify_all_in_mempool(node);
toku_verify_all_in_mempool(node);
LEAFENTRY *MALLOC_N(toku_omt_size(node->u.l.buffer), leafentries);
u_int32_t n_leafentries = toku_omt_size(node->u.l.buffer);
LEAFENTRY *MALLOC_N(n_leafentries, leafentries);
assert(leafentries);
toku_omt_iterate(node->u.l.buffer, fill_buf, leafentries);
u_int32_t break_at = 0;
{
......@@ -423,8 +424,8 @@ static int brtleaf_split (TOKULOGGER logger, FILENUM filenum, BRT t, BRTNODE nod
toku_free(leafentries);
verify_all_in_mempool(node);
verify_all_in_mempool(B);
toku_verify_all_in_mempool(node);
toku_verify_all_in_mempool(B);
toku_omt_destroy(&old_omt);
......@@ -945,6 +946,7 @@ static int push_some_brt_cmds_down (BRT t, BRTNODE node, int childnum,
if (r!=0) return r;
//printf("%s:%d pin %p\n", __FILE__, __LINE__, childnode_v);
child=childnode_v;
assert(child->thisnodename!=0);
//verify_local_fingerprint_nonleaf(child);
VERIFY_NODE(child);
//printf("%s:%d height=%d n_bytes_in_buffer = {%d, %d, %d, ...}\n", __FILE__, __LINE__, child->height, child->n_bytes_in_buffer[0], child->n_bytes_in_buffer[1], child->n_bytes_in_buffer[2]);
......
......@@ -219,7 +219,7 @@ static void initialize_brtnode (BRT t, BRTNODE n, DISKOFF nodename, int height)
n->thisnodename = nodename;
n->disk_lsn.lsn = 0; // a new one can always be 0.
n->log_lsn = n->disk_lsn;
n->layout_version = 5;
n->layout_version = BRT_LAYOUT_VERSION;
n->height = height;
n->rand4fingerprint = random();
n->local_fingerprint = 0;
......
......@@ -55,6 +55,11 @@ enum close_when_done {
CLOSE_WHEN_DONE,
KEEP_WHEN_DONE
};
enum create_type {
BATCH_INSERT,
INSERT_AT,
INSERT_AT_ALMOST_RANDOM,
};
/* Globals */
OMT omt;
......@@ -159,24 +164,77 @@ void test_create_size(enum close_when_done close) {
test_close(close);
}
void test_create_from_sorted_array(enum close_when_done close) {
void test_create_insert_at_almost_random(enum close_when_done close) {
u_int32_t i;
int r;
u_int32_t size = 0;
test_create(KEEP_WHEN_DONE);
r = toku_omt_insert_at(omt, values[0], toku_omt_size(omt)+1);
CKERR2(r, ERANGE);
for (i = 0; i < length/2; i++) {
assert(size==toku_omt_size(omt));
r = toku_omt_insert_at(omt, values[i], i);
CKERR(r);
assert(++size==toku_omt_size(omt));
r = toku_omt_insert_at(omt, values[length-1-i], i+1);
CKERR(r);
assert(++size==toku_omt_size(omt));
}
r = toku_omt_insert_at(omt, values[0], toku_omt_size(omt)+1);
CKERR2(r, ERANGE);
assert(size==toku_omt_size(omt));
test_close(close);
}
void test_create_insert_at_sequential(enum close_when_done close) {
u_int32_t i;
int r;
u_int32_t size = 0;
test_create(KEEP_WHEN_DONE);
r = toku_omt_insert_at(omt, values[0], toku_omt_size(omt)+1);
CKERR2(r, ERANGE);
for (i = 0; i < length; i++) {
assert(size==toku_omt_size(omt));
r = toku_omt_insert_at(omt, values[i], i);
CKERR(r);
assert(++size==toku_omt_size(omt));
}
r = toku_omt_insert_at(omt, values[0], toku_omt_size(omt)+1);
CKERR2(r, ERANGE);
assert(size==toku_omt_size(omt));
test_close(close);
}
void test_create_from_sorted_array(enum create_type create_choice, enum close_when_done close) {
int r;
omt = NULL;
if (create_choice == BATCH_INSERT) {
r = toku_omt_create_from_sorted_array(&omt, values, length);
CKERR(r);
}
else if (create_choice == INSERT_AT) {
test_create_insert_at_sequential(KEEP_WHEN_DONE);
}
else if (create_choice == INSERT_AT_ALMOST_RANDOM) {
test_create_insert_at_almost_random(KEEP_WHEN_DONE);
}
else assert(FALSE);
assert(omt!=NULL);
test_close(close);
}
void test_create_from_sorted_array_size(enum close_when_done close) {
test_create_from_sorted_array(KEEP_WHEN_DONE);
void test_create_from_sorted_array_size(enum create_type create_choice, enum close_when_done close) {
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
assert(toku_omt_size(omt)==length);
test_close(close);
}
void test_create_from_sorted_array_fetch_verify(enum close_when_done close) {
test_create_from_sorted_array(KEEP_WHEN_DONE);
void test_create_fetch_verify(enum create_type create_choice, enum close_when_done close) {
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
u_int32_t i;
int r;
OMTVALUE v = (OMTVALUE)&i;
......@@ -225,8 +283,8 @@ int iterate_helper(OMTVALUE v, u_int32_t idx, void* extra) {
return 0;
}
void test_create_from_sorted_array_iterate_verify(enum close_when_done close) {
test_create_from_sorted_array(KEEP_WHEN_DONE);
void test_create_iterate_verify(enum create_type create_choice, enum close_when_done close) {
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
int r;
iterate_helper_error_return = 0;
......@@ -239,7 +297,7 @@ void test_create_from_sorted_array_iterate_verify(enum close_when_done close) {
test_close(close);
}
void test_create_array(enum rand_type rand_choice) {
void test_create_array(enum create_type create_choice, enum rand_type rand_choice) {
if (rand_choice == TEST_RANDOM) {
init_distinct_random_values(random_seed, 100);
}
......@@ -251,12 +309,12 @@ void test_create_array(enum rand_type rand_choice) {
}
else assert(FALSE);
/* ********************************************************************** */
test_create_from_sorted_array(CLOSE_WHEN_DONE);
test_create_from_sorted_array_size(CLOSE_WHEN_DONE);
test_create_from_sorted_array( create_choice, CLOSE_WHEN_DONE);
test_create_from_sorted_array_size(create_choice, CLOSE_WHEN_DONE);
/* ********************************************************************** */
test_create_from_sorted_array_fetch_verify(CLOSE_WHEN_DONE);
test_create_fetch_verify( create_choice, CLOSE_WHEN_DONE);
/* ********************************************************************** */
test_create_from_sorted_array_iterate_verify(CLOSE_WHEN_DONE);
test_create_iterate_verify( create_choice, CLOSE_WHEN_DONE);
/* ********************************************************************** */
}
......@@ -360,10 +418,10 @@ void test_find_dir(int dir, void* extra, int (*h)(OMTVALUE, void*),
assert(omt_val == NULL);
}
void test_find(enum close_when_done close) {
void test_find(enum create_type create_choice, enum close_when_done close) {
h_extra extra;
init_identity_values(random_seed, 100);
test_create_from_sorted_array(KEEP_WHEN_DONE);
test_create_from_sorted_array(create_choice, KEEP_WHEN_DONE);
/*
-...-
......@@ -433,81 +491,40 @@ void test_find(enum close_when_done close) {
test_close(close);
}
void runtests_create_choice(enum create_type create_choice) {
test_create_array(create_choice, TEST_SORTED);
test_create_array(create_choice, TEST_RANDOM);
test_create_array(create_choice, TEST_IDENTITY);
test_find( create_choice, CLOSE_WHEN_DONE);
}
int main(int argc, const char *argv[]) {
parse_args(argc, argv);
init_globals();
test_create( CLOSE_WHEN_DONE);
test_create_size( CLOSE_WHEN_DONE);
test_create_array(TEST_SORTED);
test_create_array(TEST_RANDOM);
test_create_array(TEST_IDENTITY);
test_find(CLOSE_WHEN_DONE);
runtests_create_choice(BATCH_INSERT);
runtests_create_choice(INSERT_AT);
runtests_create_choice(INSERT_AT_ALMOST_RANDOM);
cleanup_globals();
return 0;
}
/*
UNTESTED COMPLETELY:
int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int direction, OMTVALUE *value, u_int32_t *index);
Effect:
If direction >0 then find the smallest i such that h(V_i,extra)>0.
If direction <0 then find the largest i such that h(V_i,extra)<0.
If value!=NULL then store V_i in *value
If index!=NULL then store i in *index.
Requires: The signum of h is monotically increasing.
Returns
0 success
DB_NOTFOUND no such value is found.
On nonzero return, *value and *index are unchanged.
Performance: time=O(\log N)
Rationale:
The direction==0 is a strange case that should go away in the future.
Here's how to use the find function to find various things
Cases for find:
find first value: ( h(v)=+1, direction=+1 )
find last value ( h(v)=-1, direction=-1 )
find first X ( h(v)=(v< x) ? -1 : 1 direction=+1 )
find last X ( h(v)=(v<=x) ? -1 : 1 direction=-1 )
find X or successor to X ( same as find first X. )
Rationale: To help understand heaviside functions and behavor of find:
There are 7 kinds of heaviside functions.
The signus of the h must be monotonically increasing.
Given a function of the following form, A is the element
returned for direction>0, B is the element returned
for direction<0, and C is the element returned for
direction==0 (see find_zero).
If any of A, B, or C are not found, then asking for the
associated direction will return DB_NOTFOUND.
See find_zero for more information.
Let the following represent the signus of the heaviside function.
-...-
A
+...+
B
0...0
C
-...-0...0
AC
0...0+...+
C B
-...-+...+
AB
-...-0...0+...+
AC B
int toku_omt_split_at(OMT omt, OMT *newomt, u_int32_t index);
// Effect: Create a new OMT, storing it in *newomt.
// The values to the right of index (starting at index) are moved to *newomt.
int toku_omt_insert_at(OMT omt, OMTVALUE value, u_int32_t index);
// Effect: Increases indexes of all items at slot >= index by 1.
// Insert value into the position at index.
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomt);
// Effect: Appends leftomt and rightomt to produce a new omt.
// Sets *newomt to the new omt.
// On success, leftomt and rightomt destroyed,.
// Returns 0 on success
// ENOMEM on out of memory.
// On error, nothing is modified.
// Performance: time=O(n) is acceptable, but one can imagine implementations that are O(\log n) worst-case.
int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index);
// Effect: Replaces the item at index with value.
......@@ -525,23 +542,5 @@ int toku_omt_delete_at(OMT omt, u_int32_t index);
// Effect: Delete the item in slot index.
// Decreases indexes of all items at slot >= index by 1.
int toku_omt_find_zero(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index);
// Effect: Find the smallest i such that h(V_i, extra)>=0
// If there is such an i and h(V_i,extra)==0 then set *index=i and return 0.
// If there is such an i and h(V_i,extra)>0 then set *index=i and return DB_NOTFOUND.
// If there is no such i then set *index=toku_omt_size(V) and return DB_NOTFOUND.
int toku_omt_split_at(OMT omt, OMT *newomt, u_int32_t index);
// Effect: Create a new OMT, storing it in *newomt.
// The values to the right of index (starting at index) are moved to *newomt.
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomt);
// Effect: Appends leftomt and rightomt to produce a new omt.
// Sets *newomt to the new omt.
// On success, leftomt and rightomt destroyed,.
// Returns 0 on success
// ENOMEM on out of memory.
// On error, nothing is modified.
// Performance: time=O(n) is acceptable, but one can imagine implementations that are O(\log n) worst-case.
*/
......@@ -38,19 +38,19 @@ int toku_omt_create (OMT *omtp) {
return 0;
}
static u_int32_t nweight (OMT_NODE n) {
static inline u_int32_t nweight (OMT_NODE n) {
if (n==NULL) return 0;
else return n->weight;
}
static void fill_array_from_omt_nodes_tree (OMT_NODE *array, OMT_NODE tree) {
static inline void fill_array_from_omt_nodes_tree (OMT_NODE *array, OMT_NODE tree) {
if (tree==NULL) return;
fill_array_from_omt_nodes_tree(array, tree->left);
array[nweight(tree->left)] = tree;
fill_array_from_omt_nodes_tree(array+nweight(tree->left)+1, tree->right);
}
static void rebuild_from_sorted_array_of_omt_nodes(OMT_NODE *np, OMT_NODE *nodes, u_int32_t numvalues) {
static inline void rebuild_from_sorted_array_of_omt_nodes(OMT_NODE *np, OMT_NODE *nodes, u_int32_t numvalues) {
if (numvalues==0) {
*np=NULL;
} else {
......@@ -64,7 +64,7 @@ static void rebuild_from_sorted_array_of_omt_nodes(OMT_NODE *np, OMT_NODE *nodes
}
}
static void maybe_rebalance (OMT omt, OMT_NODE *np) {
static inline void maybe_rebalance (OMT omt, OMT_NODE *np) {
OMT_NODE n = *np;
if (n==0) return;
// one of the 1's is for the root.
......@@ -78,7 +78,7 @@ static void maybe_rebalance (OMT omt, OMT_NODE *np) {
}
}
static int insert_internal (OMT omt, OMT_NODE *np, OMTVALUE value, u_int32_t index) {
static inline int insert_internal (OMT omt, OMT_NODE *np, OMTVALUE value, u_int32_t index) {
if (*np==0) {
assert(index==0);
OMT_NODE MALLOC(newnode);
......@@ -103,7 +103,7 @@ static int insert_internal (OMT omt, OMT_NODE *np, OMTVALUE value, u_int32_t ind
}
}
static int make_sure_array_is_sized_ok (OMT omt, u_int32_t n) {
static inline int make_sure_array_is_sized_ok (OMT omt, u_int32_t n) {
u_int32_t new_size;
if (omt->tmparray_size < n) {
new_size = 2*n;
......@@ -121,11 +121,12 @@ static int make_sure_array_is_sized_ok (OMT omt, u_int32_t n) {
int toku_omt_insert_at (OMT omt, OMTVALUE value, u_int32_t index) {
int r;
if (index>nweight(omt->root)) return ERANGE;
if ((r=make_sure_array_is_sized_ok(omt, 1+nweight(omt->root)))) return r;
return insert_internal(omt, &omt->root, value, index);
}
static void set_at_internal (OMT_NODE n, OMTVALUE v, u_int32_t index) {
static inline void set_at_internal (OMT_NODE n, OMTVALUE v, u_int32_t index) {
assert(n);
if (index<nweight(n->left))
set_at_internal(n->left, v, index);
......@@ -156,7 +157,7 @@ int toku_omt_insert(OMT omt, OMTVALUE value, int(*h)(OMTVALUE, void*v), void *v,
return 0;
}
static void delete_internal (OMT omt, OMT_NODE *np, u_int32_t index, OMTVALUE *vp) {
static inline void delete_internal (OMT omt, OMT_NODE *np, u_int32_t index, OMTVALUE *vp) {
OMT_NODE n=*np;
if (index < nweight(n->left)) {
delete_internal(omt, &n->left, index, vp);
......@@ -193,8 +194,7 @@ int toku_omt_delete_at(OMT omt, u_int32_t index) {
return 0;
}
static int fetch_internal (OMT_NODE n, u_int32_t i, OMTVALUE *v) {
if (n==NULL) return ERANGE;
static inline int fetch_internal (OMT_NODE n, u_int32_t i, OMTVALUE *v) {
if (i < nweight(n->left)) {
return fetch_internal(n->left, i, v);
} else if (i == nweight(n->left)) {
......@@ -206,10 +206,11 @@ static int fetch_internal (OMT_NODE n, u_int32_t i, OMTVALUE *v) {
}
int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v) {
if (i>=nweight(V->root)) return ERANGE;
return fetch_internal(V->root, i, v);
}
static int find_internal_zero (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
static inline int find_internal_zero (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
if (n==NULL) {
if (index!=NULL) (*index)=0;
return DB_NOTFOUND;
......@@ -237,7 +238,7 @@ int toku_omt_find_zero (OMT t, int (*h)(OMTVALUE, void*extra), void*extra, OMTVA
}
// If direction <0 then find the largest i such that h(V_i,extra)<0.
static int find_internal_minus (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
static inline int find_internal_minus (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
if (n==NULL) return DB_NOTFOUND;
int hv = h(n->value, extra);
if (hv<0) {
......@@ -255,7 +256,7 @@ static int find_internal_minus (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void
}
// If direction >0 then find the smallest i such that h(V_i,extra)>0.
static int find_internal_plus (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
static inline int find_internal_plus (OMT_NODE n, int (*h)(OMTVALUE, void*extra), void*extra, OMTVALUE *value, u_int32_t *index) {
if (n==NULL) return DB_NOTFOUND;
int hv = h(n->value, extra);
if (hv>0) {
......@@ -283,7 +284,7 @@ int toku_omt_find(OMT V, int (*h)(OMTVALUE, void*extra), void*extra, int directi
}
}
static void free_omt_nodes (OMT_NODE n) {
static inline void free_omt_nodes (OMT_NODE n) {
if (n==0) return;
free_omt_nodes(n->left);
free_omt_nodes(n->right);
......@@ -298,7 +299,7 @@ static void free_omt_nodes (OMT_NODE n) {
// right side is values+2 of size 0
// numvalues=1, halfway=0, left side is values of size 0
// right side is values of size 0.
static int create_from_sorted_array_internal(OMT_NODE *np, OMTVALUE *values, u_int32_t numvalues) {
static inline int create_from_sorted_array_internal(OMT_NODE *np, OMTVALUE *values, u_int32_t numvalues) {
if (numvalues==0) {
*np=NULL;
return 0;
......@@ -324,7 +325,7 @@ static int create_from_sorted_array_internal(OMT_NODE *np, OMTVALUE *values, u_i
}
int toku_omt_create_from_sorted_array(OMT *omtp, OMTVALUE *values, u_int32_t numvalues) {
OMT omt;
OMT omt = NULL;
int r;
if ((r = toku_omt_create(&omt))) return r;
if ((r = create_from_sorted_array_internal(&omt->root, values, numvalues))) {
......@@ -351,7 +352,7 @@ u_int32_t toku_omt_size(OMT V) {
return nweight(V->root);
}
static int iterate_internal(OMT_NODE n, u_int32_t idx, int (*f)(OMTVALUE, u_int32_t, void*), void*v) {
static inline int iterate_internal(OMT_NODE n, u_int32_t idx, int (*f)(OMTVALUE, u_int32_t, void*), void*v) {
int r;
if (n==NULL) return 0;
if ((r=iterate_internal(n->left, idx, f, v))) return r;
......@@ -364,10 +365,10 @@ int toku_omt_iterate(OMT omt, int (*f)(OMTVALUE, u_int32_t, void*), void*v) {
}
int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
if (index>=nweight(omt->root)) return ERANGE;
if (index>nweight(omt->root)) return ERANGE;
int r;
u_int32_t newsize = toku_omt_size(omt)-index;
OMT newomt;
OMT newomt = NULL;
if ((r = toku_omt_create(&newomt))) return r;
if ((r = make_sure_array_is_sized_ok(newomt, newsize))) {
fail:
......@@ -394,7 +395,7 @@ int toku_omt_split_at(OMT omt, OMT *newomtp, u_int32_t index) {
int toku_omt_merge(OMT leftomt, OMT rightomt, OMT *newomtp) {
int r;
OMT newomt;
OMT newomt = NULL;
u_int32_t newsize = toku_omt_size(leftomt)+toku_omt_size(rightomt);
if ((r = toku_omt_create(&newomt))) return r;
if ((r = make_sure_array_is_sized_ok(newomt, newsize))) {
......
......@@ -125,7 +125,7 @@ int toku_omt_set_at (OMT omt, OMTVALUE value, u_int32_t index);
// Effect: Replaces the item at index with value.
// Returns:
// 0 success
// ERANGE
// ERANGE if index>=toku_omt_size(omt)
// On error, omt i sunchanged.
// Performance: time=O(\log N)
// Rationale: The BRT needs to be able to replace a value with another copy of the same value (allocated in a different location)
......@@ -164,7 +164,7 @@ int toku_omt_fetch (OMT V, u_int32_t i, OMTVALUE *v);
// Requires: v != NULL
// Returns
// 0 success
// ERANGE if i out of range
// ERANGE if index>=toku_omt_size(omt)
// On nonzero return, *v is unchanged.
// Performance: time=O(\log N)
......
......@@ -175,7 +175,7 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
n->thisnodename = diskoff;
n->log_lsn = n->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
n->layout_version = 5;
n->layout_version = BRT_LAYOUT_VERSION;
n->height = height;
n->rand4fingerprint = rand4fingerprint;
n->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
......@@ -488,6 +488,13 @@ void toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
toku_free_BYTESTRING(fname);
}
static int fill_buf (LEAFENTRY le, u_int32_t idx, void *varray) {
LEAFENTRY *array=varray;
array[idx]=le;
return 0;
}
// The memory for the new node should have already been allocated.
void toku_recover_leafsplit (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, DISKOFF new_diskoff, u_int32_t old_n, u_int32_t new_n, u_int32_t new_node_size, u_int32_t new_rand4, u_int8_t is_dup_sort) {
struct cf_pair *pair = NULL;
......@@ -500,22 +507,76 @@ void toku_recover_leafsplit (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, DISK
assert(oldn->height==0);
TAGMALLOC(BRTNODE, newn);
assert(newn);
//printf("%s:%d leafsplit %p (%lld) %p (%lld)\n", __FILE__, __LINE__, oldn, old_diskoff, newn, new_diskoff);
newn->nodesize = new_node_size;
newn->thisnodename = new_diskoff;
newn->log_lsn = newn->disk_lsn = lsn;
//printf("%s:%d %p->disk_lsn=%"PRId64"\n", __FILE__, __LINE__, n, n->disk_lsn.lsn);
newn->layout_version = 4;
newn->layout_version = BRT_LAYOUT_VERSION;
newn->height = 0;
newn->rand4fingerprint = new_rand4;
newn->flags = is_dup_sort ? TOKU_DB_DUPSORT : 0; // Don't have TOKU_DB_DUP ???
newn->local_fingerprint = 0; // nothing there yet
newn->dirty = 1;
{
u_int32_t mpsize = newn->nodesize + newn->nodesize/4;
void *mp = toku_malloc(mpsize);
assert(mp);
toku_mempool_init(&newn->u.l.buffer_mempool, mp, mpsize);
}
assert(toku_omt_size(oldn->u.l.buffer)==old_n);
r = toku_omt_split_at(oldn->u.l.buffer, &newn->u.l.buffer, new_n);
u_int32_t n_leafentries = old_n;
LEAFENTRY *MALLOC_N(n_leafentries, leafentries);
assert(leafentries);
toku_omt_iterate(oldn->u.l.buffer, fill_buf, leafentries);
{
u_int32_t i;
u_int32_t new_fp = 0, new_size = 0;
for (i=new_n; i<n_leafentries; i++) {
LEAFENTRY oldle = leafentries[i];
LEAFENTRY newle = toku_mempool_malloc(&newn->u.l.buffer_mempool, leafentry_memsize(oldle), 1);
assert(newle);
new_fp += toku_le_crc(oldle);
new_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
memcpy(newle, oldle, leafentry_memsize(oldle));
toku_mempool_mfree(&oldn->u.l.buffer_mempool, oldle, leafentry_memsize(oldle));
leafentries[i] = newle;
}
toku_omt_destroy(&oldn->u.l.buffer);
r = toku_omt_create_from_sorted_array(&newn->u.l.buffer, leafentries+new_n, n_leafentries-new_n);
assert(r==0);
newn->u.l.n_bytes_in_buffer = new_size;
newn->local_fingerprint = newn->rand4fingerprint * new_fp;
}
{
u_int32_t i;
u_int32_t old_fp = 0, old_size = 0;
for (i=0; i<new_n; i++) {
LEAFENTRY oldle = leafentries[i];
old_fp += toku_le_crc(oldle);
old_size += OMT_ITEM_OVERHEAD + leafentry_disksize(oldle);
}
r = toku_omt_create_from_sorted_array(&oldn->u.l.buffer, leafentries, new_n);
oldn->u.l.n_bytes_in_buffer = old_size;
oldn->local_fingerprint = oldn->rand4fingerprint * old_fp;
}
toku_free(leafentries);
//r = toku_omt_split_at(oldn->u.l.buffer, &newn->u.l.buffer, new_n);
toku_verify_all_in_mempool(oldn); toku_verify_counts(oldn);
toku_verify_all_in_mempool(newn); toku_verify_counts(newn);
toku_cachetable_put(pair->cf, new_diskoff, newn, toku_serialize_brtnode_size(newn), toku_brtnode_flush_callback, toku_brtnode_fetch_callback, 0);
newn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, new_diskoff, 1, toku_serialize_brtnode_size(newn));
assert(r==0);
oldn->log_lsn = lsn;
r = toku_cachetable_unpin(pair->cf, old_diskoff, 1, toku_serialize_brtnode_size(oldn));
assert(r==0);
}
void toku_recover_insertleafentry (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t idx, LEAFENTRY newleafentry) {
......
/* A simple case to see if recovery works. */
// A simple case to see if recovery works.
// Create a file (foo.db) in a transaction and commit.
// Insert some random key-value pairs in a transaciton an dcommit.
// Close the environments, delete foo.db, and then
// run recovery.
// Verify that the data is present.
#include <db.h>
#include <stdlib.h>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment