Commit c96e94ee authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge read uncommited changes to the dev branch. addresses #1201

git-svn-id: file:///svn/toku/tokudb.1195@7681 c7de825b-a66e-492c-adef-691d508d4ae1
parent c2ef1a3a
......@@ -12,7 +12,7 @@
# 4. make hs
BDB = 4_4
BDBDIR = ../../berkeleydb
BDBDIR = ../../../berkeleydb
MYSQL_H = -I$(BDBDIR)/db-4.1.25/build_unix/
OPTFLAGS = -O2
CFLAGS = -Wall -W -Werror -g $(OPTFLAGS)
......
......@@ -69,6 +69,7 @@ typedef enum {
#define DB_INIT_TXN 262144
#define DB_USE_ENVIRON 2048
#define DB_USE_ENVIRON_ROOT 4096
#define DB_READ_UNCOMMITTED 67108864
#define DB_KEYEMPTY -30997
#define DB_KEYEXIST -30996
#define DB_LOCK_DEADLOCK -30995
......
......@@ -69,6 +69,7 @@ typedef enum {
#define DB_INIT_TXN 524288
#define DB_USE_ENVIRON 4096
#define DB_USE_ENVIRON_ROOT 8192
#define DB_READ_UNCOMMITTED 134217728
#define DB_KEYEMPTY -30997
#define DB_KEYEXIST -30996
#define DB_LOCK_DEADLOCK -30995
......
......@@ -69,6 +69,7 @@ typedef enum {
#define DB_INIT_TXN 2097152
#define DB_USE_ENVIRON 16384
#define DB_USE_ENVIRON_ROOT 32768
#define DB_READ_UNCOMMITTED 134217728
#define DB_KEYEMPTY -30997
#define DB_KEYEXIST -30996
#define DB_LOCK_DEADLOCK -30995
......
......@@ -81,6 +81,9 @@ void print_defines (void) {
dodefine(DB_USE_ENVIRON);
dodefine(DB_USE_ENVIRON_ROOT);
#ifdef DB_READ_UNCOMMITTED
dodefine(DB_READ_UNCOMMITTED);
#endif
dodefine(DB_KEYEMPTY);
dodefine(DB_KEYEXIST);
dodefine(DB_LOCK_DEADLOCK);
......
......@@ -69,6 +69,7 @@ typedef enum {
#define DB_INIT_TXN 262144
#define DB_USE_ENVIRON 2048
#define DB_USE_ENVIRON_ROOT 4096
#define DB_READ_UNCOMMITTED 67108864
#define DB_KEYEMPTY -30997
#define DB_KEYEXIST -30996
#define DB_LOCK_DEADLOCK -30995
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <string.h>
#include <db.h>
#include <assert.h>
#include <errno.h>
#include <sys/stat.h>
#include "test.h"
// ENVDIR is defined in the Makefile
static DB *db;
static DB_TXN* txns[(int)256];
static DB_ENV* dbenv;
static DBC* cursors[(int)256];
static void
put(BOOL success, char txn, int _key, int _data) {
assert(txns[(int)txn]);
int r;
DBT key;
DBT data;
r = db->put(db, txns[(int)txn],
dbt_init(&key, &_key, sizeof(int)),
dbt_init(&data, &_data, sizeof(int)),
DB_YESOVERWRITE);
if (success) CKERR(r);
else CKERR2s(r, DB_LOCK_DEADLOCK, DB_LOCK_NOTGRANTED);
}
static void
init_txn (char name, u_int32_t flags) {
int r;
assert(!txns[(int)name]);
r = dbenv->txn_begin(dbenv, NULL, &txns[(int)name], DB_TXN_NOWAIT | flags);
CKERR(r);
assert(txns[(int)name]);
}
static void
init_dbc (char name) {
int r;
assert(!cursors[(int)name] && txns[(int)name]);
r = db->cursor(db, txns[(int)name], &cursors[(int)name], 0);
CKERR(r);
assert(cursors[(int)name]);
}
static void
commit_txn (char name) {
int r;
assert(txns[(int)name] && !cursors[(int)name]);
r = txns[(int)name]->commit(txns[(int)name], 0);
CKERR(r);
txns[(int)name] = NULL;
}
static void
close_dbc (char name) {
int r;
assert(cursors[(int)name]);
r = cursors[(int)name]->c_close(cursors[(int)name]);
CKERR(r);
cursors[(int)name] = NULL;
}
static void
early_commit (char name) {
assert(cursors[(int)name] && txns[(int)name]);
close_dbc(name);
commit_txn(name);
}
static void
setup_dbs (u_int32_t dup_flags) {
int r;
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
dbenv = NULL;
db = NULL;
/* Open/create primary */
r = db_env_create(&dbenv, 0);
CKERR(r);
u_int32_t env_txn_flags = DB_INIT_TXN | DB_INIT_LOCK;
u_int32_t env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL;
r = dbenv->open(dbenv, ENVDIR, env_open_flags | env_txn_flags, 0600);
CKERR(r);
r = db_create(&db, dbenv, 0);
CKERR(r);
if (dup_flags) {
r = db->set_flags(db, dup_flags);
CKERR(r);
}
r = db->set_bt_compare( db, int_dbt_cmp);
CKERR(r);
r = db->set_dup_compare(db, int_dbt_cmp);
CKERR(r);
char a;
for (a = 'a'; a <= 'z'; a++) init_txn(a, 0);
for (a = '0'; a <= '9'; a++) init_txn(a, DB_READ_UNCOMMITTED);
init_txn('\0', 0);
r = db->open(db, txns[(int)'\0'], "foobar.db", NULL, DB_BTREE, DB_CREATE | DB_READ_UNCOMMITTED, 0600);
CKERR(r);
commit_txn('\0');
for (a = 'a'; a <= 'z'; a++) init_dbc(a);
for (a = '0'; a <= '9'; a++) init_dbc(a);
}
static void
close_dbs(void) {
char a;
for (a = 'a'; a <= 'z'; a++) {
if (cursors[(int)a]) close_dbc(a);
if (txns[(int)a]) commit_txn(a);
}
for (a = '0'; a <= '9'; a++) {
if (cursors[(int)a]) close_dbc(a);
if (txns[(int)a]) commit_txn(a);
}
int r;
r = db->close(db, 0);
CKERR(r);
db = NULL;
r = dbenv->close(dbenv, 0);
CKERR(r);
dbenv = NULL;
}
static void
table_scan(char txn, BOOL success) {
int r;
DBT key;
DBT data;
assert(txns[(int)txn] && cursors[(int)txn]);
r = cursors[(int)txn]->c_get(cursors[(int)txn],
dbt_init(&key, 0, 0),
dbt_init(&data, 0, 0),
DB_FIRST);
while (r==0) {
r = cursors[(int)txn]->c_get(cursors[(int)txn],
dbt_init(&key, 0, 0),
dbt_init(&data, 0, 0),
DB_NEXT);
}
if (success) CKERR2(r, DB_NOTFOUND);
else CKERR2s(r, DB_LOCK_NOTGRANTED, DB_LOCK_DEADLOCK);
}
static void
table_prelock(char txn, BOOL success) {
int r;
#if defined USE_TDB && USE_TDB
r = db->pre_acquire_table_lock(db, txns[(int)txn]);
#else
DBT key;
DBT data;
assert(txns[(int)txn] && cursors[(int)txn]);
r = cursors[(int)txn]->c_get(cursors[(int)txn],
dbt_init(&key, 0, 0),
dbt_init(&data, 0, 0),
DB_FIRST | DB_RMW);
while (r==0) {
r = cursors[(int)txn]->c_get(cursors[(int)txn],
dbt_init(&key, 0, 0),
dbt_init(&data, 0, 0),
DB_NEXT | DB_RMW);
}
#endif
if (success) CKERR(r);
else CKERR2s(r, DB_LOCK_NOTGRANTED, DB_LOCK_DEADLOCK);
}
static void
test (u_int32_t dup_flags) {
char txn;
/* ********************************************************************** */
setup_dbs(dup_flags);
close_dbs();
/* ********************************************************************** */
setup_dbs(dup_flags);
table_scan('0', TRUE);
table_prelock('a', TRUE);
put(TRUE, 'a', 0, 0);
for (txn = 'b'; txn<'z'; txn++) {
table_scan(txn, FALSE);
}
for (txn = '0'; txn<'9'; txn++) {
table_scan(txn, TRUE);
}
early_commit('a');
for (txn = 'b'; txn<'z'; txn++) {
table_scan(txn, TRUE);
}
for (txn = '0'; txn<'9'; txn++) {
table_scan(txn, TRUE);
}
close_dbs();
/* ********************************************************************** */
}
int main(int argc, const char* argv[]) {
parse_args(argc, argv);
test(0);
test(DB_DUP | DB_DUPSORT);
return 0;
}
......@@ -73,6 +73,7 @@ struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
TOKUTXN tokutxn;
toku_lth* lth;
u_int32_t flags;
DB_TXN *child, *next, *prev;
};
......
......@@ -979,7 +979,7 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
}
}
//toku_ydb_notef("flags=%d\n", flags);
int nosync = (flags & DB_TXN_NOSYNC)!=0;
int nosync = (flags & DB_TXN_NOSYNC)!=0 || (txn->i->flags&DB_TXN_NOSYNC);
flags &= ~DB_TXN_NOSYNC;
int r2 = toku_txn_release_locks(txn);
......@@ -1056,7 +1056,22 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
HANDLE_PANICKED_ENV(env);
if (!toku_logger_is_open(env->i->logger)) return toku_ydb_do_error(env, EINVAL, "Environment does not have logging enabled\n");
if (!(env->i->open_flags & DB_INIT_TXN)) return toku_ydb_do_error(env, EINVAL, "Environment does not have transactions enabled\n");
flags=flags;
u_int32_t txn_flags = 0;
txn_flags |= DB_TXN_NOWAIT; //We do not support blocking locks.
if (flags&DB_READ_UNCOMMITTED) {
txn_flags |= DB_READ_UNCOMMITTED;
flags &= ~DB_READ_UNCOMMITTED;
}
if (flags&DB_TXN_NOWAIT) {
txn_flags |= DB_TXN_NOWAIT;
flags &= ~DB_TXN_NOWAIT;
}
if (flags&DB_TXN_NOSYNC) {
txn_flags |= DB_TXN_NOSYNC;
flags &= ~DB_TXN_NOSYNC;
}
if (flags!=0) return toku_ydb_do_error(env, EINVAL, "Invalid flags passed to DB_ENV->txn_begin\n");
DB_TXN *MALLOC(result);
if (result == 0)
return ENOMEM;
......@@ -1073,6 +1088,7 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
return ENOMEM;
}
memset(result->i, 0, sizeof *result->i);
result->i->flags = txn_flags;
int r;
if (env->i->open_flags & DB_INIT_LOCK && !stxn) {
......@@ -1321,8 +1337,12 @@ typedef struct {
BOOL tmp_dat_malloced;
} C_GET_VARS;
static inline u_int32_t get_prelocked_flags(u_int32_t flags) {
return flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
static inline u_int32_t get_prelocked_flags(u_int32_t flags, DB_TXN* txn) {
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
//DB_READ_UNCOMMITTED transactions 'own' all read locks.
if (txn && txn->i->flags&DB_READ_UNCOMMITTED) lock_flags |= DB_PRELOCKED;
return lock_flags;
}
static void toku_c_get_fix_flags(C_GET_VARS* g) {
......@@ -1351,7 +1371,7 @@ static void toku_c_get_fix_flags(C_GET_VARS* g) {
default:
break;
}
g->lock_flags = get_prelocked_flags(g->flag);
g->lock_flags = get_prelocked_flags(g->flag, g->c->i->txn);
g->flag &= ~g->lock_flags;
}
......@@ -1690,7 +1710,7 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
if (toku_c_uninitialized(c)) return EINVAL;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, c->i->txn);
flags &= ~lock_flags;
int r;
......@@ -1843,7 +1863,7 @@ static int locked_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *ke
static int toku_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
u_int32_t lock_flags = get_prelocked_flags(flag);
u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......@@ -1878,7 +1898,7 @@ static int toku_c_getf_first(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DB
static int toku_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
u_int32_t lock_flags = get_prelocked_flags(flag);
u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......@@ -1914,7 +1934,7 @@ static int toku_c_getf_last(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
if (toku_c_uninitialized(c)) return toku_c_getf_first(c, flag, f, extra);
u_int32_t lock_flags = get_prelocked_flags(flag);
u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......@@ -1959,7 +1979,7 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
if (toku_c_uninitialized(c)) return toku_c_getf_last(c, flag, f, extra);
u_int32_t lock_flags = get_prelocked_flags(flag);
u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......@@ -2004,7 +2024,7 @@ static int toku_c_getf_prev(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
static int toku_c_getf_next_dup(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
if (toku_c_uninitialized(c)) return EINVAL;
u_int32_t lock_flags = get_prelocked_flags(flag);
u_int32_t lock_flags = get_prelocked_flags(flag, c->i->txn);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......@@ -2057,7 +2077,7 @@ static int toku_c_getf_heavi(DBC *c, u_int32_t flags,
if (direction==0) return EINVAL;
DBC *tmp_c = NULL;
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, c->i->txn);
flags &= ~lock_flags;
assert(flags==0);
struct heavi_wrapper wrapper;
......@@ -2221,7 +2241,7 @@ static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, txn);
flags &= ~lock_flags;
if (flags!=0 && flags!=DB_GET_BOTH) return EINVAL;
......@@ -2236,7 +2256,7 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, txn);
flags &= ~lock_flags;
if (flags!=0 && flags!=DB_DELETE_ANY) return EINVAL;
//DB_DELETE_ANY supresses the BDB DB->del return value indicating that the key was not found prior to the delete
......@@ -2592,7 +2612,7 @@ static int toku_db_delboth_noassociate(DB *db, DB_TXN *txn, DBT *key, DBT *val,
HANDLE_PANICKED_DB(db);
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, txn);
flags &= ~lock_flags;
u_int32_t delete_any = flags&DB_DELETE_ANY;
flags &= ~DB_DELETE_ANY;
......@@ -2645,7 +2665,7 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
return EINVAL;
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, txn);
flags &= ~lock_flags;
if (flags != 0 && flags != DB_GET_BOTH) return EINVAL;
// We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
......@@ -2811,6 +2831,8 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
int is_db_excl = flags & DB_EXCL; flags&=~DB_EXCL;
int is_db_create = flags & DB_CREATE; flags&=~DB_CREATE;
int is_db_rdonly = flags & DB_RDONLY; flags&=~DB_RDONLY;
//We support READ_UNCOMMITTED whether or not the flag is provided.
flags&=~DB_READ_UNCOMMITTED;
if (dbtype != DB_UNKNOWN && dbtype != DB_BTREE) return EINVAL;
if (flags & ~DB_THREAD) return EINVAL; // unknown flags
......@@ -2935,7 +2957,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
if (key->size >= limit || data->size >= limit)
return toku_ydb_do_error(db->dbenv, EINVAL, "The largest key or data item allowed is %d bytes", limit);
}
u_int32_t lock_flags = get_prelocked_flags(flags);
u_int32_t lock_flags = get_prelocked_flags(flags, txn);
flags &= ~lock_flags;
if (flags == DB_YESOVERWRITE) {
......@@ -3191,6 +3213,8 @@ static int toku_db_key_range64(DB* db, DB_TXN* txn __attribute__((__unused__)),
int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, const DBT *val_left, const DBT *key_right, const DBT *val_right) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return EINVAL;
//READ_UNCOMMITTED transactions do not need read locks.
if (txn->i->flags&DB_READ_UNCOMMITTED) return 0;
DB_TXN* txn_anc = toku_txn_ancestor(txn);
int r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment