Commit a12ded1a authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1866 Oldest living xid stored in logger instead of a global.

Cursors take a copy of oldest living xid upon creation, which they use for implicit promotion

git-svn-id: file:///svn/toku/tokudb@13606 c7de825b-a66e-492c-adef-691d508d4ae1
parent 681c5dd3
......@@ -277,6 +277,7 @@ struct brt_cursor {
DBT key, val; // The key-value pair that the cursor currently points to
OMTCURSOR omtcursor;
u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor?
struct brt_cursor_leaf_info leaf_info;
};
......
......@@ -1346,7 +1346,7 @@ brt_leaf_apply_full_promotion_once (BRTNODE node, LEAFENTRY le)
}
static void
maybe_do_implicit_promotion_on_query (BRTNODE node, LEAFENTRY le) {
maybe_do_implicit_promotion_on_query (BRT_CURSOR brtcursor, LEAFENTRY le) {
//Requires: le is not a provdel (Callers never call it unless not provdel).
//assert(!le_is_provdel(le)); //Must be as fast as possible. Assert is superfluous.
......@@ -1362,8 +1362,8 @@ maybe_do_implicit_promotion_on_query (BRTNODE node, LEAFENTRY le) {
// * We will sometimes say a txn is uncommitted when it is committed.
// * We will NEVER say a txn is committed when it is uncommitted.
TXNID outermost_uncommitted_xid = le_outermost_uncommitted_xid(le);
if (outermost_uncommitted_xid != 0 && outermost_uncommitted_xid < oldest_living_xid) {
brt_leaf_apply_full_promotion_once(node, le);
if (outermost_uncommitted_xid != 0 && outermost_uncommitted_xid < brtcursor->oldest_living_xid) {
brt_leaf_apply_full_promotion_once(brtcursor->leaf_info.node, le);
}
}
......@@ -3305,7 +3305,7 @@ brt_cursor_invalidate(BRT_CURSOR brtcursor) {
}
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr) {
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
......@@ -3313,6 +3313,8 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr) {
cursor->brt = brt;
cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE;
cursor->oldest_living_xid = toku_logger_get_oldest_living_xid(logger);
assert(cursor->oldest_living_xid != MAX_TXNID);
list_push(&brt->cursors, &cursor->cursors_link);
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0);
......@@ -3474,7 +3476,9 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBA
}
}
got_a_good_value:
maybe_do_implicit_promotion_on_query(node, le);
//Save node ptr in brtcursor (implicit promotion requires it).
brtcursor->leaf_info.node = node;
maybe_do_implicit_promotion_on_query(brtcursor, le);
{
u_int32_t keylen;
bytevec key = le_latest_key_and_len(le, &keylen);
......@@ -3501,7 +3505,6 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBA
brtcursor->leaf_info.fullhash = node->fullhash;
brtcursor->leaf_info.blocknumber = node->thisnodename;
#endif
brtcursor->leaf_info.node = node;
brtcursor->leaf_info.leaflock = node->u.l.leaflock;
brtcursor->leaf_info.to_be.omt = node->u.l.buffer;
brtcursor->leaf_info.to_be.index = idx;
......@@ -3890,7 +3893,7 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_
assert(r==0);
if (!le_is_provdel(le)) {
maybe_do_implicit_promotion_on_query(cursor->leaf_info.node, le);
maybe_do_implicit_promotion_on_query(cursor, le);
u_int32_t keylen;
bytevec key = le_latest_key_and_len(le, &keylen);
u_int32_t vallen;
......@@ -4357,7 +4360,7 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor);
rr = toku_brt_cursor(brt, &cursor, NULL);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
......@@ -4723,7 +4726,7 @@ brt_is_empty (BRT brt, TOKULOGGER logger) {
BRT_CURSOR cursor;
int r, r2;
BOOL is_empty;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
if (r == 0) {
r = toku_brt_cursor_first(cursor, getf_nothing, NULL, logger);
r2 = toku_brt_cursor_close(cursor);
......
......@@ -84,7 +84,7 @@ int toku_verify_brt (BRT brt);
//int show_brt_blocknumbers(BRT);
typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*);
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKULOGGER);
// get is deprecated in favor of the individual functions below
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags, TOKUTXN txn);
......
......@@ -77,6 +77,7 @@ struct tokulogger {
int n_in_file;
u_int32_t write_block_size; // How big should the blocks be written to various logs?
TXNID oldest_living_xid;
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......
......@@ -29,6 +29,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->n_in_file=0;
result->directory=0;
result->checkpoint_lsn=(LSN){0};
result->oldest_living_xid = MAX_TXNID;
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died1;
......@@ -812,7 +813,8 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
// get them into increasing order
qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
LSN oldest_live_txn_lsn={.lsn = oldest_living_xid};
LSN oldest_live_txn_lsn={.lsn = toku_logger_get_oldest_living_xid(logger)};
assert(oldest_live_txn_lsn.lsn != 0);
//printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn);
// Now starting at the last one, look for archivable ones.
......@@ -871,3 +873,11 @@ TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn) {
logger->checkpoint_lsn = lsn;
}
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger) {
TXNID rval = 0;
if (logger)
rval = logger->oldest_living_xid;
return rval;
}
......@@ -68,4 +68,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
TOKUTXN toku_logger_txn_parent (TOKUTXN txn);
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
TXNID toku_logger_get_oldest_living_xid(TOKULOGGER logger);
#endif
......@@ -42,9 +42,9 @@ void toku_rollback_txn_close (TOKUTXN txn) {
assert(r==0);
}
assert(oldest_living_xid <= txn->txnid64);
assert(oldest_living_xid < MAX_TXNID);
if (txn->txnid64 == oldest_living_xid) {
assert(txn->logger->oldest_living_xid <= txn->txnid64);
assert(txn->logger->oldest_living_xid < MAX_TXNID);
if (txn->txnid64 == txn->logger->oldest_living_xid) {
TOKULOGGER logger = txn->logger;
OMTVALUE oldest_txnv;
......@@ -52,13 +52,13 @@ void toku_rollback_txn_close (TOKUTXN txn) {
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > oldest_living_xid); //Must be newer than the previous oldest
oldest_living_xid = oldest_txn->txnid64;
assert(oldest_txn->txnid64 > txn->logger->oldest_living_xid); //Must be newer than the previous oldest
txn->logger->oldest_living_xid = oldest_txn->txnid64;
}
else {
//No living transactions
assert(r==EINVAL);
oldest_living_xid = MAX_TXNID;
txn->logger->oldest_living_xid = MAX_TXNID;
}
}
......
......@@ -50,7 +50,7 @@ static void test_sub_block(int n) {
assert(error == 0);
BRT_CURSOR cursor;
error = toku_brt_cursor(brt, &cursor);
error = toku_brt_cursor(brt, &cursor, NULL);
assert(error == 0);
for (i=0; ; i++) {
......
......@@ -51,7 +51,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
}
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]);
r = toku_brt_cursor(brt, &cursors[i], NULL);
assert(r == 0);
}
......
......@@ -19,7 +19,7 @@ static void assert_cursor_notfound(BRT brt, int position) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
struct check_pair pair = {0,0,0,0,0};
......@@ -35,7 +35,7 @@ static void assert_cursor_value(BRT brt, int position, long long value) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -52,7 +52,7 @@ static void assert_cursor_first_last(BRT brt, long long firstv, long long lastv)
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("first key: ");
......@@ -250,7 +250,7 @@ static void assert_cursor_walk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -316,7 +316,7 @@ static void assert_cursor_rwalk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -402,7 +402,7 @@ static void assert_cursor_walk_inorder(BRT brt, int n) {
int r;
char *prevkey = 0;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -504,7 +504,7 @@ static void test_brt_cursor_split(int n, DB *db) {
assert(r==0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -569,7 +569,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]);
r = toku_brt_cursor(brt, &cursors[i], NULL);
assert(r == 0);
}
......@@ -619,7 +619,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
int c;
/* create the cursors */
for (c=0; c<ncursors; c++) {
r = toku_brt_cursor(brt, &cursors[c]);
r = toku_brt_cursor(brt, &cursors[c], NULL);
assert(r == 0);
}
......@@ -706,7 +706,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
......@@ -779,7 +779,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, NULL);
assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
......@@ -829,7 +829,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor);
error = toku_brt_cursor(brt, &cursor, NULL);
assert(error == 0);
DBT key, val;
......@@ -890,7 +890,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor);
error = toku_brt_cursor(brt, &cursor, NULL);
assert(error == 0);
{
......
......@@ -255,7 +255,7 @@ static void test_cursor_last_empty(void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, toku_default_compare_fun, null_db); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
{
struct check_pair pair = {0,0,0,0,0};
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_LAST, null_txn);
......@@ -291,7 +291,7 @@ static void test_cursor_next (void) {
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "hello", 6), toku_fill_dbt(&vbt, "there", 6), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "byebye", 7), toku_fill_dbt(&vbt, "byenow", 7), null_txn);
if (verbose) printf("%s:%d calling toku_brt_cursor(...)\n", __FILE__, __LINE__);
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
toku_init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
toku_init_dbt(&vbt);
......@@ -383,7 +383,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
}
{
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
for (i=0; i<2; i++) {
unsigned char a[4],b[4];
......@@ -423,7 +423,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
toku_cachetable_verify(ct);
}
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
for (i=0; i<N; i++) {
unsigned char a[4],b[4];
......@@ -567,7 +567,7 @@ static void test_brt_delete_present(int n) {
/* cursor should not find anything */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor);
r = toku_brt_cursor(t, &cursor, NULL);
assert(r == 0);
{
......@@ -698,7 +698,7 @@ static void test_brt_delete_cursor_first(int n) {
/* cursor should find the last key: n-1 */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor);
r = toku_brt_cursor(t, &cursor, NULL);
assert(r == 0);
{
......@@ -820,7 +820,7 @@ static void test_brt_delete_both(int n) {
/* cursor should find only odd pairs */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
for (i=1; ; i += 2) {
int kv = toku_htonl(0);
......@@ -866,7 +866,7 @@ static void test_new_brt_cursor_create_close (void) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]); assert(r == 0);
r = toku_brt_cursor(brt, &cursors[i], NULL); assert(r == 0);
}
for (i=0; i<n; i++) {
......@@ -901,7 +901,7 @@ static void test_new_brt_cursor_first(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -954,7 +954,7 @@ static void test_new_brt_cursor_last(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1007,7 +1007,7 @@ static void test_new_brt_cursor_next(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
for (i=0; ; i++) {
int kk = toku_htonl(i);
......@@ -1051,7 +1051,7 @@ static void test_new_brt_cursor_prev(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
for (i=n-1; ; i--) {
int kk = toku_htonl(i);
......@@ -1095,7 +1095,7 @@ static void test_new_brt_cursor_current(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
for (i=0; ; i++) {
{
......@@ -1180,7 +1180,7 @@ static void test_new_brt_cursor_set_range(int n, int dup_mode) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
......@@ -1238,7 +1238,7 @@ static void test_new_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
......
......@@ -22,7 +22,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db); assert(r==0);
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
int i;
for (i=0; i<1000; i++) {
......
......@@ -6,8 +6,6 @@
#include "includes.h"
#include "txn.h"
TXNID oldest_living_xid = MAX_TXNID;
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
......@@ -49,11 +47,11 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
result->rollentry_arena = memarena_create();
if (toku_omt_size(logger->live_txns) == 0) {
assert(oldest_living_xid == MAX_TXNID);
oldest_living_xid = result->txnid64;
assert(logger->oldest_living_xid == MAX_TXNID);
logger->oldest_living_xid = result->txnid64;
}
assert(oldest_living_xid < MAX_TXNID);
assert(oldest_living_xid <= result->txnid64);
assert(logger->oldest_living_xid < MAX_TXNID);
assert(logger->oldest_living_xid <= result->txnid64);
{
//Add txn to list (omt) of live transactions
......@@ -61,7 +59,7 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died2;
if (oldest_living_xid == result->txnid64)
if (logger->oldest_living_xid == result->txnid64)
assert(idx == 0);
else
assert(idx > 0);
......
......@@ -10,6 +10,5 @@ int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
void toku_txn_close_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN);
extern TXNID oldest_living_xid;
#endif //TOKUTXN_H
......@@ -2752,7 +2752,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
dbc_struct_i(result)->skey = &dbc_struct_i(result)->skey_s;
dbc_struct_i(result)->sval = &dbc_struct_i(result)->sval_s;
}
int r = toku_brt_cursor(db->i->brt, &dbc_struct_i(result)->c);
int r = toku_brt_cursor(db->i->brt, &dbc_struct_i(result)->c, db->dbenv->i->logger);
assert(r == 0);
*c = result;
return 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment