Commit da3b6935 authored by Rich Prohaska's avatar Rich Prohaska

merge branch 838 to main. addresses #838

git-svn-id: file:///svn/tokudb@4493 c7de825b-a66e-492c-adef-691d508d4ae1
parent df7304b2
...@@ -2703,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) { ...@@ -2703,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) {
LESWITCHCALL(leafval, pair_leafval_bessel, search); LESWITCHCALL(leafval, pair_leafval_bessel, search);
} }
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, OMTCURSOR omtcursor) { static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the bessel function with a direction. What a pain... // Now we have to convert from brt_search_t to the bessel function with a direction. What a pain...
int direction; int direction;
switch (search->direction) { switch (search->direction) {
...@@ -2723,12 +2723,35 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT ...@@ -2723,12 +2723,35 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
LEAFENTRY le = datav; LEAFENTRY le = datav;
if (le_is_provdel(le)) { if (le_is_provdel(le)) {
TXNID xid = le_any_xid(le);
TOKUTXN txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
// Provisionally deleted stuff is gone. // Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something // So we need to scan in the direction to see if we can find something
while (1) { while (1) {
// see if the transaction is alive
TXNID newxid = le_any_xid(le);
if (newxid != xid) {
xid = newxid;
txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
}
switch (search->direction) { switch (search->direction) {
case BRT_SEARCH_LEFT: case BRT_SEARCH_LEFT:
if (txn) {
// printf("xid %llu -> %p\n", (unsigned long long) xid, txn);
idx++; idx++;
} else {
// apply a commit message for this leafentry to the node
// printf("apply commit_both %llu\n", (unsigned long long) xid);
DBT key, val;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)),
toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} };
r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le);
assert(r == 0);
}
if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND; if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND;
break; break;
case BRT_SEARCH_RIGHT: case BRT_SEARCH_RIGHT:
...@@ -2760,7 +2783,7 @@ static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *new ...@@ -2760,7 +2783,7 @@ static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *new
if (node->height > 0) if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor); return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor);
else else
return brt_search_leaf_node(brt, node, search, newkey, newval, omtcursor); return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor);
} }
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, uint64_t *root_put_counter) int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, uint64_t *root_put_counter)
......
...@@ -407,3 +407,24 @@ u_int32_t any_vallen_le_provpair (TXNID UU(xid), u_int32_t UU(klen), void *UU(kv ...@@ -407,3 +407,24 @@ u_int32_t any_vallen_le_provpair (TXNID UU(xid), u_int32_t UU(klen), void *UU(kv
u_int32_t le_any_vallen (LEAFENTRY le) { u_int32_t le_any_vallen (LEAFENTRY le) {
LESWITCHCALL(le, any_vallen); LESWITCHCALL(le, any_vallen);
} }
u_int64_t any_xid_le_committed (u_int32_t UU(keylen), void *UU(key), u_int32_t UU(vallen), void *UU(val)) {
return 0;
}
u_int64_t any_xid_le_both (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval), u_int32_t UU(plen), void *UU(pval)) {
return xid;
}
u_int64_t any_xid_le_provdel (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval)) {
return xid;
}
u_int64_t any_xid_le_provpair (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(plen), void *UU(pval)) {
return xid;
}
u_int64_t le_any_xid (LEAFENTRY le) {
LESWITCHCALL(le, any_xid);
}
...@@ -132,6 +132,7 @@ void* le_any_key (LEAFENTRY le); ...@@ -132,6 +132,7 @@ void* le_any_key (LEAFENTRY le);
u_int32_t le_any_keylen (LEAFENTRY le); u_int32_t le_any_keylen (LEAFENTRY le);
void* le_any_val (LEAFENTRY le); void* le_any_val (LEAFENTRY le);
u_int32_t le_any_vallen (LEAFENTRY le); u_int32_t le_any_vallen (LEAFENTRY le);
u_int64_t le_any_xid (LEAFENTRY le);
#endif #endif
...@@ -1001,11 +1001,10 @@ int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off ...@@ -1001,11 +1001,10 @@ int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off
return 0; return 0;
} }
static int find_xid (OMTVALUE v, void *txnv) {
static int find_ptr (OMTVALUE v, void *vfind) { TOKUTXN txn = v;
if (v<vfind) return -1; TOKUTXN txnfind = txnv;
if (v>vfind) return +1; return txn->txnid64 - txnfind->txnid64;
return 0;
} }
static int find_filenum (OMTVALUE v, void *brtv) { static int find_filenum (OMTVALUE v, void *brtv) {
...@@ -1022,7 +1021,7 @@ static int find_filenum (OMTVALUE v, void *brtv) { ...@@ -1022,7 +1021,7 @@ static int find_filenum (OMTVALUE v, void *brtv) {
int toku_txn_note_brt (TOKUTXN txn, BRT brt) { int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
OMTVALUE txnv; OMTVALUE txnv;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL);
if (r==0) { if (r==0) {
// It's already there. // It's already there.
assert((TOKUTXN)txnv==txn); assert((TOKUTXN)txnv==txn);
...@@ -1060,7 +1059,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) { ...@@ -1060,7 +1059,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
TOKUTXN txn = txnv; TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL; OMTVALUE txnv_again=NULL;
u_int32_t index; u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index, NULL); int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL);
assert(r==0); assert(r==0);
assert((void*)txnv_again==txnv); assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index); r = toku_omt_delete_at(brt->txns, index);
...@@ -1073,3 +1072,12 @@ static void note_txn_closing (TOKUTXN txn) { ...@@ -1073,3 +1072,12 @@ static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn); toku_omt_iterate(txn->open_brts, remove_txn, txn);
toku_omt_destroy(&txn->open_brts); toku_omt_destroy(&txn->open_brts);
} }
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
struct tokutxn fake_txn; fake_txn.txnid64 = xid;
uint32_t index;
OMTVALUE txnv;
int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index, NULL);
if (r == 0) *txnptr = txnv;
return r;
}
...@@ -169,4 +169,8 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item); ...@@ -169,4 +169,8 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_txn_note_brt (TOKUTXN txn, BRT brt); int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt); int toku_txn_note_close_brt (BRT brt);
// find the TOKUTXN object by xid
// if found then return 0 and set txnptr to the address of the TOKUTXN object
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
#endif #endif
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment