Commit ab106122 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1125 Fix a couple of off-by-one errors with xids (max level of nested transactions)

Added tests that check all message types/changes you can do to a non-existant leafentry
Added tests that check all message types/changes you can do to a committed (insert) leafentry

git-svn-id: file:///svn/toku/tokudb@13529 c7de825b-a66e-492c-adef-691d508d4ae1
parent a6a6b242
......@@ -7,6 +7,7 @@
#include "ule.h"
enum {MAX_SIZE = 256};
static XIDS nested_xids[MAX_TRANSACTION_RECORDS];
static void
verify_ule_equal(ULE a, ULE b) {
......@@ -26,6 +27,26 @@ verify_ule_equal(ULE a, ULE b) {
}
}
static void
verify_le_equal(LEAFENTRY a, LEAFENTRY b) {
if (a==NULL) assert(b==NULL);
else {
assert(b!=NULL);
size_t size = leafentry_memsize(a);
assert(size==leafentry_memsize(b));
assert(memcmp(a, b, size) == 0);
ULE_S ule_a;
ULE_S ule_b;
le_unpack(&ule_a, a);
le_unpack(&ule_b, b);
verify_ule_equal(&ule_a, &ule_b);
}
}
static void
fillrandom(u_int8_t buf[MAX_SIZE], u_int32_t length) {
assert(length < MAX_SIZE);
......@@ -385,6 +406,306 @@ test_le_pack (void) {
}
}
static void
test_le_apply(ULE ule_initial, BRT_MSG msg, ULE ule_expected) {
int r;
LEAFENTRY le_initial;
LEAFENTRY le_expected;
LEAFENTRY le_result;
size_t initial_memsize;
size_t initial_disksize;
r = le_pack(ule_initial, &initial_memsize, &initial_disksize,
&le_initial, NULL, NULL, NULL);
CKERR(r);
size_t result_memsize;
size_t result_disksize;
r = apply_msg_to_leafentry(msg,
le_initial,
&result_memsize, &result_disksize,
&le_result,
NULL, NULL, NULL);
CKERR(r);
if (le_result)
le_verify_accessors(le_result, ule_expected, result_memsize, result_disksize);
size_t expected_memsize;
size_t expected_disksize;
r = le_pack(ule_expected, &expected_memsize, &expected_disksize,
&le_expected, NULL, NULL, NULL);
CKERR(r);
verify_le_equal(le_result, le_expected);
if (le_result && le_expected) {
assert(result_memsize == expected_memsize);
assert(result_disksize == expected_disksize);
}
if (le_initial) toku_free(le_initial);
if (le_result) toku_free(le_result);
if (le_expected) toku_free(le_expected);
}
static const ULE_S ule_committed_delete = {
.num_uxrs = 1,
.keylen = 0,
.keyp = NULL,
.uxrs[0] = {
.type = XR_DELETE,
.vallen = 0,
.xid = 0,
.valp = NULL
}
};
static BRT_MSG
msg_init(BRT_MSG msg, int type, XIDS xids,
DBT *key, DBT *val) {
msg->type = type;
msg->xids = xids;
msg->u.id.key = key;
msg->u.id.val = val;
return msg;
}
static u_int8_t
next_nesting_level(u_int8_t current) {
u_int8_t rval = current + 1;
if (current > 3 && current < MAX_TRANSACTION_RECORDS - 1) {
rval = current + random() % 100;
if (rval >= MAX_TRANSACTION_RECORDS)
rval = MAX_TRANSACTION_RECORDS - 1;
}
return rval;
}
static void
generate_committed_for(ULE ule, DBT *key, DBT *val) {
ule->num_uxrs = 1;
ule->keylen = key->size;
ule->keyp = key->data;
ule->uxrs[0].type = XR_INSERT;
ule->uxrs[0].vallen = val->size;
ule->uxrs[0].valp = val->data;
ule->uxrs[0].xid = 0;
}
static void
generate_provpair_for(ULE ule, BRT_MSG msg) {
u_int8_t level;
XIDS xids = msg->xids;
ule->num_uxrs = xids_get_num_xids(xids);
ule->keylen = msg->u.id.key->size;
ule->keyp = msg->u.id.key->data;
ule->uxrs[0].type = XR_DELETE;
ule->uxrs[0].vallen = 0;
ule->uxrs[0].valp = NULL;
ule->uxrs[0].xid = xids_get_xid(xids, 0);
for (level = 1; level < ule->num_uxrs - 1; level++) {
ule->uxrs[level].type = XR_PLACEHOLDER;
ule->uxrs[level].vallen = 0;
ule->uxrs[level].valp = NULL;
ule->uxrs[level].xid = xids_get_xid(xids, level);
}
ule->uxrs[ule->num_uxrs - 1].type = XR_INSERT;
ule->uxrs[ule->num_uxrs - 1].vallen = msg->u.id.val->size;
ule->uxrs[ule->num_uxrs - 1].valp = msg->u.id.val->data;
ule->uxrs[ule->num_uxrs - 1].xid = xids_get_innermost_xid(xids);
}
static void
test_le_empty_apply(void) {
ULE_S ule_initial = ule_committed_delete;
BRT_MSG_S msg;
DBT key;
DBT val;
u_int8_t keybuf[MAX_SIZE];
u_int8_t valbuf[MAX_SIZE];
u_int32_t keysize;
u_int32_t valsize;
u_int8_t nesting_level;
for (keysize = 0; keysize < MAX_SIZE; keysize += (random() % MAX_SIZE) + 1) {
for (valsize = 0; valsize < MAX_SIZE; valsize += (random() % MAX_SIZE) + 1) {
for (nesting_level = 0;
nesting_level < MAX_TRANSACTION_RECORDS;
nesting_level = next_nesting_level(nesting_level)) {
XIDS msg_xids = nested_xids[nesting_level];
fillrandom(keybuf, keysize);
fillrandom(valbuf, valsize);
toku_fill_dbt(&key, keybuf, keysize);
toku_fill_dbt(&val, valbuf, valsize);
//COMMIT/ABORT is illegal with TXNID 0
if (nesting_level > 0) {
//Abort/commit of an empty le is an empty le
ULE_S ule_expected = ule_committed_delete;
msg_init(&msg, BRT_COMMIT_ANY, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_COMMIT_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_ABORT_ANY, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_ABORT_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
{
//delete of an empty le is an empty le
ULE_S ule_expected = ule_committed_delete;
msg_init(&msg, BRT_DELETE_ANY, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_DELETE_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
{
msg_init(&msg, BRT_INSERT, msg_xids, &key, &val);
ULE_S ule_expected;
generate_provpair_for(&ule_expected, &msg);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
}
}
}
}
static void
generate_provdel_for(ULE ule, BRT_MSG msg) {
u_int8_t level;
XIDS xids = msg->xids;
ule->num_uxrs = xids_get_num_xids(xids);
ule->keylen = msg->u.id.key->size;
ule->keyp = msg->u.id.key->data;
ule->uxrs[0].type = XR_INSERT;
ule->uxrs[0].vallen = msg->u.id.val->size;
ule->uxrs[0].valp = msg->u.id.val->data;
ule->uxrs[0].xid = xids_get_xid(xids, 0);
for (level = 1; level < ule->num_uxrs - 1; level++) {
ule->uxrs[level].type = XR_PLACEHOLDER;
ule->uxrs[level].vallen = 0;
ule->uxrs[level].valp = NULL;
ule->uxrs[level].xid = xids_get_xid(xids, level);
}
ule->uxrs[ule->num_uxrs - 1].type = XR_DELETE;
ule->uxrs[ule->num_uxrs - 1].vallen = 0;
ule->uxrs[ule->num_uxrs - 1].valp = NULL;
ule->uxrs[ule->num_uxrs - 1].xid = xids_get_innermost_xid(xids);
}
static void
generate_both_for(ULE ule, DBT *oldval, BRT_MSG msg) {
u_int8_t level;
XIDS xids = msg->xids;
ule->num_uxrs = xids_get_num_xids(xids);
ule->keylen = msg->u.id.key->size;
ule->keyp = msg->u.id.key->data;
ule->uxrs[0].type = XR_INSERT;
ule->uxrs[0].vallen = oldval->size;
ule->uxrs[0].valp = oldval->data;
ule->uxrs[0].xid = xids_get_xid(xids, 0);
for (level = 1; level < ule->num_uxrs - 1; level++) {
ule->uxrs[level].type = XR_PLACEHOLDER;
ule->uxrs[level].vallen = 0;
ule->uxrs[level].valp = NULL;
ule->uxrs[level].xid = xids_get_xid(xids, level);
}
ule->uxrs[ule->num_uxrs - 1].type = XR_INSERT;
ule->uxrs[ule->num_uxrs - 1].vallen = msg->u.id.val->size;
ule->uxrs[ule->num_uxrs - 1].valp = msg->u.id.val->data;
ule->uxrs[ule->num_uxrs - 1].xid = xids_get_innermost_xid(xids);
}
static void
test_le_committed_apply(void) {
ULE_S ule_initial;
BRT_MSG_S msg;
DBT key;
DBT val;
u_int8_t keybuf[MAX_SIZE];
u_int8_t valbuf[MAX_SIZE];
u_int32_t keysize;
u_int32_t valsize;
u_int8_t nesting_level;
for (keysize = 0; keysize < MAX_SIZE; keysize += (random() % MAX_SIZE) + 1) {
for (valsize = 0; valsize < MAX_SIZE; valsize += (random() % MAX_SIZE) + 1) {
for (nesting_level = 0;
nesting_level < MAX_TRANSACTION_RECORDS;
nesting_level = next_nesting_level(nesting_level)) {
XIDS msg_xids = nested_xids[nesting_level];
fillrandom(keybuf, keysize);
fillrandom(valbuf, valsize);
toku_fill_dbt(&key, keybuf, keysize);
toku_fill_dbt(&val, valbuf, valsize);
//Generate initial ule
generate_committed_for(&ule_initial, &key, &val);
//COMMIT/ABORT is illegal with TXNID 0
if (nesting_level > 0) {
//Commit/abort will not change a committed le
ULE_S ule_expected = ule_initial;
msg_init(&msg, BRT_COMMIT_ANY, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_COMMIT_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_ABORT_ANY, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_ABORT_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
{
msg_init(&msg, BRT_DELETE_ANY, msg_xids, &key, &val);
ULE_S ule_expected;
generate_provdel_for(&ule_expected, &msg);
test_le_apply(&ule_initial, &msg, &ule_expected);
msg_init(&msg, BRT_DELETE_BOTH, msg_xids, &key, &val);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
{
//INSERT SAME
msg_init(&msg, BRT_INSERT, msg_xids, &key, &val);
ULE_S ule_expected;
generate_both_for(&ule_expected, &val, &msg);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
{
//NOT RIGHT YET
//INSERT DIFFERENT
u_int8_t valbuf2[MAX_SIZE];
u_int32_t valsize2 = random() % MAX_SIZE;
fillrandom(valbuf2, valsize2);
DBT val2;
toku_fill_dbt(&val2, valbuf2, valsize2);
msg_init(&msg, BRT_INSERT, msg_xids, &key, &val2);
ULE_S ule_expected;
generate_both_for(&ule_expected, &val, &msg);
test_le_apply(&ule_initial, &msg, &ule_expected);
}
}
}
}
}
static void
test_le_apply_messages(void) {
test_le_empty_apply();
test_le_committed_apply();
}
//TODO: #1125 tests:
// Will probably have to expose ULE_S definition
// - Check memsize function is correct
......@@ -422,10 +743,32 @@ test_le_pack (void) {
// Compare the two results
// Test full_promote
static void
init_xids(void) {
u_int8_t i;
nested_xids[0] = xids_get_root_xids();
for (i = 1; i < MAX_TRANSACTION_RECORDS; i++) {
int r = xids_create_child(nested_xids[i-1], &nested_xids[i], i * 37 + random() % 36);
assert(r==0);
}
}
static void
destroy_xids(void) {
u_int8_t i;
for (i = 0; i < MAX_TRANSACTION_RECORDS; i++) {
xids_destroy(&nested_xids[i]);
}
}
int
test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__))) {
srandom(7); //Arbitrary seed.
init_xids();
test_le_offsets();
test_le_pack();
test_le_apply_messages();
destroy_xids();
return 0;
}
......@@ -206,7 +206,7 @@ msg_init_empty_ule(ULE ule, BRT_MSG msg) {
static void
msg_modify_ule(ULE ule, BRT_MSG msg) {
XIDS xids = brt_msg_get_xids(msg);
assert(xids_get_num_xids(xids) < MAX_TRANSACTION_RECORDS);
assert(xids_get_num_xids(xids) <= MAX_TRANSACTION_RECORDS);
ule_do_implicit_promotions(ule, xids);
brt_msg_type type = brt_msg_get_type(msg);
switch (type) {
......@@ -684,7 +684,8 @@ le_full_promotion(LEAFENTRY le,
assert(le);
assert(le->num_xrs > 1); //Not committed
assert(!le_is_provdel(le));
assert(le_outermost_uncommitted_xid(le) != 0);
TXNID outermost_uncommitted_xid = le_outermost_uncommitted_xid(le);
assert(outermost_uncommitted_xid != 0);
size_t old_memsize = leafentry_memsize(le);
u_int32_t old_keylen;
......@@ -705,12 +706,15 @@ le_full_promotion(LEAFENTRY le,
BRT_MSG_S slow_full_promotion_msg = {
.type = BRT_COMMIT_ANY,
.xids = xids_get_root_xids(),
.u.id = {
.key = NULL,
.val = NULL,
}
};
int r_xids = xids_create_child(xids_get_root_xids(),
&slow_full_promotion_msg.xids,
outermost_uncommitted_xid);
assert(r_xids==0);
size_t slow_new_memsize;
size_t slow_new_disksize;
LEAFENTRY slow_le;
......@@ -771,6 +775,7 @@ le_full_promotion(LEAFENTRY le,
assert(memcpy(new_key, old_key, old_keylen) == 0);
assert(memcpy(new_val, old_val, old_vallen) == 0);
xids_destroy(&slow_full_promotion_msg.xids);
toku_free(slow_le);
toku_free(old_key);
toku_free(old_val);
......
......@@ -69,11 +69,11 @@ xids_create_child(XIDS parent_xids, // xids list for parent transaction
int rval;
assert(parent_xids);
assert(this_xid > xids_get_innermost_xid(parent_xids));
u_int8_t num_stored_xids = parent_xids->num_stored_xids + 1;
u_int8_t num_xids = num_stored_xids + 1;
u_int32_t num_stored_xids = parent_xids->num_stored_xids + 1;
u_int32_t num_xids = num_stored_xids + 1;
assert(num_xids > 0);
assert(num_xids <= MAX_TRANSACTION_RECORDS);
if (num_xids == MAX_TRANSACTION_RECORDS) rval = EINVAL;
assert(num_xids <= MAX_TRANSACTION_RECORDS + 1);
if (num_xids > MAX_TRANSACTION_RECORDS) rval = EINVAL;
else {
XIDS xids = toku_malloc(sizeof(*xids) + num_stored_xids*sizeof(xids->ids[0]));
if (!xids) rval = ENOMEM;
......@@ -94,10 +94,10 @@ xids_create_child(XIDS parent_xids, // xids list for parent transaction
void
xids_create_from_buffer(struct rbuf *rb, // xids list for parent transaction
XIDS * xids_p) { // xids list created
u_int8_t num_stored_xids = rbuf_char(rb);
u_int8_t num_xids = num_stored_xids + 1;
u_int32_t num_stored_xids = rbuf_char(rb);
u_int32_t num_xids = num_stored_xids + 1;
assert(num_xids > 0);
assert(num_xids < MAX_TRANSACTION_RECORDS);
assert(num_xids <= MAX_TRANSACTION_RECORDS);
XIDS xids = toku_xmalloc(sizeof(*xids) + num_stored_xids*sizeof(xids->ids[0]));
xids->num_stored_xids = num_stored_xids;
u_int8_t index;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment