Commit a284c6a9 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1924 refs[t:1924] Fixed upgrade logic: use separate mempool/omt for version 10/11

git-svn-id: file:///svn/toku/tokudb@13943 c7de825b-a66e-492c-adef-691d508d4ae1
parent fee19e87
...@@ -600,7 +600,6 @@ deserialize_brtnode_leaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbu ...@@ -600,7 +600,6 @@ deserialize_brtnode_leaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbu
static int static int
deserialize_brtnode_nonleaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbuf *rb) { deserialize_brtnode_nonleaf_from_rbuf_10 (BRTNODE result, bytevec magic, struct rbuf *rb) {
assert(FALSE); //This is so far just the copy of version 11
int r; int r;
int i; int i;
...@@ -819,12 +818,11 @@ le10_crc(LEAFENTRY v) { ...@@ -819,12 +818,11 @@ le10_crc(LEAFENTRY v) {
} }
//old_le10 is opaque data only readable by accessors (Not a 'new' LEAFENTRY) //old_le10 is opaque data only readable by accessors (Not a 'new' LEAFENTRY)
static int static void
upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10) { upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx11, LEAFENTRY old_le10, OMT omt11, struct mempool *mp11) {
//See brt_leaf_apply_cmd_once for template //See brt_leaf_apply_cmd_once for template
size_t newlen=0, newdisksize=0; size_t newlen=0, newdisksize=0;
LEAFENTRY new_le = NULL; LEAFENTRY new_le = NULL;
void *maybe_free = NULL;
ULE_S ule; ULE_S ule;
int r; int r;
assert(old_le10); assert(old_le10);
...@@ -832,88 +830,80 @@ upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10) ...@@ -832,88 +830,80 @@ upgrade_single_leafentry_10_11 (BRTNODE node, u_int32_t idx, LEAFENTRY old_le10)
r = le_pack(&ule, // create packed leafentry r = le_pack(&ule, // create packed leafentry
&newlen, &newdisksize, &newlen, &newdisksize,
&new_le, &new_le,
node->u.l.buffer, &node->u.l.buffer_mempool, &maybe_free); omt11, mp11, NULL);
if (r!=0) goto cleanup; assert(r==0);
//Update size of memory information and crc
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
if (new_le) { if (new_le) {
//Version 10 leafentry is being upgraded //Version 10 leafentry is being upgraded
//Update size of memory information and crc
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
u_int32_t size = leafentry_memsize_10(old_le10);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool mfree because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since old_le10 may be no good any more.
assert(newdisksize == leafentry_disksize(new_le)); assert(newdisksize == leafentry_disksize(new_le));
//Add new version 10 leafentry information //Add new version 10 leafentry information
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize; node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le); node->local_fingerprint += node->rand4fingerprint*toku_le_crc(new_le);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) goto cleanup; r = toku_omt_insert_at(omt11, new_le, idx11);
} assert(r==0);
else {
//Version 10 leafentry is being deleted
//It was there, note that it's gone
//(Was already removed from mempool)
// Figure out if one of the other keys is the same key
toku_upgrade_maybe_bump_nkeys(node, idx, old_le10, -1);
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) goto cleanup;
//Subtract old version 10 leafentry information
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + le10_disksize(old_le10);
node->local_fingerprint -= node->rand4fingerprint * le10_crc(old_le10);
{
u_int32_t oldlen = le10_any_vallen(old_le10) + le10_any_keylen(old_le10);
assert(node->u.l.leaf_stats.dsize >= oldlen);
node->u.l.leaf_stats.dsize -= oldlen;
}
assert(node->u.l.leaf_stats.dsize < (1U<<31)); // make sure we didn't underflow
node->u.l.leaf_stats.ndata --;
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(old_le10)); // Must pass 0, since old_le10 may be no good any more.
} }
r = 0;
cleanup:
if (maybe_free) toku_free(maybe_free);
return r;
} }
//Upgrade each leafentry from version 10 to 11(nested transactions) //Upgrade each leafentry from version 10 to 11(nested transactions)
//Need to update checksums, and memory pools //Need to update checksums, and memory pools
static int static void
upgrade_brtnode_leaf_10_11 (BRTNODE node) { upgrade_brtnode_leaf_10_11 (BRTNODE node) {
int r; int r;
u_int32_t idx = 0; u_int32_t idx10 = 0;
u_int32_t omtsize = toku_omt_size(node->u.l.buffer); u_int32_t idx11 = 0;
while (idx < omtsize) { OMT omt10 = node->u.l.buffer;
OMT omt11 = NULL;
struct mempool mp10 = node->u.l.buffer_mempool;
struct mempool mp11;
{
//Create a new mempool
size_t starting_size = toku_mempool_get_size(&mp10);
void *base = toku_xmalloc(starting_size);
toku_mempool_init(&mp11, base, starting_size);
}
r = toku_omt_create(&omt11);
assert(r==0);
u_int32_t omt11size = toku_omt_size(node->u.l.buffer);
while (idx10 < omt11size) {
OMTVALUE old_le10; OMTVALUE old_le10;
r = toku_omt_fetch(node->u.l.buffer, idx, &old_le10, NULL); r = toku_omt_fetch(node->u.l.buffer, idx10, &old_le10, NULL);
assert(r==0); assert(r==0);
assert(old_le10); assert(old_le10);
r = upgrade_single_leafentry_10_11(node, idx, old_le10); upgrade_single_leafentry_10_11(node, idx11, old_le10, omt11, &mp11);
if (r!=0) goto cleanup;
u_int32_t new_omtsize = toku_omt_size(node->u.l.buffer); u_int32_t omtsize11 = toku_omt_size(omt11);
if (new_omtsize != omtsize) { if (omtsize11 != idx11) {
assert(omtsize-1 == new_omtsize); assert(omtsize11 == idx11+1);
omtsize = new_omtsize; //Leafentry survived (insert)
//something was deleted, next leafentry is at same index. idx11++;
} }
else idx10++; //Always advance the old omt
idx++; //next leafentry is at next index
} }
r = 0; //Free the old mempool
cleanup: {
return r; void *mpbase = toku_mempool_get_base(&mp10);
toku_mempool_fini(&mp10);
toku_free(mpbase);
}
//Free the old omt
toku_omt_destroy(&omt10);
//Assign new mempool
node->u.l.buffer_mempool = mp11;
//Assign new omt
node->u.l.buffer = omt11;
//Calculate statistics
toku_calculate_leaf_stats(node);
} }
static int static void
upgrade_brtnode_nonleaf_10_11 (BRTNODE node) { upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
int i; int i;
int r; int r;
...@@ -958,7 +948,6 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) { ...@@ -958,7 +948,6 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
BNC_BUFFER(node,i) = fifo11; BNC_BUFFER(node,i) = fifo11;
fifo11 = NULL; fifo11 = NULL;
} }
return 0;
} }
// Structure of brtnode is same for versions 10, 11. The only difference is in the // Structure of brtnode is same for versions 10, 11. The only difference is in the
...@@ -966,13 +955,13 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) { ...@@ -966,13 +955,13 @@ upgrade_brtnode_nonleaf_10_11 (BRTNODE node) {
// of the brtnode is left in place (*brtnode_10 is reused.) // of the brtnode is left in place (*brtnode_10 is reused.)
static int static int
upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11) { upgrade_brtnode_10_11 (BRTNODE *brtnode_10, BRTNODE *brtnode_11) {
int r;
if ((*brtnode_10)->height>0) if ((*brtnode_10)->height>0)
r = upgrade_brtnode_nonleaf_10_11(*brtnode_10); upgrade_brtnode_nonleaf_10_11(*brtnode_10);
else else
r = upgrade_brtnode_leaf_10_11(*brtnode_10); upgrade_brtnode_leaf_10_11(*brtnode_10);
*brtnode_11 = *brtnode_10; *brtnode_11 = *brtnode_10;
*brtnode_10 = NULL; *brtnode_10 = NULL;
(*brtnode_11)->layout_version = BRT_LAYOUT_VERSION_11;
(*brtnode_11)->dirty = 1; (*brtnode_11)->dirty = 1;
return 0; return 0;
} }
......
...@@ -28,6 +28,6 @@ void toku_upgrade_ule_remove_innermost_uxr(ULE ule); ...@@ -28,6 +28,6 @@ void toku_upgrade_ule_remove_innermost_uxr(ULE ule);
void toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp); void toku_upgrade_ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp);
void toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid); void toku_upgrade_ule_push_delete_uxr(ULE ule, TXNID xid);
//Exposed brt functions for the purpose of upgrading //Exposed brt functions for the purpose of upgrading
void toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction); void toku_calculate_leaf_stats(BRTNODE node);
#endif #endif
...@@ -4755,7 +4755,8 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn) ...@@ -4755,7 +4755,8 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn)
//Wrapper functions for upgrading from version 10. //Wrapper functions for upgrading from version 10.
#include "backwards_10.h" #include "backwards_10.h"
void void
toku_upgrade_maybe_bump_nkeys (BRTNODE node, u_int32_t idx, LEAFENTRY le, int direction) { toku_calculate_leaf_stats (BRTNODE node) {
maybe_bump_nkeys(node, idx, le, direction); assert(node->height == 0);
node->u.l.leaf_stats = calc_leaf_stats(node);
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment