Commit af4f260a authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1924 refs[t:1924] Fixed upgrade logic: use separate mempool/omt for version 10/11

git-svn-id: file:///svn/toku/tokudb@13943 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7b09e824
......@@ -600,7 +600,6 @@ deserialize_brtnode_leaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbu
static int
deserialize_brtnode_nonleaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbuf *rb) {
assert(FALSE); //This is so far just the copy of version 11
int r;
int i;
......@@ -819,12 +818,11 @@ le10_crc(LEAFENTRY v) {
}
//old_le10 is opaque data only readable by accessors (Not a 'new' LEAFENTRY)
static int
upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10) {
static void
upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx11, LEAFENTRY old_le10, OMT omt11, struct mempool *mp11) {
//See brt_leaf_apply_cmd_once for template
size_t newlen=0, newdisksize=0;
LEAFENTRY new_le = NULL;
void *maybe_free = NULL;
ULE_S ule;
int r;
assert(old_le10);
......@@ -832,88 +830,80 @@ upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10)
r = le_pack(&ule, // create packed leafentry
&newlen, &newdisksize,
&new_le,
node->u.l.buffer, &node->u.l.buffer_mempool, &maybe_free);
if (r!=0) goto cleanup;
omt11, mp11, NULL);
assert(r==0);
//Update size of memory information and crc
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
if (new_le) {
//Version 10 leafentry is being upgraded
//Update size of memory information and crc
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
u_int32_t size = leafentry_memsize_10(old_le10);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool mfree because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since old_le10 may be no good any more.
assert(newdisksize == leafentry_disksize(new_le));
//Add new version 10 leafentry information
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) goto cleanup;
}
else {
//Version 10 leafentry is being deleted
//It was there, note that it's gone
//(Was already removed from mempool)
// Figure out if one of the other keys is the same key
toku_upgrade_maybe_bump_nkeys(node, idx, old_le10, -1);
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto cleanup;
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
{
u_int32_t oldlen = le10_any_vallen(old_le10) + le10_any_keylen(old_le10);
assert(node->u.l.leaf_stats.dsize >= oldlen);
node->u.l.leaf_stats.dsize -= oldlen;
}
assert(node->u.l.leaf_stats.dsize < (1U<<31)); // make sure we didn't underflow
node->u.l.leaf_stats.ndata --;
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(old_le10)); // Must pass 0, since old_le10 may be no good any more.
r = toku_omt_insert_at(omt11, new_le, idx11);
assert(r==0);
}
r = 0;
cleanup:
if (maybe_free) toku_free(maybe_free);
return r;
}
//Upgrade each leafentry from version 10 to 11(nested transactions)
//Need to update checksums, and memory pools
static int
static void
upgrade_brtnode_leaf_10_11 (BRTNODE node) {
int r;
u_int32_t idx = 0;
u_int32_t omtsize = toku_omt_size(node->u.l.buffer);
while (idx < omtsize) {
u_int32_t idx10 = 0;
u_int32_t idx11 = 0;
OMT omt10 = node->u.l.buffer;
OMT omt11 = NULL;
struct mempool mp10 = node->u.l.buffer_mempool;
struct mempool mp11;
{
//Create a new mempool
size_t starting_size = toku_mempool_get_size(&mp10);
void *base = toku_xmalloc(starting_size);
toku_mempool_init(&mp11, base, starting_size);
}
r = toku_omt_create(&omt11);
assert(r==0);
u_int32_t omt11size = toku_omt_size(node->u.l.buffer);
while (idx10 < omt11size) {
OMTVALUE old_le10;
r = toku_omt_fetch(node->u.l.buffer, idx, &old_le10, NULL);
r = toku_omt_fetch(node->u.l.buffer, idx10, &old_le10, NULL);
assert(r==0);
assert(old_le10);
r = upgrade_single_leafentry_10_11(node, idx, old_le10);
if (r!=0) goto cleanup;
u_int32_t new_omtsize = toku_omt_size(node->u.l.buffer);
if (new_omtsize != omtsize) {
assert(omtsize-1 == new_omtsize);
omtsize = new_omtsize;
//something was deleted, next leafentry is at same index.
upgrade_single_leafentry_10_11(node, idx11, old_le10, omt11, &mp11);
u_int32_t omtsize11 = toku_omt_size(omt11);
if (omtsize11 != idx11) {
assert(omtsize11 == idx11+1);
//Leafentry survived (insert)
idx11++;
}
else
idx++; //next leafentry is at next index
idx10++; //Always advance the old omt
}
r = 0;
cleanup:
return r;
//Free the old mempool
{
void *mpbase = toku_mempool_get_base(&mp10);
toku_mempool_fini(&mp10);
toku_free(mpbase);
}
//Free the old omt
toku_omt_destroy(&omt10);
//Assign new mempool
node->u.l.buffer_mempool = mp11;
//Assign new omt
node->u.l.buffer = omt11;
//Calculate statistics
toku_calculate_leaf_stats(node);
}
static int
static void
upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
int i;
int r;
......@@ -958,7 +948,6 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
BNC_BUFFER(node,i) = fifo11;
fifo11 = NULL;
}
return 0;
}
// Structure of brtnode is same for versions 10, 11. The only difference is in the
......@@ -966,13 +955,13 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
// of the brtnode is left in place (*brtnode_10 is reused.)
static int
upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11) {
int r;
if ((*brtnode_10)->height>0)
r = upgrade_brtnode_nonleaf_10_11(*brtnode_10);
upgrade_brtnode_nonleaf_10_11(*brtnode_10);
else
r = upgrade_brtnode_leaf_10_11(*brtnode_10);
upgrade_brtnode_leaf_10_11(*brtnode_10);
*brtnode_11 = *brtnode_10;
*brtnode_10 = NULL;
(*brtnode_11)->layout_version = BRT_LAYOUT_VERSION_11;
(*brtnode_11)->dirty = 1;
return 0;
}
......
......@@ -28,6 +28,6 @@ void toku_upgrade_ule_remove_innermost_uxr(ULE ule);
void toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp);
void toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid);
//Exposed brt functions for the purpose of upgrading
void toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction);
void toku_calculate_leaf_stats(BRTNODE node);
#endif
......@@ -4755,7 +4755,8 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn)
//Wrapper functions for upgrading from version 10.
#include "backwards_10.h"
void
toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction) {
maybe_bump_nkeys(node, idx, le, direction);
toku_calculate_leaf_stats (BRTNODE node) {
assert(node->height == 0);
node->u.l.leaf_stats = calc_leaf_stats(node);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment