Commit 4e5d0d1d authored by Yoni Fogel's avatar Yoni Fogel

Closes #904

Skip lock tree overhead (marshalling and calling)
when appropriate due to DB_PRELOCKED/DB_PRELOCKED_WRITE

git-svn-id: file:///svn/tokudb@4469 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8f6c7794
......@@ -9,7 +9,8 @@
#include <unistd.h>
const char *pname;
int verify_lwc=0, lwc=0, hwc=1, prelock=0;
int verify_lwc=0, lwc=0, hwc=1, prelock=0, prelockflag=0;
u_int32_t lock_flag = DB_PRELOCKED;
void parse_args (int argc, const char *argv[]) {
......@@ -21,8 +22,10 @@ void parse_args (int argc, const char *argv[]) {
else if (strcmp(*argv, "--lwc")==0) lwc=1;
else if (strcmp(*argv, "--nohwc")==0) hwc=0;
else if (strcmp(*argv, "--prelock")==0) prelock=1;
else if (strcmp(*argv, "--prelockflag")==0) prelockflag=1;
else if (strcmp(*argv, "--prelockwriteflag")==0) { prelockflag=1; lock_flag = DB_PRELOCKED_WRITE; }
else {
printf("Usage:\n%s [--verify-lwc] [--lwc] [--nohwc]\n", pname);
printf("Usage:\n%s [--verify-lwc] [--lwc] [--nohwc] [--prelock] [--prelockflag] [--prelockwriteflag]\n", pname);
exit(1);
}
argc--;
......@@ -97,7 +100,11 @@ void scanscan (void) {
r = db->cursor(db, tid, &dbc, 0); assert(r==0);
memset(&k, 0, sizeof(k));
memset(&v, 0, sizeof(v));
while (0 == (r = dbc->c_get(dbc, &k, &v, DB_NEXT))) {
u_int32_t c_get_flags = DB_NEXT;
if (prelockflag && (counter || prelock)) {
c_get_flags |= lock_flag;
}
while (0 == (r = dbc->c_get(dbc, &k, &v, c_get_flags))) {
totalbytes += k.size + v.size;
rowcounter++;
}
......@@ -126,7 +133,11 @@ void scanscan_lwc (void) {
double prevtime = gettime();
DBC *dbc;
r = db->cursor(db, tid, &dbc, 0); assert(r==0);
while (0 == (r = dbc->c_getf_next(dbc, 0, counttotalbytes, &e)));
u_int32_t f_flags = 0;
if (prelockflag && (counter || prelock)) {
f_flags |= lock_flag;
}
while (0 == (r = dbc->c_getf_next(dbc, f_flags, counttotalbytes, &e)));
r = dbc->c_close(dbc); assert(r==0);
double thistime = gettime();
double tdiff = thistime-prevtime;
......@@ -165,10 +176,16 @@ void scanscan_verify (void) {
r = db->cursor(db, tid, &dbc2, 0); assert(r==0);
memset(&v.k, 0, sizeof(v.k));
memset(&v.v, 0, sizeof(v.v));
u_int32_t f_flags = 0;
u_int32_t c_get_flags = DB_NEXT;
if (prelockflag && (counter || prelock)) {
f_flags |= lock_flag;
c_get_flags |= lock_flag;
}
while (1) {
int r1,r2;
r2 = dbc1->c_get(dbc1, &v.k, &v.v, DB_NEXT);
r1 = dbc2->c_getf_next(dbc2, 0, checkbytes, &v);
r2 = dbc1->c_get(dbc1, &v.k, &v.v, c_get_flags);
r1 = dbc2->c_getf_next(dbc2, f_flags, checkbytes, &v);
assert(r1==r2);
if (r1) break;
}
......
......@@ -1055,17 +1055,7 @@ static int verify_secondary_key(DB *secondary, DBT *pkey, DBT *data, DBT *skey)
//Get the main portion of a cursor flag (excluding the bitwise or'd components).
static int get_main_cursor_flag(u_int32_t flag) {
#ifdef DB_READ_UNCOMMITTED
flag &= ~DB_READ_UNCOMMITTED;
#endif
#ifdef DB_MULTIPLE
flag &= ~DB_MULTIPLE;
#endif
#ifdef DB_MULTIPLE_KEY
flag &= ~DB_MULTIPLE_KEY;
#endif
flag &= ~DB_RMW;
return flag;
return flag & DB_OPFLAGS_MASK;
}
static inline BOOL toku_c_uninitialized(DBC* c) {
......@@ -1118,6 +1108,7 @@ typedef struct {
DBT tmp_dat; // Temporary data val to protect out param
u_int32_t flag; // The c_get flag
u_int32_t op; // The operation portion of the c_get flag
u_int32_t lock_flags; // The prelock flags.
BOOL cursor_is_write; // Whether op can change position of cursor
BOOL cursor_was_saved; // Whether we saved the cursor yet.
BOOL key_is_read;
......@@ -1133,6 +1124,10 @@ typedef struct {
BOOL tmp_dat_malloced;
} C_GET_VARS;
static inline u_int32_t get_prelocked_flags(u_int32_t flags) {
return flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
}
static void toku_c_get_fix_flags(C_GET_VARS* g) {
g->op = get_main_cursor_flag(g->flag);
......@@ -1159,6 +1154,8 @@ static void toku_c_get_fix_flags(C_GET_VARS* g) {
default:
break;
}
g->lock_flags = get_prelocked_flags(g->flag);
g->flag &= ~g->lock_flags;
}
static inline void toku_c_pget_fix_flags(C_GET_VARS* g) {
......@@ -1168,7 +1165,6 @@ static inline void toku_c_pget_fix_flags(C_GET_VARS* g) {
static int toku_c_get_pre_acquire_lock_if_possible(C_GET_VARS* g, DBT* key, DBT* data) {
int r = ENOSYS;
toku_lock_tree* lt = g->db->i->lt;
if (!lt) { r = 0; goto cleanup; }
/* We know what to lock ahead of time. */
if (g->op == DB_GET_BOTH ||
......@@ -1476,6 +1472,13 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag)
/* Standardize the op flag. */
toku_c_get_fix_flags(&g);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
if (!g.db->i->lt || g.lock_flags) {
r = toku_brt_cursor_get(c->i->c, key, val, g.flag, txn);
goto cleanup;
}
/* If we know what to lock before the cursor op, lock now. */
if ((r = toku_c_get_pre_acquire_lock_if_possible(&g, key, val))) goto cleanup;
/* Determine whether the key and val parameters are read, write,
......@@ -1486,7 +1489,6 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * val, u_int32_t flag)
/* Save key and value to temporary local versions. */
if ((r = toku_c_get_save_inputs(&g, key, val))) goto cleanup;
/* Run the cursor operation on the brt. */
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
r_cursor_op = r = toku_brt_cursor_get(c->i->c, &g.tmp_key, &g.tmp_val, g.flag, txn);
/* Only DB_CURRENT should possibly retun DB_KEYEMPTY,
* and DB_CURRENT requires no locking. */
......@@ -1528,8 +1530,11 @@ static int toku_c_del_noassociate(DBC * c, u_int32_t flags) {
HANDLE_PANICKED_DB(db);
if (toku_c_uninitialized(c)) return EINVAL;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
int r;
if (db->i->lt) {
if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) {
DBT saved_key;
DBT saved_data;
r = toku_c_get_current_unconditional(c, &saved_key, &saved_data);
......@@ -1606,7 +1611,7 @@ static int toku_c_pget(DBC * c, DBT *key, DBT *pkey, DBT *data, u_int32_t flag)
/* Save the inputs. */
if ((r = toku_c_pget_save_inputs(&g, key, pkey, data))) goto cleanup;
if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, flag))) goto cleanup;
if ((r = toku_c_get_noassociate(c, &g.tmp_key, &g.tmp_val, g.flag))) goto cleanup;
r = toku_db_get(pdb, c->i->txn, &g.tmp_val, &g.tmp_dat, 0);
if (r==DB_NOTFOUND) goto delete_silently_and_retry;
......@@ -1673,9 +1678,11 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key,
DBT key,val;
memset(&key, 0, sizeof(key));
memset(&val, 0, sizeof(val));
flag &= ~DB_PRELOCKED; // Get rid of the prelock flag, because c_get doesn't know about it.
u_int32_t lock_flags = get_prelocked_flags(flag);
flag &= ~lock_flags;
assert(flag==0);
int r = toku_c_get_noassociate(c, &key, &val, DB_NEXT | flag);
int r = toku_c_get_noassociate(c, &key, &val, DB_NEXT | flag | lock_flags);
if (r==0) f(&key, &val, extra);
return r;
}
......@@ -1683,8 +1690,9 @@ static int toku_c_getf_next_old(DBC *c, u_int32_t flag, void(*f)(DBT const *key,
static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT const *data, void *extra), void *extra) {
HANDLE_PANICKED_DB(c->dbp);
//int prelocked = flag & DB_PRELOCKED;
flag &= ~DB_PRELOCKED; // Get rid of the prelock flag, because c_get doesn't know about it.
if (toku_c_uninitialized(c)) return toku_c_getf_next_old(c, flag, f, extra); //return toku_c_getf_first(c, flag, f, extra);
u_int32_t lock_flags = get_prelocked_flags(flag);
flag &= ~lock_flags;
assert(flag==0);
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
DBT key,val;
......@@ -1693,35 +1701,42 @@ static int toku_c_getf_next(DBC *c, u_int32_t flag, void(*f)(DBT const *key, DBT
DBT prevkey,prevval;
memset(&prevkey, 0, sizeof(key)); prevkey.flags = DB_DBT_MALLOC;
memset(&prevval, 0, sizeof(val)); prevval.flags = DB_DBT_MALLOC;
int r = brt_cursor_save_key_val(c->i->c, &prevkey, &prevval);
if (r!=0) goto cleanup;
int r;
DB *db=c->dbp;
toku_lock_tree* lt = db->i->lt;
BOOL do_locking = lt!=NULL && !lock_flags;
if (do_locking) {
r = brt_cursor_save_key_val(c->i->c, &prevkey, &prevval);
if (r!=0) goto cleanup;
}
unsigned int brtflags;
toku_brt_get_flags(db->i->brt, &brtflags);
int c_get_result = toku_brt_cursor_get(c->i->c, &key, &val,
(brtflags & TOKU_DB_DUPSORT) ? DB_NEXT : DB_NEXT_NODUP,
txn);
(brtflags & TOKU_DB_DUPSORT) ? DB_NEXT : DB_NEXT_NODUP,
txn);
if (c_get_result!=0 && c_get_result!=DB_NOTFOUND) { r = c_get_result; goto cleanup; }
int found = c_get_result==0;
toku_lock_tree* lt = db->i->lt;
DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn);
r = toku_txn_add_lt(txn_anc, lt);
if (r!=0) goto cleanup;
r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn),
&prevkey, &prevval,
found ? &key : toku_lt_infinity,
found ? &val : toku_lt_infinity);
if (r!=0) goto cleanup;
if (do_locking) {
DB_TXN *txn_anc = toku_txn_ancestor(c->i->txn);
r = toku_txn_add_lt(txn_anc, lt);
if (r!=0) goto cleanup;
r = toku_lt_acquire_range_read_lock(lt, db, toku_txn_get_txnid(txn_anc->i->tokutxn),
&prevkey, &prevval,
found ? &key : toku_lt_infinity,
found ? &val : toku_lt_infinity);
if (r!=0) goto cleanup;
}
if (found) {
f(&key, &val, extra);
}
r = c_get_result;
cleanup:
toku_free(prevkey.data);
toku_free(prevval.data);
if (prevkey.data) toku_free(prevkey.data);
if (prevval.data) toku_free(prevval.data);
return r;
}
......@@ -1781,31 +1796,35 @@ static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (flags!=0 && flags!=DB_GET_BOTH) return EINVAL;
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_get_noassociate(dbc, key, data,
(flags == 0) ? DB_SET : DB_GET_BOTH);
u_int32_t c_get_flags = (flags == 0) ? DB_SET : DB_GET_BOTH;
r = toku_c_get_noassociate(dbc, key, data, c_get_flags | lock_flags);
int r2 = toku_c_close(dbc);
return r ? r : r2;
}
static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
int r;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (flags!=0 && flags!=DB_DELETE_ANY) return EINVAL;
//DB_DELETE_ANY supresses the BDB DB->del return value indicating that the key was not found prior to the delete
if (!(flags & DB_DELETE_ANY)) {
DBT search_val; memset(&search_val, 0, sizeof search_val);
search_val.flags = DB_DBT_MALLOC;
r = toku_db_get_noassociate(db, txn, key, &search_val, 0);
r = toku_db_get_noassociate(db, txn, key, &search_val, lock_flags);
if (r != 0)
return r;
toku_free(search_val.data);
}
//Do the actual deleting.
if (db->i->lt) {
if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) {
DB_TXN* txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) { return r; }
......@@ -2156,13 +2175,16 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
return EINVAL;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (flags != 0 && flags != DB_GET_BOTH) return EINVAL;
// We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_get(dbc, key, data, (flags == 0) ? DB_SET : DB_GET_BOTH);
u_int32_t c_get_flags = (flags == 0) ? DB_SET : DB_GET_BOTH;
r = toku_c_get(dbc, key, data, c_get_flags | lock_flags);
int r2 = toku_c_close(dbc);
return r ? r : r2;
}
......@@ -2434,6 +2456,8 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
if (key->size >= limit || data->size >= limit)
return toku_ydb_do_error(db->dbenv, EINVAL, "The largest key or data item allowed is %d bytes", limit);
}
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (flags == DB_YESOVERWRITE) {
/* tokudb does insert or replace */
......@@ -2443,7 +2467,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
DBT testfordata;
memset(&testfordata, 0, sizeof(testfordata));
testfordata.flags = DB_DBT_MALLOC;
r = toku_db_get_noassociate(db, txn, key, &testfordata, 0);
r = toku_db_get_noassociate(db, txn, key, &testfordata, lock_flags);
if (r == 0) {
if (testfordata.data) toku_free(testfordata.data);
return DB_KEYEXIST;
......@@ -2456,7 +2480,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
assert(flags == 0);
if (brtflags & TOKU_DB_DUPSORT) {
#if TDB_EQ_BDB
r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH);
r = toku_db_get_noassociate(db, txn, key, data, DB_GET_BOTH | lock_flags);
if (r == 0)
return DB_KEYEXIST;
if (r != DB_NOTFOUND) return r;
......@@ -2465,7 +2489,7 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
#endif
}
}
if (db->i->lt) {
if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) {
DB_TXN* txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) { return r; }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment