Commit 28e78543 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2494], merge read committed to main

git-svn-id: file:///svn/toku/tokudb@19073 c7de825b-a66e-492c-adef-691d508d4ae1
parent 60d7a8bf
......@@ -189,6 +189,7 @@ typedef enum {
#define DB_TXN_NOWAIT 8192
#define DB_TXN_SYNC 16384
#define DB_READ_UNCOMMITTED 67108864
#define DB_READ_COMMITTED 33554432
#define DB_INHERIT_ISOLATION 1
#endif
/* TOKUDB specific error codes */
......
......@@ -189,6 +189,7 @@ typedef enum {
#define DB_TXN_NOWAIT 16384
#define DB_TXN_SYNC 32768
#define DB_READ_UNCOMMITTED 134217728
#define DB_READ_COMMITTED 67108864
#define DB_INHERIT_ISOLATION 1
#endif
/* TOKUDB specific error codes */
......
......@@ -191,6 +191,7 @@ typedef enum {
#define DB_TXN_NOWAIT 1024
#define DB_TXN_SYNC 16384
#define DB_READ_UNCOMMITTED 134217728
#define DB_READ_COMMITTED 67108864
#define DB_INHERIT_ISOLATION 1
#endif
/* TOKUDB specific error codes */
......
......@@ -177,6 +177,9 @@ static void print_defines (void) {
dodefine_track(txn_flags, DB_TXN_SYNC);
#ifdef DB_READ_UNCOMMITTED
dodefine_track(txn_flags, DB_READ_UNCOMMITTED);
#endif
#ifdef DB_READ_COMMITTED
dodefine_track(txn_flags, DB_READ_COMMITTED);
#endif
dodefine_from_track(txn_flags, DB_INHERIT_ISOLATION);
}
......
......@@ -191,6 +191,7 @@ typedef enum {
#define DB_TXN_NOWAIT 1024
#define DB_TXN_SYNC 16384
#define DB_READ_UNCOMMITTED 134217728
#define DB_READ_COMMITTED 67108864
#define DB_INHERIT_ISOLATION 1
#endif
/* TOKUDB specific error codes */
......
......@@ -191,6 +191,7 @@ typedef enum {
#define DB_TXN_NOWAIT 1024
#define DB_TXN_SYNC 16384
#define DB_READ_UNCOMMITTED 134217728
#define DB_READ_COMMITTED 67108864
#define DB_INHERIT_ISOLATION 1
#endif
/* TOKUDB specific error codes */
......
......@@ -289,6 +289,9 @@ struct brt_cursor {
OMTCURSOR omtcursor;
u_int64_t root_put_counter; // what was the count on the BRT when we validated the cursor?
TXNID oldest_living_xid;// what was the oldest live txnid when we created the cursor?
TOKULOGGER logger; // to give access to list of live transactions, needed for read_committed queries
TXNID ancestor_id; // txnid of ancestor, needed for read_committed queries
BOOL is_read_committed; // true if query is read_committed, false otherwise
struct brt_cursor_leaf_info leaf_info;
};
......
......@@ -4034,15 +4034,61 @@ brt_cursor_cleanup_dbts(BRT_CURSOR c) {
}
}
static inline void brt_cursor_extract_key_and_val(
LEAFENTRY le,
BRT_CURSOR cursor,
u_int32_t* keylen,
bytevec* key,
u_int32_t* vallen,
bytevec* val
)
{
if (cursor->is_read_committed) {
TXNID le_anc_id = le_outermost_uncommitted_xid(le);
if (le_anc_id < cursor->logger->oldest_living_xid || //current transaction has inserted this element
le_anc_id == 0 || // le is a committed value with no provisional data
le_anc_id == cursor->ancestor_id || //quick check to avoid more expensive is_txnid_live check
!is_txnid_live(cursor->logger,le_anc_id))
{
*key = le_latest_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
}
else {
*key = le_outermost_key_and_len(le, keylen);
*val = le_outermost_val_and_len(le, vallen);
}
}
else {
*key = le_latest_key_and_len(le, keylen);
*val = le_latest_val_and_len(le, vallen);
}
}
static inline void load_dbts_from_omt(BRT_CURSOR c, DBT *key, DBT *val) {
OMTVALUE le = 0;
int r = toku_omt_cursor_current(c->omtcursor, &le);
assert(r==0);
u_int32_t keylen;
bytevec key_vec = NULL;
u_int32_t vallen;
bytevec val_vec = NULL;
brt_cursor_extract_key_and_val(
le,
c,
&keylen,
&key_vec,
&vallen,
&val_vec
);
if (key) {
key->data = le_latest_key_and_len(le, &key->size);
key->data = (void *)key_vec;
key->size = keylen;
}
if (val) {
val->data = le_latest_val_and_len(le, &val->size);
val->data = (void *)val_vec;
val->size = vallen;
}
}
......@@ -4083,8 +4129,13 @@ brt_cursor_invalidate(BRT_CURSOR brtcursor) {
}
}
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger) {
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger, TXNID txnid, BOOL is_read_committed) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
// if this cursor is to do read_committed fetches, then the txn objects must be valid.
if (is_read_committed) {
assert(logger != NULL);
assert(txnid != TXNID_NONE);
}
if (cursor == 0)
return ENOMEM;
memset(cursor, 0, sizeof(*cursor));
......@@ -4092,6 +4143,9 @@ int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, TOKULOGGER logger) {
cursor->current_in_omt = FALSE;
cursor->prefetching = FALSE;
cursor->oldest_living_xid = toku_logger_get_oldest_living_xid(logger);
cursor->logger = logger;
cursor->ancestor_id = txnid;
cursor->is_read_committed = is_read_committed;
toku_list_push(&brt->cursors, &cursor->cursors_link);
int r = toku_omt_cursor_create(&cursor->omtcursor);
assert(r==0);
......@@ -4199,6 +4253,37 @@ brt_cursor_update(BRT_CURSOR brtcursor) {
toku_omt_cursor_set_index(omtcursor, brtcursor->leaf_info.to_be.index);
}
//
// Returns true if the value that is to be read is empty.
// If is_read_committed is false, then it checks the innermost value
// (and is the equivalent of le_is_provdel)
// If is_read_committed is true, then for live transactions, it checks the committed
// value in le. For committed transactions, it checks the innermost value
//
static inline int
is_le_val_empty(LEAFENTRY le, BRT_CURSOR brtcursor) {
if (brtcursor->is_read_committed) {
TXNID le_anc_id = le_outermost_uncommitted_xid(le);
if (le_anc_id < brtcursor->oldest_living_xid || //current transaction has inserted this element
le_anc_id == 0 || // le is a committed value with no provisional data
le_anc_id == brtcursor->ancestor_id|| //quick check to avoid more expensive is_txnid_live check
!is_txnid_live(brtcursor->logger,le_anc_id))
{
return le_is_provdel(le);
}
// le_anc_id is an active transaction,
else {
//
// need to check the committed val, which requires unpack of le
//
return le_outermost_is_del(le);
}
}
else {
return le_is_provdel(le);
}
}
// This is a bottom layer of the search functions.
static int
brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBACK_FUNCTION getf, void *getf_v, enum reactivity *re, BOOL *doprefetch, BRT_CURSOR brtcursor)
......@@ -4224,7 +4309,7 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBA
if (r!=0) return r;
LEAFENTRY le = datav;
if (le_is_provdel(le)) {
if (is_le_val_empty(le,brtcursor)) {
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
......@@ -4249,7 +4334,7 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBA
r = toku_omt_fetch(node->u.l.buffer, idx, &datav, NULL);
assert(r==0); // we just validated the index
le = datav;
if (!le_is_provdel(le)) goto got_a_good_value;
if (!is_le_val_empty(le,brtcursor)) goto got_a_good_value;
}
}
got_a_good_value:
......@@ -4258,9 +4343,18 @@ brt_search_leaf_node(BRTNODE node, brt_search_t *search, BRT_GET_STRADDLE_CALLBA
maybe_do_implicit_promotion_on_query(brtcursor, le);
{
u_int32_t keylen;
bytevec key = le_latest_key_and_len(le, &keylen);
bytevec key = NULL;
u_int32_t vallen;
bytevec val = le_latest_val_and_len(le, &vallen);
bytevec val = NULL;
brt_cursor_extract_key_and_val(
le,
brtcursor,
&keylen,
&key,
&vallen,
&val
);
assert(brtcursor->current_in_omt == FALSE);
r = getf(keylen, key,
......@@ -4636,7 +4730,7 @@ int
toku_brt_flatten(BRT brt, TOKULOGGER logger)
{
BRT_CURSOR tmp_cursor;
int r = toku_brt_cursor(brt, &tmp_cursor, logger);
int r = toku_brt_cursor(brt, &tmp_cursor, logger, TXNID_NONE, FALSE);
if (r!=0) return r;
brt_search_t search; brt_search_init(&search, brt_cursor_compare_one, BRT_SEARCH_LEFT, 0, 0, tmp_cursor->brt);
r = brt_cursor_search(tmp_cursor, &search, brt_flatten_getf, NULL);
......@@ -4693,12 +4787,21 @@ brt_cursor_shortcut (BRT_CURSOR cursor, int direction, u_int32_t limit, BRT_GET_
r = toku_omt_fetch(omt, index, &le, NULL);
assert(r==0);
if (!le_is_provdel(le)) {
if (!is_le_val_empty(le,cursor)) {
maybe_do_implicit_promotion_on_query(cursor, le);
u_int32_t keylen;
bytevec key = le_latest_key_and_len(le, &keylen);
bytevec key = NULL;
u_int32_t vallen;
bytevec val = le_latest_val_and_len(le, &vallen);
bytevec val = NULL;
brt_cursor_extract_key_and_val(
le,
cursor,
&keylen,
&key,
&vallen,
&val
);
r = getf(keylen, key, vallen, val, getf_v);
if (r==0) {
......@@ -5190,7 +5293,7 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor, NULL);
rr = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
......@@ -5573,7 +5676,7 @@ brt_is_empty (BRT brt) {
BRT_CURSOR cursor;
int r, r2;
BOOL is_empty;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
if (r == 0) {
r = toku_brt_cursor_first(cursor, getf_nothing, NULL);
r2 = toku_brt_cursor_close(cursor);
......
......@@ -125,7 +125,7 @@ int toku_verify_brt (BRT brt);
//int show_brt_blocknumbers(BRT);
typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKULOGGER);
int toku_brt_cursor (BRT, BRT_CURSOR*, TOKULOGGER, TXNID, BOOL);
// get is deprecated in favor of the individual functions below
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *key, DBT *val, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v, int get_flags);
......
......@@ -91,13 +91,16 @@ void wbuf_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
void wbuf_nocrc_LEAFENTRY(struct wbuf *w, LEAFENTRY le);
int print_leafentry (FILE *outf, LEAFENTRY v); // Print a leafentry out in human-readable form.
int le_outermost_is_del(LEAFENTRY le);
int le_is_provdel(LEAFENTRY le); // Return true if it is a provisional delete.
int le_has_xids(LEAFENTRY le, XIDS xids); // Return true transaction represented by xids is still provisional in this leafentry (le's xid stack is a superset or equal to xids)
void* le_latest_key (LEAFENTRY le); // Return the latest key (return NULL for provisional deletes)
u_int32_t le_latest_keylen (LEAFENTRY le); // Return the latest keylen.
void* le_outermost_key_and_len (LEAFENTRY le, u_int32_t *len);
void* le_latest_key_and_len (LEAFENTRY le, u_int32_t *len);
void* le_latest_val (LEAFENTRY le); // Return the latest val (return NULL for provisional deletes)
u_int32_t le_latest_vallen (LEAFENTRY le); // Return the latest vallen. Returns 0 for provisional deletes.
void* le_outermost_val_and_len (LEAFENTRY le, u_int32_t *len);
void* le_latest_val_and_len (LEAFENTRY le, u_int32_t *len);
// Return any key or value (even if it's only provisional).
......
......@@ -1005,6 +1005,14 @@ find_by_xid (OMTVALUE v, void *txnidv) {
return 0;
}
BOOL is_txnid_live(TOKULOGGER logger, TXNID txnid) {
assert(logger);
TOKUTXN result = NULL;
int rval = toku_txnid2txn(logger, txnid, &result);
assert(rval == 0);
return (result != NULL);
}
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==NULL) return -1;
......
......@@ -71,6 +71,7 @@ LSN toku_txn_get_last_lsn (TOKUTXN txn);
LSN toku_logger_last_lsn(TOKULOGGER logger);
TOKULOGGER toku_txn_logger (TOKUTXN txn);
BOOL is_txnid_live(TOKULOGGER logger, TXNID txnid);
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
//int toku_logger_log_checkpoint (TOKULOGGER);
//int toku_set_func_fsync (int (*fsync_function)(int));
......
......@@ -50,7 +50,7 @@ static void test_sub_block(int n) {
assert(error == 0);
BRT_CURSOR cursor;
error = toku_brt_cursor(brt, &cursor, NULL);
error = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(error == 0);
for (i=0; ; i++) {
......
......@@ -51,7 +51,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
}
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL);
r = toku_brt_cursor(brt, &cursors[i], NULL, TXNID_NONE, FALSE);
assert(r == 0);
}
......
......@@ -19,7 +19,7 @@ static void assert_cursor_notfound(BRT brt, int position) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
struct check_pair pair = {0,0,0,0,0};
......@@ -35,7 +35,7 @@ static void assert_cursor_value(BRT brt, int position, long long value) {
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -52,7 +52,7 @@ static void assert_cursor_first_last(BRT brt, long long firstv, long long lastv)
BRT_CURSOR cursor=0;
int r;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("first key: ");
......@@ -250,7 +250,7 @@ static void assert_cursor_walk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -316,7 +316,7 @@ static void assert_cursor_rwalk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -402,7 +402,7 @@ static void assert_cursor_walk_inorder(BRT brt, int n) {
int r;
char *prevkey = 0;
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -504,7 +504,7 @@ static void test_brt_cursor_split(int n, DB *db) {
assert(r==0);
}
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -569,7 +569,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL);
r = toku_brt_cursor(brt, &cursors[i], NULL, TXNID_NONE, FALSE);
assert(r == 0);
}
......@@ -619,7 +619,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
int c;
/* create the cursors */
for (c=0; c<ncursors; c++) {
r = toku_brt_cursor(brt, &cursors[c], NULL);
r = toku_brt_cursor(brt, &cursors[c], NULL, TXNID_NONE, FALSE);
assert(r == 0);
}
......@@ -706,7 +706,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
......@@ -779,7 +779,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
......@@ -829,7 +829,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor, NULL);
error = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(error == 0);
DBT key, val;
......@@ -890,7 +890,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor, NULL);
error = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE);
assert(error == 0);
{
......
......@@ -255,7 +255,7 @@ static void test_cursor_last_empty(void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, toku_builtin_compare_fun, null_db); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
{
struct check_pair pair = {0,0,0,0,0};
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_LAST);
......@@ -291,7 +291,7 @@ static void test_cursor_next (void) {
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "hello", 6), toku_fill_dbt(&vbt, "there", 6), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "byebye", 7), toku_fill_dbt(&vbt, "byenow", 7), null_txn);
if (verbose) printf("%s:%d calling toku_brt_cursor(...)\n", __FILE__, __LINE__);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
toku_init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
toku_init_dbt(&vbt);
......@@ -383,7 +383,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
}
{
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
for (i=0; i<2; i++) {
unsigned char a[4],b[4];
......@@ -423,7 +423,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
toku_cachetable_verify(ct);
}
BRT_CURSOR cursor=0;
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
for (i=0; i<N; i++) {
unsigned char a[4],b[4];
......@@ -567,7 +567,7 @@ static void test_brt_delete_present(int n) {
/* cursor should not find anything */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE);
assert(r == 0);
{
......@@ -698,7 +698,7 @@ static void test_brt_delete_cursor_first(int n) {
/* cursor should find the last key: n-1 */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE);
assert(r == 0);
{
......@@ -820,7 +820,7 @@ static void test_brt_delete_both(int n) {
/* cursor should find only odd pairs */
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
for (i=1; ; i += 2) {
int kv = toku_htonl(0);
......@@ -866,7 +866,7 @@ static void test_new_brt_cursor_create_close (void) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i], NULL); assert(r == 0);
r = toku_brt_cursor(brt, &cursors[i], NULL, TXNID_NONE, FALSE); assert(r == 0);
}
for (i=0; i<n; i++) {
......@@ -901,7 +901,7 @@ static void test_new_brt_cursor_first(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -954,7 +954,7 @@ static void test_new_brt_cursor_last(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1007,7 +1007,7 @@ static void test_new_brt_cursor_next(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
for (i=0; ; i++) {
int kk = toku_htonl(i);
......@@ -1051,7 +1051,7 @@ static void test_new_brt_cursor_prev(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
for (i=n-1; ; i--) {
int kk = toku_htonl(i);
......@@ -1095,7 +1095,7 @@ static void test_new_brt_cursor_current(int n, int dup_mode) {
BRT_CURSOR cursor=0;
r = toku_brt_cursor(t, &cursor, NULL); assert(r == 0);
r = toku_brt_cursor(t, &cursor, NULL, TXNID_NONE, FALSE); assert(r == 0);
for (i=0; ; i++) {
{
......@@ -1180,7 +1180,7 @@ static void test_new_brt_cursor_set_range(int n, int dup_mode) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
......@@ -1238,7 +1238,7 @@ static void test_new_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
......
......@@ -79,7 +79,7 @@ static void test_delete_all (void) {
// Now use a cursor to see if it is all empty
{
BRT_CURSOR cursor = 0;
r = toku_brt_cursor(t, &cursor, 0); assert(r==0);
r = toku_brt_cursor(t, &cursor, 0, TXNID_NONE, FALSE); assert(r==0);
struct check_pair pair = {len_ignore, NULL, len_ignore, NULL, 0};
r = toku_brt_cursor_get(cursor, NULL, NULL, lookup_checkf, &pair, DB_FIRST);
assert(r == DB_NOTFOUND);
......
......@@ -22,7 +22,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute
r = toku_brt_create_cachetable(&ct, 0, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_open_brt(fname, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL); assert(r==0);
r = toku_brt_cursor(brt, &cursor, NULL, TXNID_NONE, FALSE); assert(r==0);
int i;
for (i=0; i<1000; i++) {
......
......@@ -64,6 +64,7 @@ static void ule_apply_commit(ULE ule, XIDS xids);
static void ule_push_insert_uxr(ULE ule, TXNID xid, u_int32_t vallen, void * valp);
static void ule_push_delete_uxr(ULE ule, TXNID xid);
static void ule_push_placeholder_uxr(ULE ule, TXNID xid);
static UXR ule_get_outermost_uxr(ULE ule);
static UXR ule_get_innermost_uxr(ULE ule);
static UXR ule_get_first_empty_uxr(ULE ule);
static void ule_remove_innermost_uxr(ULE ule);
......@@ -735,6 +736,13 @@ le_full_promotion(LEAFENTRY le,
#endif
}
int le_outermost_is_del(LEAFENTRY le) {
ULE_S ule;
le_unpack(&ule, le);
UXR outermost_uxr = ule_get_outermost_uxr(&ule);
int rval = uxr_is_delete(outermost_uxr);
return rval;
}
int le_is_provdel(LEAFENTRY le) {
int rval;
......@@ -857,6 +865,25 @@ le_has_xids(LEAFENTRY le, XIDS xids) {
return rval;
}
void*
le_outermost_key_and_len (LEAFENTRY le, u_int32_t *len) {
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_outermost_uxr(&ule);
void *slow_keyp;
u_int32_t slow_len;
if (uxr_is_insert(uxr)) {
slow_keyp = ule.keyp;
slow_len = ule.keylen;
}
else {
slow_keyp = NULL;
slow_len = 0;
}
*len = slow_len;
return slow_keyp;
}
//If le_is_provdel, return (NULL,0)
//Else, return (key,keylen)
void*
......@@ -943,6 +970,25 @@ le_latest_keylen (LEAFENTRY le) {
return rval;
}
void*
le_outermost_val_and_len (LEAFENTRY le, u_int32_t *len) {
ULE_S ule;
le_unpack(&ule, le);
UXR uxr = ule_get_outermost_uxr(&ule);
void *slow_valp;
u_int32_t slow_len;
if (uxr_is_insert(uxr)) {
slow_valp = uxr->valp;
slow_len = uxr->vallen;
}
else {
slow_valp = NULL;
slow_len = 0;
}
*len = slow_len;
return slow_valp;
}
void*
le_latest_val_and_len (LEAFENTRY le, u_int32_t *len) {
u_int8_t num_xrs = le->num_xrs;
......@@ -1418,6 +1464,14 @@ ule_get_innermost_uxr(ULE ule) {
return rval;
}
// Return innermost transaction record.
static UXR
ule_get_outermost_uxr(ULE ule) {
assert(ule->num_uxrs > 0);
UXR rval = &(ule->uxrs[0]);
return rval;
}
// Return first empty transaction record
static UXR
ule_get_first_empty_uxr(ULE ule) {
......
// Test that isolation works right for subtransactions.
// In particular, check to see what happens if a subtransaction has different isolation level from its parent.
#include "test.h"
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
int test_main (int argc, char * const argv[]) {
parse_args(argc, argv);
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
env->set_errfile(env, stderr);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
DB *db;
{
DB_TXN *txna;
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, txna, "foo.db", NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
DBT key,val;
r = db->put(db, txna, dbt_init(&key, "a", 4), dbt_init(&val, "a", 4), 0); CKERR(r);
r = txna->commit(txna, 0); CKERR(r);
}
DB_TXN *txn_put, *txn_committed, *txn_uncommitted;
r = env->txn_begin(env, NULL, &txn_put, DB_READ_COMMITTED); CKERR(r);
r = env->txn_begin(env, NULL, &txn_committed, DB_READ_COMMITTED); CKERR(r);
r = env->txn_begin(env, NULL, &txn_uncommitted, DB_READ_UNCOMMITTED); CKERR(r);
//
// test a simple get
//
{
DBT key,val;
r = db->put(db, txn_put, dbt_init(&key, "x", 4), dbt_init(&val, "x", 4), 0); CKERR(r);
dbt_init_malloc(&val);
r = db->get(db, txn_put, dbt_init(&key, "x", 4), &val, 0); CKERR(r);
r = db->get(db, txn_committed, dbt_init(&key, "x", 4), &val, 0); CKERR2(r, DB_NOTFOUND);
r = db->get(db, txn_uncommitted, dbt_init(&key, "x", 4), &val, 0); CKERR(r);
toku_free(val.data);
r = db->del(db, txn_put, dbt_init(&key, "a", 4), 0); CKERR(r);
dbt_init_malloc(&val);
r = db->get(db, txn_put, dbt_init(&key, "a", 4), &val, 0); CKERR2(r, DB_NOTFOUND);
r = db->get(db, txn_committed, dbt_init(&key, "a", 4), &val, 0); CKERR(r);
r = db->get(db, txn_uncommitted, dbt_init(&key, "a", 4), &val, 0); CKERR2(r, DB_NOTFOUND);
val.data = NULL;
toku_free(val.data);
}
r = txn_put->commit(txn_put, 0); CKERR(r);
r = txn_committed->commit(txn_committed, 0); CKERR(r);
r = txn_uncommitted->commit(txn_uncommitted, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txn_put, DB_READ_COMMITTED); CKERR(r);
r = env->txn_begin(env, NULL, &txn_committed, DB_READ_COMMITTED); CKERR(r);
r = env->txn_begin(env, NULL, &txn_uncommitted, DB_READ_UNCOMMITTED); CKERR(r);
//
// test a simple get
//
{
DBT key,val;
DBT curr_key, curr_val;
DBC* cursor_committed = NULL;
DBC* cursor_uncommitted = NULL;
memset(&curr_key, 0, sizeof(curr_key));
memset(&curr_val, 0, sizeof(curr_val));
r = db->cursor(db, txn_committed, &cursor_committed, 0); assert(r == 0);
r = db->cursor(db, txn_uncommitted, &cursor_uncommitted, 0); assert(r == 0);
r = db->put(db, txn_put, dbt_init(&key, "y", 4), dbt_init(&val, "y", 4), 0); CKERR(r);
r = cursor_uncommitted->c_get(cursor_uncommitted, &curr_key, &curr_val, DB_NEXT); CKERR(r);
assert(((char *)(curr_key.data))[0] == 'x');
assert(((char *)(curr_val.data))[0] == 'x');
r = cursor_committed->c_get(cursor_committed, &curr_key, &curr_val, DB_NEXT); CKERR(r);
assert(((char *)(curr_key.data))[0] == 'x');
assert(((char *)(curr_val.data))[0] == 'x');
r = cursor_committed->c_get(cursor_committed, &curr_key, &curr_val, DB_NEXT); CKERR2(r, DB_NOTFOUND);
r = cursor_uncommitted->c_get(cursor_uncommitted, &curr_key, &curr_val, DB_NEXT); CKERR(r);
assert(((char *)(curr_key.data))[0] == 'y');
assert(((char *)(curr_val.data))[0] == 'y');
}
r = txn_put->commit(txn_put, 0); CKERR(r);
r = txn_committed->commit(txn_committed, 0); CKERR(r);
r = txn_uncommitted->commit(txn_uncommitted, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
return 0;
}
......@@ -1996,12 +1996,19 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
if (!(env->i->open_flags & DB_INIT_TXN)) return toku_ydb_do_error(env, EINVAL, "Environment does not have transactions enabled\n");
u_int32_t txn_flags = 0;
txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks.
uint32_t child_isolation_flags = 0; //TODO: #2126 DB_READ_COMMITTED should be added here once supported.
uint32_t child_isolation_flags = 0;
uint32_t parent_isolation_flags = 0;
int inherit = 0;
int set_isolation = 0;
if ((flags & DB_READ_UNCOMMITTED) && (flags & DB_READ_COMMITTED)) {
return toku_ydb_do_error(
env,
EINVAL,
"Transaction cannot have both DB_READ_COMMITTED and DB_READ_UNCOMMITTED set\n"
);
}
if (stxn) {
parent_isolation_flags = db_txn_struct_i(stxn)->flags & (DB_READ_UNCOMMITTED); //TODO: #2126 DB_READ_COMMITTED should be added here once supported.
parent_isolation_flags = db_txn_struct_i(stxn)->flags & (DB_READ_UNCOMMITTED | DB_READ_COMMITTED);
if (internal || flags&DB_INHERIT_ISOLATION) {
flags &= ~DB_INHERIT_ISOLATION;
inherit = 1;
......@@ -2009,12 +2016,12 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
child_isolation_flags = parent_isolation_flags;
}
}
if (flags&DB_READ_UNCOMMITTED) {
if (flags & (DB_READ_UNCOMMITTED|DB_READ_COMMITTED)) {
if (set_isolation)
return toku_ydb_do_error(env, EINVAL, "Cannot set isolation two different ways in DB_ENV->txn_begin\n");
set_isolation = 1;
child_isolation_flags |= DB_READ_UNCOMMITTED;
flags &= ~DB_READ_UNCOMMITTED;
child_isolation_flags |= (flags & (DB_READ_UNCOMMITTED|DB_READ_COMMITTED));
flags &= ~(DB_READ_UNCOMMITTED | DB_READ_COMMITTED);
}
txn_flags |= child_isolation_flags;
if (flags&DB_TXN_NOWAIT) {
......@@ -2406,8 +2413,13 @@ static inline u_int32_t get_prelocked_flags(u_int32_t flags, DB_TXN* txn, DB* db
// for internal (non-user) dictionary, do not set DB_PRELOCK
if (db->i->dname) {
//DB_READ_UNCOMMITTED transactions 'own' all read locks for user-data dictionaries.
if (txn && db_txn_struct_i(txn)->flags&DB_READ_UNCOMMITTED) lock_flags |= DB_PRELOCKED;
//DB_READ_UNCOMMITTED and DB_READ_COMMITTED transactions 'own' all read locks for user-data dictionaries.
if (txn &&
(db_txn_struct_i(txn)->flags& (DB_READ_UNCOMMITTED | DB_READ_COMMITTED))
)
{
lock_flags |= DB_PRELOCKED;
}
}
return lock_flags;
}
......@@ -4007,7 +4019,21 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
dbc_struct_i(result)->skey = &dbc_struct_i(result)->skey_s;
dbc_struct_i(result)->sval = &dbc_struct_i(result)->sval_s;
}
int r = toku_brt_cursor(db->i->brt, &dbc_struct_i(result)->c, db->dbenv->i->logger);
DB_TXN* txn_anc = NULL;
TXNID txn_anc_id = TXNID_NONE;
BOOL is_read_committed = FALSE;
if (txn) {
txn_anc = toku_txn_ancestor(txn);
txn_anc_id = toku_txn_get_txnid(db_txn_struct_i(txn_anc)->tokutxn);
is_read_committed = ((db_txn_struct_i(txn_anc)->flags & DB_READ_COMMITTED) != 0);
}
int r = toku_brt_cursor(
db->i->brt,
&dbc_struct_i(result)->c,
db->dbenv->i->logger,
txn_anc_id,
is_read_committed
);
assert(r == 0);
*c = result;
return 0;
......@@ -4228,8 +4254,9 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
int is_db_excl = flags & DB_EXCL; unused_flags&=~DB_EXCL;
int is_db_create = flags & DB_CREATE; unused_flags&=~DB_CREATE;
//We support READ_UNCOMMITTED whether or not the flag is provided.
//We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
unused_flags&=~DB_READ_UNCOMMITTED;
unused_flags&=~DB_READ_COMMITTED;
if (unused_flags & ~DB_THREAD) return EINVAL; // unknown flags
if (is_db_excl && !is_db_create) return EINVAL;
......@@ -4329,8 +4356,9 @@ db_open_iname(DB * db, DB_TXN * txn, const char *iname_in_env, u_int32_t flags,
int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL;
int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE;
//We support READ_UNCOMMITTED whether or not the flag is provided.
//We support READ_UNCOMMITTED and READ_COMMITTED whether or not the flag is provided.
flags&=~DB_READ_UNCOMMITTED;
flags&=~DB_READ_COMMITTED;
if (flags & ~DB_THREAD) return EINVAL; // unknown flags
if (is_db_excl && !is_db_create) return EINVAL;
......@@ -4941,8 +4969,9 @@ static int toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)),
static int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, const DBT *val_left, const DBT *key_right, const DBT *val_right) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return EINVAL;
//READ_UNCOMMITTED transactions do not need read locks.
//READ_UNCOMMITTED and READ_COMMITTED transactions do not need read locks.
if (db_txn_struct_i(txn)->flags&DB_READ_UNCOMMITTED) return 0;
if (db_txn_struct_i(txn)->flags&DB_READ_COMMITTED) return 0;
DB_TXN* txn_anc = toku_txn_ancestor(txn);
int r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment