Commit 60274904 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Add the list of txns. Addresses #735. Fixes #749.

git-svn-id: file:///svn/tokudb@3659 c7de825b-a66e-492c-adef-691d508d4ae1
parent 72707043
......@@ -1810,6 +1810,8 @@ int toku_brt_create(BRT *brt_ptr) {
brt->nodesize = BRT_DEFAULT_NODE_SIZE;
brt->compare_fun = toku_default_compare_fun;
brt->dup_compare = toku_default_compare_fun;
int r = toku_omt_create(&brt->txns);
if (r!=0) { toku_free(brt); return r; }
*brt_ptr = brt;
return 0;
}
......@@ -2114,6 +2116,10 @@ int toku_close_brt (BRT brt, TOKULOGGER logger) {
if (brt->fname) toku_free(brt->fname);
if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); }
assert(toku_omt_size(brt->txns)==0);
r=toku_txn_note_close_brt(brt);
assert(r==0);
toku_omt_destroy(&brt->txns);
toku_free(brt);
return 0;
}
......@@ -2284,6 +2290,8 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
......@@ -2313,6 +2321,8 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE_ANY, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
......@@ -2328,6 +2338,8 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
......
This diff is collapsed.
......@@ -92,7 +92,7 @@ struct tokutxn {
char *rollentry_filename;
int rollentry_fd; // If we spill the roll_entries, we write them into this fd.
off_t rollentry_filesize; // How many bytes are in the rollentry.
OMT used_open_brtcachefile_pairs; // a collection of the brts that we touched and which are still open.
OMT open_brts; // a collection of the brts that we touched. Indexed by filenum.
};
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
......
......@@ -343,6 +343,8 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
return toku_logger_log_bytes(logger, logbytes, do_fsync);
}
static void note_txn_closing (TOKUTXN txn);
static void cleanup_txn (TOKUTXN txn) {
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
......@@ -362,10 +364,8 @@ static void cleanup_txn (TOKUTXN txn) {
}
list_remove(&txn->live_txns_link);
assert(toku_omt_size(txn->used_open_brtcachefile_pairs)==0);
toku_omt_destroy(&txn->used_open_brtcachefile_pairs);
note_txn_closing(txn);
toku_free(txn);
return;
}
......@@ -393,6 +393,10 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
return 0;
}
static int note_brt_used_in_parent_txn(OMTVALUE brtv, u_int32_t UU(index), void*parentv) {
return toku_txn_note_brt(parentv, brtv);
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
......@@ -433,6 +437,10 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
}
txn->newest_logentry = txn->oldest_logentry = 0;
// Note the open brts, the omts must be merged
r = toku_omt_iterate(txn->open_brts, note_brt_used_in_parent_txn, txn->parent);
assert(r==0);
} else {
// do the commit calls and free everything
// we do the commit calls in reverse order too.
......@@ -470,7 +478,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
if (result==0) return errno;
int r =toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) { toku_logger_panic(logger, r); toku_free(result); return r; }
if ((r=toku_omt_create(&result->used_open_brtcachefile_pairs))!=0) {
if ((r=toku_omt_create(&result->open_brts))!=0) {
toku_logger_panic(logger, r);
toku_free(result);
return r;
......@@ -969,3 +977,76 @@ int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off
toku_free(buf);
return 0;
}
static int find_ptr (OMTVALUE v, void *vfind) {
if (v<vfind) return -1;
if (v>vfind) return +1;
return 0;
}
static int find_filenum (OMTVALUE v, void *brtv) {
BRT brt = v;
BRT brtfind = brtv;
FILENUM fnum = toku_cachefile_filenum(brt ->cf);
FILENUM fnumfind = toku_cachefile_filenum(brtfind->cf);
if (fnum.fileid<fnumfind.fileid) return -1;
if (fnum.fileid>fnumfind.fileid) return +1;
return 0;
}
int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
OMTVALUE txnv;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv, &index);
if (r==0) {
// It's already there.
assert((TOKUTXN)txnv==txn);
return 0;
}
// Otherwise it's not there.
r = toku_omt_insert_at(brt->txns, txn, index);
assert(r==0);
r = toku_omt_insert(txn->open_brts, brt, find_filenum, brt, 0);
assert(r==0);
return 0;
}
static int remove_brt (OMTVALUE txnv, u_int32_t UU(idx), void *brtv) {
TOKUTXN txn = txnv;
BRT brt = brtv;
OMTVALUE brtv_again;
u_int32_t index;
int r = toku_omt_find_zero(txn->open_brts, find_filenum, brt, &brtv_again, &index);
assert(r==0);
assert((void*)brtv_again==brtv);
r = toku_omt_delete_at(txn->open_brts, index);
assert(r==0);
return 0;
}
int toku_txn_note_close_brt (BRT brt) {
int r = toku_omt_iterate(brt->txns, remove_brt, brt);
assert(r==0);
return 0;
}
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
BRT brt = brtv;
TOKUTXN txn = txnv;
OMTVALUE txnv_again;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index);
assert(r==0);
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
return 0;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
toku_omt_destroy(&txn->open_brts);
}
......@@ -165,4 +165,7 @@ int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment