Commit 3e61a540 authored by John Esmet's avatar John Esmet

refs #258 Pass the old key length down to le_pack, so that mempool free has the right value

parent 8b2b6ae2
......@@ -441,6 +441,7 @@ void bn_data::get_space_for_overwrite(
uint32_t idx,
const void* keyp UU(),
uint32_t keylen UU(),
uint32_t old_keylen,
uint32_t old_le_size,
uint32_t new_size,
LEAFENTRY* new_le_space,
......@@ -455,8 +456,8 @@ void bn_data::get_space_for_overwrite(
int r = m_buffer.fetch(idx, &klpair_len, &klp);
invariant_zero(r);
paranoid_invariant(klp!=nullptr);
// Key never changes.
paranoid_invariant(keylen_from_klpair_len(klpair_len) == keylen);
// Old key length should be consistent with what is stored in the DMT
invariant(keylen_from_klpair_len(klpair_len) == old_keylen);
size_t new_le_offset = toku_mempool_get_offset_from_pointer_and_base(&this->m_buffer_mempool, new_le);
paranoid_invariant(new_le_offset <= UINT32_MAX - new_size); // Not using > 4GB
......
......@@ -304,7 +304,8 @@ class bn_data {
// Allocates space in the mempool to store a new leafentry.
// This may require reorganizing the mempool and updating the dmt.
__attribute__((__nonnull__))
void get_space_for_overwrite(uint32_t idx, const void* keyp, uint32_t keylen, uint32_t old_size, uint32_t new_size, LEAFENTRY* new_le_space, void **const maybe_free);
void get_space_for_overwrite(uint32_t idx, const void* keyp, uint32_t keylen, uint32_t old_keylen, uint32_t old_size,
uint32_t new_size, LEAFENTRY* new_le_space, void **const maybe_free);
// Allocates space in the mempool to store a new leafentry
// and inserts a new key into the dmt
......
......@@ -1224,6 +1224,7 @@ toku_ft_bn_apply_msg_once(
BASEMENTNODE bn,
const FT_MSG msg,
uint32_t idx,
uint32_t le_keylen,
LEAFENTRY le,
txn_gc_info *gc_info,
uint64_t *workdonep,
......
......@@ -1740,6 +1740,7 @@ toku_ft_bn_apply_msg_once (
BASEMENTNODE bn,
const FT_MSG msg,
uint32_t idx,
uint32_t le_keylen,
LEAFENTRY le,
txn_gc_info *gc_info,
uint64_t *workdone,
......@@ -1767,6 +1768,7 @@ toku_ft_bn_apply_msg_once (
le,
&bn->data_buffer,
idx,
le_keylen,
gc_info,
&new_le,
&numbytes_delta
......@@ -1816,6 +1818,7 @@ struct setval_extra_s {
XIDS xids;
const DBT *key;
uint32_t idx;
uint32_t le_keylen;
LEAFENTRY le;
txn_gc_info *gc_info;
uint64_t * workdone; // set by toku_ft_bn_apply_msg_once()
......@@ -1849,7 +1852,7 @@ static void setval_fun (const DBT *new_val, void *svextra_v) {
msg.u.id.val = &val;
}
toku_ft_bn_apply_msg_once(svextra->bn, &msg,
svextra->idx, svextra->le,
svextra->idx, svextra->le_keylen, svextra->le,
svextra->gc_info,
svextra->workdone, svextra->stats_to_update);
svextra->setval_r = 0;
......@@ -1909,7 +1912,7 @@ static int do_update(ft_update_func update_fun, DESCRIPTOR desc, BASEMENTNODE bn
le_for_update = le;
struct setval_extra_s setval_extra = {setval_tag, false, 0, bn, msg->msn, msg->xids,
keyp, idx, le_for_update, gc_info,
keyp, idx, keylen, le_for_update, gc_info,
workdone, stats_to_update};
// call handlerton's ft->update_fun(), which passes setval_extra to setval_fun()
FAKE_DB(db, desc);
......@@ -1980,7 +1983,7 @@ toku_ft_bn_apply_msg (
} else {
assert_zero(r);
}
toku_ft_bn_apply_msg_once(bn, msg, idx, storeddata, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update);
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
......@@ -2012,7 +2015,7 @@ toku_ft_bn_apply_msg (
);
if (r == DB_NOTFOUND) break;
assert_zero(r);
toku_ft_bn_apply_msg_once(bn, msg, idx, storeddata, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_msg_once(bn, msg, idx, keylen, storeddata, gc_info, workdone, stats_to_update);
break;
}
......@@ -2034,7 +2037,7 @@ toku_ft_bn_apply_msg (
msg->u.id.key = &curr_keydbt;
int deleted = 0;
if (!le_is_clean(storeddata)) { //If already clean, nothing to do.
toku_ft_bn_apply_msg_once(bn, msg, idx, storeddata, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_msg_once(bn, msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update);
// at this point, we cannot trust msg->u.id.key to be valid.
uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
if (new_dmt_size != num_klpairs) {
......@@ -2067,7 +2070,7 @@ toku_ft_bn_apply_msg (
msg->u.id.key = &curr_keydbt;
int deleted = 0;
if (le_has_xids(storeddata, msg->xids)) {
toku_ft_bn_apply_msg_once(bn, msg, idx, storeddata, gc_info, workdone, stats_to_update);
toku_ft_bn_apply_msg_once(bn, msg, idx, curr_keylen, storeddata, gc_info, workdone, stats_to_update);
uint32_t new_dmt_size = bn->data_buffer.num_klpairs();
if (new_dmt_size != num_klpairs) {
paranoid_invariant(new_dmt_size + 1 == num_klpairs);
......
......@@ -2948,7 +2948,7 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
uint64_t workdone=0;
// there's no mvcc garbage in a bulk-loaded FT, so there's no need to pass useful gc info
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, true);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, NULL, &gc_info, &workdone, stats_to_update);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, &workdone, stats_to_update);
}
static int write_literal(struct dbout *out, void*data, size_t len) {
......
......@@ -246,6 +246,7 @@ toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
uint32_t old_keylen,
txn_gc_info *gc_info,
LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p);
......
......@@ -126,7 +126,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u = {.id = { &thekey, &theval }} };
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
leafnode->max_msn_applied_to_node_on_disk = msn;
......
......@@ -123,6 +123,7 @@ le_overwrite(bn_data* bn, uint32_t idx, const char *key, int keysize, const cha
idx,
key,
keysize,
keysize, // old_keylen
size_needed, // old_le_size
size_needed,
&r,
......
......@@ -218,7 +218,7 @@ insert_random_message_to_bn(
*keylenp = keydbt->size;
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
int64_t numbytes;
toku_le_apply_msg(&msg, NULL, NULL, 0, &non_mvcc_gc_info, save, &numbytes);
toku_le_apply_msg(&msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes);
toku_ft_bn_apply_msg(t->ft->compare_fun, t->ft->update_fun, NULL, blb, &msg, &non_mvcc_gc_info, NULL, NULL);
if (msn.msn > blb->max_msn_applied.msn) {
blb->max_msn_applied = msn;
......@@ -268,7 +268,7 @@ insert_same_message_to_bns(
*keylenp = keydbt->size;
*keyp = toku_xmemdup(keydbt->data, keydbt->size);
int64_t numbytes;
toku_le_apply_msg(&msg, NULL, NULL, 0, &non_mvcc_gc_info, save, &numbytes);
toku_le_apply_msg(&msg, NULL, NULL, 0, keydbt->size, &non_mvcc_gc_info, save, &numbytes);
toku_ft_bn_apply_msg(t->ft->compare_fun, t->ft->update_fun, NULL, blb1, &msg, &non_mvcc_gc_info, NULL, NULL);
if (msn.msn > blb1->max_msn_applied.msn) {
blb1->max_msn_applied = msn;
......
......@@ -213,7 +213,7 @@ test_le_offsets (void) {
static void
test_ule_packs_to_nothing (ULE ule) {
LEAFENTRY le;
int r = le_pack(ule, NULL, 0, NULL, 0, 0, &le, nullptr);
int r = le_pack(ule, NULL, 0, NULL, 0, 0, 0, &le, nullptr);
assert(r==0);
assert(le==NULL);
}
......@@ -319,7 +319,7 @@ test_le_pack_committed (void) {
size_t memsize;
LEAFENTRY le;
int r = le_pack(&ule, nullptr, 0, nullptr, 0, 0, &le, nullptr);
int r = le_pack(&ule, nullptr, 0, nullptr, 0, 0, 0, &le, nullptr);
assert(r==0);
assert(le!=NULL);
memsize = le_memsize_from_ule(&ule);
......@@ -329,7 +329,7 @@ test_le_pack_committed (void) {
verify_ule_equal(&ule, &tmp_ule);
LEAFENTRY tmp_le;
size_t tmp_memsize;
r = le_pack(&tmp_ule, nullptr, 0, nullptr, 0, 0, &tmp_le, nullptr);
r = le_pack(&tmp_ule, nullptr, 0, nullptr, 0, 0, 0, &tmp_le, nullptr);
tmp_memsize = le_memsize_from_ule(&tmp_ule);
assert(r==0);
assert(tmp_memsize == memsize);
......@@ -377,7 +377,7 @@ test_le_pack_uncommitted (uint8_t committed_type, uint8_t prov_type, int num_pla
size_t memsize;
LEAFENTRY le;
int r = le_pack(&ule, nullptr, 0, nullptr, 0, 0, &le, nullptr);
int r = le_pack(&ule, nullptr, 0, nullptr, 0, 0, 0, &le, nullptr);
assert(r==0);
assert(le!=NULL);
memsize = le_memsize_from_ule(&ule);
......@@ -387,7 +387,7 @@ test_le_pack_uncommitted (uint8_t committed_type, uint8_t prov_type, int num_pla
verify_ule_equal(&ule, &tmp_ule);
LEAFENTRY tmp_le;
size_t tmp_memsize;
r = le_pack(&tmp_ule, nullptr, 0, nullptr, 0, 0, &tmp_le, nullptr);
r = le_pack(&tmp_ule, nullptr, 0, nullptr, 0, 0, 0, &tmp_le, nullptr);
tmp_memsize = le_memsize_from_ule(&tmp_ule);
assert(r==0);
assert(tmp_memsize == memsize);
......@@ -448,7 +448,7 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
LEAFENTRY le_expected;
LEAFENTRY le_result;
r = le_pack(ule_initial, nullptr, 0, nullptr, 0, 0, &le_initial, nullptr);
r = le_pack(ule_initial, nullptr, 0, nullptr, 0, 0, 0, &le_initial, nullptr);
CKERR(r);
size_t result_memsize = 0;
......@@ -458,6 +458,7 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
le_initial,
nullptr,
0,
0,
&gc_info,
&le_result,
&ignoreme);
......@@ -467,7 +468,7 @@ test_le_apply(ULE ule_initial, FT_MSG msg, ULE ule_expected) {
}
size_t expected_memsize = 0;
r = le_pack(ule_expected, nullptr, 0, nullptr, 0, 0, &le_expected, nullptr);
r = le_pack(ule_expected, nullptr, 0, nullptr, 0, 0, 0, &le_expected, nullptr);
CKERR(r);
if (le_expected) {
expected_memsize = leafentry_memsize(le_expected);
......@@ -749,7 +750,7 @@ test_le_apply_messages(void) {
static bool ule_worth_running_garbage_collection(ULE ule, TXNID oldest_referenced_xid_known) {
LEAFENTRY le;
int r = le_pack(ule, nullptr, 0, nullptr, 0, 0, &le, nullptr); CKERR(r);
int r = le_pack(ule, nullptr, 0, nullptr, 0, 0, 0, &le, nullptr); CKERR(r);
invariant_notnull(le);
txn_gc_info gc_info(nullptr, oldest_referenced_xid_known, oldest_referenced_xid_known, true);
bool worth_running = toku_le_worth_running_garbage_collection(le, &gc_info);
......
......@@ -129,7 +129,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
// apply an insert to the leaf node
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// Create bad tree (don't do following):
// leafnode->max_msn_applied_to_node = msn;
......
......@@ -117,7 +117,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -118,7 +118,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -117,7 +117,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -118,7 +118,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode,0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -120,7 +120,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -117,7 +117,7 @@ append_leaf(FTNODE leafnode, void *key, size_t keylen, void *val, size_t vallen)
MSN msn = next_dummymsn();
FT_MSG_S msg = { FT_INSERT, msn, xids_get_root_xids(), .u={.id = { &thekey, &theval }} };
txn_gc_info gc_info(nullptr, TXNID_NONE, TXNID_NONE, false);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, NULL, &gc_info, NULL, NULL);
toku_ft_bn_apply_msg_once(BLB(leafnode, 0), &msg, idx, keylen, NULL, &gc_info, NULL, NULL);
// dont forget to dirty the node
leafnode->dirty = 1;
......
......@@ -148,6 +148,7 @@ le_pack(ULE ule, // data to be packed into new leafentry
uint32_t idx,
void* keyp,
uint32_t keylen,
uint32_t old_keylen,
uint32_t old_le_size,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
void **const maybe_free
......
......@@ -256,6 +256,7 @@ static void get_space_for_le(
uint32_t idx,
void* keyp,
uint32_t keylen,
uint32_t old_keylen,
uint32_t old_le_size,
size_t size,
LEAFENTRY* new_le_space,
......@@ -268,7 +269,7 @@ static void get_space_for_le(
else {
// this means we are overwriting something
if (old_le_size > 0) {
data_buffer->get_space_for_overwrite(idx, keyp, keylen, old_le_size, size, new_le_space, maybe_free);
data_buffer->get_space_for_overwrite(idx, keyp, keylen, old_keylen, old_le_size, size, new_le_space, maybe_free);
}
// this means we are inserting something new
else {
......@@ -496,6 +497,7 @@ toku_le_apply_msg(FT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
bn_data* data_buffer, // bn_data storing leafentry, if NULL, means there is no bn_data
uint32_t idx, // index in data_buffer where leafentry is stored (and should be replaced
uint32_t old_keylen, // length of the any key in data_buffer
txn_gc_info *gc_info,
LEAFENTRY *new_leafentry_p,
int64_t * numbytes_delta_p) { // change in total size of key and val, not including any overhead
......@@ -552,6 +554,7 @@ toku_le_apply_msg(FT_MSG msg,
idx,
ft_msg_get_key(msg), // contract of this function is caller has this set, always
keylen, // contract of this function is caller has this set, always
old_keylen,
oldmemsize,
new_leafentry_p,
&maybe_free
......@@ -655,6 +658,7 @@ toku_le_garbage_collect(LEAFENTRY old_leaf_entry,
idx,
keyp,
keylen,
keylen, // old_keylen, same because the key isn't going to change for gc
old_mem_size,
new_leaf_entry,
&maybe_free
......@@ -974,6 +978,7 @@ le_pack(ULE ule, // data to be packed into new leafentry
uint32_t idx,
void* keyp,
uint32_t keylen,
uint32_t old_keylen,
uint32_t old_le_size,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
void **const maybe_free
......@@ -996,7 +1001,8 @@ le_pack(ULE ule, // data to be packed into new leafentry
}
}
if (data_buffer && old_le_size > 0) {
data_buffer->delete_leafentry(idx, keylen, old_le_size);
// must pass old_keylen and old_le_size, since that's what is actually stored in data_buffer
data_buffer->delete_leafentry(idx, old_keylen, old_le_size);
}
*new_leafentry_p = NULL;
rval = 0;
......@@ -1005,7 +1011,7 @@ le_pack(ULE ule, // data to be packed into new leafentry
found_insert:
memsize = le_memsize_from_ule(ule);
LEAFENTRY new_leafentry;
get_space_for_le(data_buffer, idx, keyp, keylen, old_le_size, memsize, &new_leafentry, maybe_free);
get_space_for_le(data_buffer, idx, keyp, keylen, old_keylen, old_le_size, memsize, &new_leafentry, maybe_free);
//p always points to first unused byte after leafentry we are packing
uint8_t *p;
......@@ -2467,6 +2473,7 @@ toku_le_upgrade_13_14(LEAFENTRY_13 old_leafentry,
nullptr, //only matters if we are passing in a bn_data
0, //only matters if we are passing in a bn_data
0, //only matters if we are passing in a bn_data
0, //only matters if we are passing in a bn_data
new_leafentry_p,
nullptr //only matters if we are passing in a bn_data
);
......
......@@ -232,7 +232,8 @@ void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment) {
void toku_mempool_mfree(struct mempool *mp, void *vp, size_t size) {
if (vp) { paranoid_invariant(toku_mempool_inrange(mp, vp, size)); }
mp->frag_size += size;
paranoid_invariant(mp->frag_size <= mp->size);
invariant(mp->frag_size <= mp->free_offset);
invariant(mp->frag_size <= mp->size);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment