Commit 3b603521 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1737 refs[t:1737] Mega abort/mega commit messages coded

These messages are broadcast to an entire dictionary, aborting/committing a particular transaction only.

git-svn-id: file:///svn/toku/tokudb@14571 c7de825b-a66e-492c-adef-691d508d4ae1
parent ebeb25a8
...@@ -1241,6 +1241,8 @@ should_compare_both_keys (BRTNODE node, BRT_MSG cmd) ...@@ -1241,6 +1241,8 @@ should_compare_both_keys (BRTNODE node, BRT_MSG cmd)
case BRT_ABORT_ANY: case BRT_ABORT_ANY:
case BRT_COMMIT_ANY: case BRT_COMMIT_ANY:
case BRT_COMMIT_BROADCAST_ALL: case BRT_COMMIT_BROADCAST_ALL:
case BRT_COMMIT_BROADCAST_TXN:
case BRT_ABORT_BROADCAST_TXN:
return 0; return 0;
case BRT_NONE: case BRT_NONE:
break; break;
...@@ -1503,6 +1505,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, ...@@ -1503,6 +1505,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
OMTVALUE storeddatav=NULL; OMTVALUE storeddatav=NULL;
u_int32_t idx; u_int32_t idx;
u_int32_t omt_size;
int r; int r;
int compare_both = should_compare_both_keys(node, cmd); int compare_both = should_compare_both_keys(node, cmd);
struct cmd_leafval_heaviside_extra be = {t, cmd, compare_both}; struct cmd_leafval_heaviside_extra be = {t, cmd, compare_both};
...@@ -1625,7 +1628,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, ...@@ -1625,7 +1628,7 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
case BRT_COMMIT_BROADCAST_ALL: case BRT_COMMIT_BROADCAST_ALL:
// Apply to all leafentries // Apply to all leafentries
idx = 0; idx = 0;
u_int32_t omt_size = toku_omt_size(node->u.l.buffer); omt_size = toku_omt_size(node->u.l.buffer);
for (idx = 0; idx < omt_size; ) { for (idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(node->u.l.buffer, idx, &storeddatav, NULL); r = toku_omt_fetch(node->u.l.buffer, idx, &storeddatav, NULL);
assert(r==0); assert(r==0);
...@@ -1647,6 +1650,35 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, ...@@ -1647,6 +1650,35 @@ brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
} }
assert(toku_omt_size(node->u.l.buffer) == omt_size); assert(toku_omt_size(node->u.l.buffer) == omt_size);
break;
case BRT_COMMIT_BROADCAST_TXN:
case BRT_ABORT_BROADCAST_TXN:
// Apply to all leafentries if txn is represented
idx = 0;
omt_size = toku_omt_size(node->u.l.buffer);
for (idx = 0; idx < omt_size; ) {
r = toku_omt_fetch(node->u.l.buffer, idx, &storeddatav, NULL);
assert(r==0);
storeddata=storeddatav;
int deleted = 0;
if (le_has_xids(storeddata, cmd->xids)) {
r = brt_leaf_apply_cmd_once(node, cmd, idx, storeddata);
if (r!=0) return r;
u_int32_t new_omt_size = toku_omt_size(node->u.l.buffer);
if (new_omt_size != omt_size) {
assert(new_omt_size+1 == omt_size);
//Item was deleted.
deleted = 1;
}
node->dirty = 1;
}
if (deleted)
omt_size--;
else
idx++;
}
assert(toku_omt_size(node->u.l.buffer) == omt_size);
break; break;
case BRT_NONE: return EINVAL; case BRT_NONE: return EINVAL;
...@@ -1887,6 +1919,8 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd, ...@@ -1887,6 +1919,8 @@ brt_nonleaf_put_cmd (BRT t, BRTNODE node, BRT_MSG cmd,
if (0 == (node->flags & TOKU_DB_DUPSORT)) goto do_once; // for nondupsort brt, delete_any message goes to one child. if (0 == (node->flags & TOKU_DB_DUPSORT)) goto do_once; // for nondupsort brt, delete_any message goes to one child.
return brt_nonleaf_cmd_many(t, node, cmd, re_array, did_io); // send message to at least one, possibly all children return brt_nonleaf_cmd_many(t, node, cmd, re_array, did_io); // send message to at least one, possibly all children
case BRT_COMMIT_BROADCAST_ALL: case BRT_COMMIT_BROADCAST_ALL:
case BRT_COMMIT_BROADCAST_TXN:
case BRT_ABORT_BROADCAST_TXN:
return brt_nonleaf_cmd_all (t, node, cmd, re_array, did_io); // send message to all children return brt_nonleaf_cmd_all (t, node, cmd, re_array, did_io); // send message to all children
case BRT_NONE: case BRT_NONE:
break; break;
......
...@@ -115,6 +115,8 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -115,6 +115,8 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
case BRT_COMMIT_ANY: printf("COMMIT_ANY"); goto ok; case BRT_COMMIT_ANY: printf("COMMIT_ANY"); goto ok;
case BRT_COMMIT_BOTH: printf("COMMIT_BOTH"); goto ok; case BRT_COMMIT_BOTH: printf("COMMIT_BOTH"); goto ok;
case BRT_COMMIT_BROADCAST_ALL: printf("COMMIT_BROADCAST_ALL"); goto ok; case BRT_COMMIT_BROADCAST_ALL: printf("COMMIT_BROADCAST_ALL"); goto ok;
case BRT_COMMIT_BROADCAST_TXN: printf("COMMIT_BROADCAST_TXN"); goto ok;
case BRT_ABORT_BROADCAST_TXN: printf("ABORT_BROADCAST_TXN"); goto ok;
} }
printf("HUH?"); printf("HUH?");
ok: ok:
......
...@@ -91,7 +91,9 @@ enum brt_msg_type { ...@@ -91,7 +91,9 @@ enum brt_msg_type {
BRT_ABORT_BOTH = 5, // Abort commands that match both the key and the value BRT_ABORT_BOTH = 5, // Abort commands that match both the key and the value
BRT_COMMIT_ANY = 6, BRT_COMMIT_ANY = 6,
BRT_COMMIT_BOTH = 7, BRT_COMMIT_BOTH = 7,
BRT_COMMIT_BROADCAST_ALL = 8 // Broadcast to all leafentries, (commit all transactions). BRT_COMMIT_BROADCAST_ALL = 8, // Broadcast to all leafentries, (commit all transactions).
BRT_COMMIT_BROADCAST_TXN = 9, // Broadcast to all leafentries, (commit specific transaction).
BRT_ABORT_BROADCAST_TXN = 10, // Broadcast to all leafentries, (commit specific transaction).
}; };
typedef struct xids_t *XIDS; typedef struct xids_t *XIDS;
......
...@@ -91,6 +91,7 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le); ...@@ -91,6 +91,7 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form. int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form.
int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete. int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete.
int le_has_xids(LEAFENTRY le, XIDS xids); // Return true transaction represented by xids is still provisional in this leafentry (le's xid stack is a superset or equal to xids)
void* le_latest_key (LEAFENTRY le); // Return the latest key (return NULL for provisional deletes) void* le_latest_key (LEAFENTRY le); // Return the latest key (return NULL for provisional deletes)
u_int32_t le_latest_keylen (LEAFENTRY le); // Return the latest keylen. u_int32_t le_latest_keylen (LEAFENTRY le); // Return the latest keylen.
void* le_latest_key_and_len (LEAFENTRY le, u_int32_t *len); void* le_latest_key_and_len (LEAFENTRY le, u_int32_t *len);
......
...@@ -176,10 +176,12 @@ msg_modify_ule(ULE ule, BRT_MSG msg) { ...@@ -176,10 +176,12 @@ msg_modify_ule(ULE ule, BRT_MSG msg) {
break; break;
case BRT_ABORT_ANY: case BRT_ABORT_ANY:
case BRT_ABORT_BOTH: case BRT_ABORT_BOTH:
case BRT_ABORT_BROADCAST_TXN:
ule_apply_abort(ule, xids); ule_apply_abort(ule, xids);
break; break;
case BRT_COMMIT_ANY: case BRT_COMMIT_ANY:
case BRT_COMMIT_BOTH: case BRT_COMMIT_BOTH:
case BRT_COMMIT_BROADCAST_TXN:
ule_apply_commit(ule, xids); ule_apply_commit(ule, xids);
break; break;
default: default:
...@@ -756,6 +758,60 @@ int le_is_provdel(LEAFENTRY le) { ...@@ -756,6 +758,60 @@ int le_is_provdel(LEAFENTRY le) {
return rval; return rval;
} }
int
le_has_xids(LEAFENTRY le, XIDS xids) {
int rval;
//Read num_uxrs
u_int8_t num_uxrs = le->num_xrs;
assert(num_uxrs > 0);
u_int8_t num_xids = xids_get_num_xids(xids);
assert(num_xids > 1); //Disallow checking for having 'root txn'
if (num_xids > num_uxrs) {
//Not enough transaction records in le to have all of xids
rval = 0;
goto have_answer;
}
if (le_outermost_uncommitted_xid(le) != xids_get_xid(xids, 1)) {
rval = 0;
goto have_answer;
}
if (num_xids == 2) {
//Outermost uncommitted xid is the only xid (other than 0). We're done.
rval = 1;
goto have_answer;
}
//Hard case: shares outermost uncommitted xid, but has more in the stack.
//TODO: Optimize hard case by using leafentry_memsize as a template to do part of the 'unpacking'
ULE_S ule;
le_unpack(&ule, le);
u_int8_t idx = num_xids -1;
rval = xids_get_xid(xids, idx) == ule_get_xid(&ule, idx);
goto have_answer;
have_answer:
#if ULE_DEBUG
{
u_int32_t num_xids_slow = xids_get_num_xids(xids);
int slow_rval = 0;
ULE_S ule_slow;
le_unpack(&ule_slow, le);
if (num_xids_slow > 1 && ule_slow.num_uxrs >= num_xids_slow) {
u_int32_t idx_slow;
for (idx_slow = 0; idx_slow < num_xids_slow; idx_slow++) {
if (xids_get_xid(xids, idx_slow) != ule_get_xid(&ule_slow, idx_slow))
break;
}
if (idx_slow == num_xids_slow)
slow_rval = 1;
}
assert(slow_rval == rval);
}
#endif
return rval;
}
//If le_is_provdel, return (NULL,0) //If le_is_provdel, return (NULL,0)
//Else, return (key,keylen) //Else, return (key,keylen)
void* void*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment