Commit 595554cf authored by Yoni Fogel's avatar Yoni Fogel

Closes #908

Implements DB->pre_acquire_table_lock
Also optimized lock tree for table write locks

git-svn-id: file:///svn/tokudb@4480 c7de825b-a66e-492c-adef-691d508d4ae1
parent f0d8e3aa
...@@ -16,6 +16,11 @@ ...@@ -16,6 +16,11 @@
#define DB_YESOVERWRITE 0 #define DB_YESOVERWRITE 0
#endif #endif
#if !defined(DB_PRELOCKED_WRITE)
#define NO_DB_PRELOCKED
#define DB_PRELOCKED_WRITE 0
#endif
int verbose=1; int verbose=1;
enum { SERIAL_SPACING = 1<<6 }; enum { SERIAL_SPACING = 1<<6 };
...@@ -32,11 +37,24 @@ long long cachesize = 128*1024*1024; ...@@ -32,11 +37,24 @@ long long cachesize = 128*1024*1024;
int dupflags = 0; int dupflags = 0;
int noserial = 0; // Don't do the serial stuff int noserial = 0; // Don't do the serial stuff
int norandom = 0; // Don't do the random stuff int norandom = 0; // Don't do the random stuff
int prelock = 0;
int prelockflag = 0;
int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION; int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION;
int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION; int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION;
int do_transactions = 0; int do_transactions = 0;
int n_insertions_since_txn_began=0; int n_insertions_since_txn_began=0;
int env_open_flags = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL; int env_open_flags = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL;
u_int32_t put_flags = DB_YESOVERWRITE;
static void do_prelock(DB* db, DB_TXN* txn) {
if (prelock) {
#if !defined(NO_DB_PRELOCKED)
int r = db->pre_acquire_table_lock(db, txn);
assert(r==0);
#endif
}
}
#define STRINGIFY2(s) #s #define STRINGIFY2(s) #s
#define STRINGIFY(s) STRINGIFY2(s) #define STRINGIFY(s) STRINGIFY2(s)
...@@ -138,13 +156,14 @@ void insert (long long v) { ...@@ -138,13 +156,14 @@ void insert (long long v) {
long_long_to_array(kc, v); long_long_to_array(kc, v);
memset(vc, 0, sizeof vc); memset(vc, 0, sizeof vc);
long_long_to_array(vc, v); long_long_to_array(vc, v);
int r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), DB_YESOVERWRITE); int r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags);
CKERR(r); CKERR(r);
if (do_transactions) { if (do_transactions) {
if (n_insertions_since_txn_began>=items_per_transaction) { if (n_insertions_since_txn_began>=items_per_transaction) {
n_insertions_since_txn_began=0; n_insertions_since_txn_began=0;
r = tid->commit(tid, 0); assert(r==0); r = tid->commit(tid, 0); assert(r==0);
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0); r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
n_insertions_since_txn_began=0; n_insertions_since_txn_began=0;
} }
n_insertions_since_txn_began++; n_insertions_since_txn_began++;
...@@ -155,9 +174,10 @@ void serial_insert_from (long long from) { ...@@ -155,9 +174,10 @@ void serial_insert_from (long long from) {
long long i; long long i;
if (do_transactions) { if (do_transactions) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0); int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
{ {
DBT k,v; DBT k,v;
r=db->put(db, tid, fill_dbt(&k, "a", 1), fill_dbt(&v, "b", 1), DB_YESOVERWRITE); r=db->put(db, tid, fill_dbt(&k, "a", 1), fill_dbt(&v, "b", 1), put_flags);
CKERR(r); CKERR(r);
} }
...@@ -179,6 +199,7 @@ void random_insert_below (long long below) { ...@@ -179,6 +199,7 @@ void random_insert_below (long long below) {
long long i; long long i;
if (do_transactions) { if (do_transactions) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0); int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
} }
for (i=0; i<items_per_iteration; i++) { for (i=0; i<items_per_iteration; i++) {
insert(llrandom()%below); insert(llrandom()%below);
...@@ -297,10 +318,18 @@ int main (int argc, const char *argv[]) { ...@@ -297,10 +318,18 @@ int main (int argc, const char *argv[]) {
} else if (strcmp(arg, "--env") == 0) { } else if (strcmp(arg, "--env") == 0) {
if (i+1 >= argc) return print_usage(argv[0]); if (i+1 >= argc) return print_usage(argv[0]);
dbdir = argv[++i]; dbdir = argv[++i];
} else if (strcmp(arg, "--prelock") == 0) {
prelock=1;
} else if (strcmp(arg, "--prelockflag") == 0) {
prelock=1;
prelockflag=1;
} else { } else {
return print_usage(argv[0]); return print_usage(argv[0]);
} }
} }
if (do_transactions && prelockflag) {
put_flags |= DB_PRELOCKED_WRITE;
}
if (i<argc) { if (i<argc) {
/* if it looks like a number */ /* if it looks like a number */
char *end; char *end;
......
...@@ -950,7 +950,7 @@ static int free_contents_helper(toku_range* value, void* extra) { ...@@ -950,7 +950,7 @@ static int free_contents_helper(toku_range* value, void* extra) {
toku__lt_free_point (singular) toku__lt_free_point (singular)
*/ */
static inline int toku__lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt, static inline int toku__lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt,
toku_range_tree *rtdel) { toku_range_tree *rtdel, BOOL close) {
assert(tree); assert(tree);
if (!rt) return 0; if (!rt) return 0;
...@@ -967,7 +967,11 @@ static inline int toku__lt_free_contents(toku_lock_tree* tree, toku_range_tree* ...@@ -967,7 +967,11 @@ static inline int toku__lt_free_contents(toku_lock_tree* tree, toku_range_tree*
info.store_value = &tree->buf[0]; info.store_value = &tree->buf[0];
if ((r=toku_rt_iterate(rt, free_contents_helper, &info))) return r; if ((r=toku_rt_iterate(rt, free_contents_helper, &info))) return r;
r = toku_rt_close(rt); if (close) r = toku_rt_close(rt);
else {
r = 0;
toku_rt_clear(rt);
}
assert(r == 0); assert(r == 0);
return r; return r;
} }
...@@ -1349,9 +1353,9 @@ int toku_lt_close(toku_lock_tree* tree) { ...@@ -1349,9 +1353,9 @@ int toku_lt_close(toku_lock_tree* tree) {
rt_forest* forest; rt_forest* forest;
while ((forest = toku_rth_next(tree->rth)) != NULL) { while ((forest = toku_rth_next(tree->rth)) != NULL) {
r = toku__lt_free_contents(tree, forest->self_read, NULL); r = toku__lt_free_contents(tree, forest->self_read, NULL, TRUE);
if (!first_error && r!=0) { first_error = r; } if (!first_error && r!=0) { first_error = r; }
r = toku__lt_free_contents(tree, forest->self_write, NULL); r = toku__lt_free_contents(tree, forest->self_write, NULL, TRUE);
if (!first_error && r!=0) { first_error = r; } if (!first_error && r!=0) { first_error = r; }
} }
toku_rth_close(tree->rth); toku_rth_close(tree->rth);
...@@ -1402,6 +1406,10 @@ static int toku__lt_try_acquire_range_read_lock(toku_lock_tree* tree, ...@@ -1402,6 +1406,10 @@ static int toku__lt_try_acquire_range_read_lock(toku_lock_tree* tree,
add 'K' to selfread('txn') and selfwrite('txn'). add 'K' to selfread('txn') and selfwrite('txn').
This requires merging.. see below. This requires merging.. see below.
*/ */
if (tree->table_is_locked) {
r = (txn==tree->table_lock_owner) ? 0 : DB_LOCK_NOTGRANTED;
goto cleanup;
}
/* if 'K' is dominated by selfwrite('txn') then return success. */ /* if 'K' is dominated by selfwrite('txn') then return success. */
r = toku__lt_rt_dominates(tree, &query, r = toku__lt_rt_dominates(tree, &query,
...@@ -1475,6 +1483,51 @@ cleanup: ...@@ -1475,6 +1483,51 @@ cleanup:
return r; return r;
} }
static int lt_global_lock(toku_lock_tree* tree, TXNID txn) {
int r;
assert(!tree->table_is_locked);
//Create the self write table if it does not exist.
//This saves the fact that txn is still locked.
toku_range_tree* selfwrite;
if ((r = toku__lt_selfwrite(tree, txn, &selfwrite))) return r;
//Clear out the borderwrite, selfwrite, selfread, and mainread tables.
//The selfread and selfwrite tables also need to free memory.
toku_rt_clear(tree->borderwrite);
//Errors at this point on are panics.
#if !defined(TOKU_RT_NOOVERLAPS)
toku_rt_clear(tree->mainread);
#endif
u_int32_t ranges;
r = toku_rt_get_size(selfwrite, &ranges);
if ((r = toku__lt_free_contents(tree, selfwrite, NULL, FALSE))) {
r = toku__lt_panic(tree, r);
goto cleanup;
}
toku_range_tree* selfread = toku__lt_ifexist_selfread(tree, txn);
if (selfread) {
u_int32_t size;
r = toku_rt_get_size(selfread, &size);
assert(r==0);
ranges += size;
if ((r = toku__lt_free_contents(tree, selfread, NULL, FALSE))) {
r = toku__lt_panic(tree, r);
goto cleanup;
}
}
toku__lt_lock_decr_per_db(tree, ranges);
tree->table_lock_owner = txn;
tree->table_is_locked = TRUE;
r = 0;
cleanup:
return r;
}
/* */ /* */
static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree, static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree,
toku_range* border_range) { toku_range* border_range) {
...@@ -1493,30 +1546,39 @@ static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree, ...@@ -1493,30 +1546,39 @@ static inline int toku__escalate_writes_from_border_range(toku_lock_tree* tree,
if (r != 0) { goto cleanup; } if (r != 0) { goto cleanup; }
/* Need at least two entries for this to actually help. */ /* Need at least two entries for this to actually help. */
if (numfound < 2) { goto cleanup; } if (numfound < 2) { goto cleanup; }
u_int32_t i;
for (i = 0; i < numfound; i++) {
r = toku_rt_delete(self_write, &tree->buf[i]);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
/*
* Clean up memory that is not referenced by border_range.
*/
if (tree->buf[i].ends.left != tree->buf[i].ends.right &&
toku__lt_p_independent(tree->buf[i].ends.left, &border_range->ends)) {
/* Do not double free if left and right are same point. */
toku__p_free(tree, tree->buf[i].ends.left);
}
if (toku__lt_p_independent(tree->buf[i].ends.right, &border_range->ends)) {
toku__p_free(tree, tree->buf[i].ends.right);
}
}
/* /*
* Insert border_range into self_write table * Insert border_range into self_write table
*/ */
r = toku_rt_insert(self_write, border_range); if (border_range->ends.left->key_payload==toku_lt_neg_infinity &&
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; } border_range->ends.right->key_payload==toku_lt_infinity) {
//Lock Entire Table
if ((r = lt_global_lock(tree, txn))) goto cleanup;
}
else {
u_int32_t i;
for (i = 0; i < numfound; i++) {
r = toku_rt_delete(self_write, &tree->buf[i]);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
/*
* Clean up memory that is not referenced by border_range.
*/
if (tree->buf[i].ends.left != tree->buf[i].ends.right &&
toku__lt_p_independent(tree->buf[i].ends.left, &border_range->ends)) {
/* Do not double free if left and right are same point. */
toku__p_free(tree, tree->buf[i].ends.left);
}
if (toku__lt_p_independent(tree->buf[i].ends.right, &border_range->ends)) {
toku__p_free(tree, tree->buf[i].ends.right);
}
}
//Insert escalated range.
r = toku_rt_insert(self_write, border_range);
if (r != 0) { r = toku__lt_panic(tree, r); goto cleanup; }
toku__lt_lock_incr_per_db(tree, numfound);
}
toku__lt_lock_incr_per_db(tree, numfound);
r = 0; r = 0;
cleanup: cleanup:
return r; return r;
...@@ -1754,7 +1816,10 @@ static int toku__lt_try_acquire_write_lock(toku_lock_tree* tree, ...@@ -1754,7 +1816,10 @@ static int toku__lt_try_acquire_write_lock(toku_lock_tree* tree,
&query, out_of_locks); &query, out_of_locks);
if (r!=0) { goto cleanup; } if (r!=0) { goto cleanup; }
*out_of_locks = FALSE; if (tree->table_is_locked) {
r = (txn==tree->table_lock_owner) ? 0 : DB_LOCK_NOTGRANTED;
goto cleanup;
}
/* if 'K' is dominated by selfwrite('txn') then return success. */ /* if 'K' is dominated by selfwrite('txn') then return success. */
r = toku__lt_rt_dominates(tree, &query, r = toku__lt_rt_dominates(tree, &query,
toku__lt_ifexist_selfwrite(tree, txn), &dominated); toku__lt_ifexist_selfwrite(tree, txn), &dominated);
...@@ -1862,14 +1927,28 @@ static int toku__lt_try_acquire_range_write_lock(toku_lock_tree* tree, ...@@ -1862,14 +1927,28 @@ static int toku__lt_try_acquire_range_write_lock(toku_lock_tree* tree,
&left, &right, &left, &right,
&query, out_of_locks); &query, out_of_locks);
if (r!=0) { goto cleanup; } if (r!=0) { goto cleanup; }
*out_of_locks = FALSE;
//We are not ready for this. if (tree->table_is_locked) {
//Not needed for Feb 1 release. r = (txn==tree->table_lock_owner) ? 0 : DB_LOCK_NOTGRANTED;
r = ENOSYS; goto cleanup;
}
if (key_left!=toku_lt_neg_infinity || key_right!=toku_lt_infinity) {
//We are not ready for this.
//Not needed for Feb 1 release.
r=ENOSYS;
goto cleanup;
}
// Acquire table write lock.
//If there are any other writes, we fail.
if ((r = toku__lt_check_borderwrite_conflict(tree, txn, &query))) goto cleanup;
//If there are any other reads, we fail.
if ((r = toku__lt_write_point_conflicts_reads(tree, txn, &query))) goto cleanup;
if ((r = lt_global_lock(tree, txn))) goto cleanup;
r = 0;
cleanup: cleanup:
if (tree) { toku__lt_postprocess(tree); } if (tree) { toku__lt_postprocess(tree); }
return r; return r;
} }
...@@ -2032,7 +2111,7 @@ static inline int toku__lt_unlock_txn(toku_lock_tree* tree, TXNID txn) { ...@@ -2032,7 +2111,7 @@ static inline int toku__lt_unlock_txn(toku_lock_tree* tree, TXNID txn) {
r = toku_rt_get_size(selfread, &size); r = toku_rt_get_size(selfread, &size);
assert(r==0); assert(r==0);
ranges += size; ranges += size;
r = toku__lt_free_contents(tree, selfread, tree->mainread); r = toku__lt_free_contents(tree, selfread, tree->mainread, TRUE);
if (r!=0) return toku__lt_panic(tree, r); if (r!=0) return toku__lt_panic(tree, r);
} }
...@@ -2043,9 +2122,10 @@ static inline int toku__lt_unlock_txn(toku_lock_tree* tree, TXNID txn) { ...@@ -2043,9 +2122,10 @@ static inline int toku__lt_unlock_txn(toku_lock_tree* tree, TXNID txn) {
ranges += size; ranges += size;
r = toku__lt_border_delete(tree, selfwrite); r = toku__lt_border_delete(tree, selfwrite);
if (r!=0) return toku__lt_panic(tree, r); if (r!=0) return toku__lt_panic(tree, r);
r = toku__lt_free_contents(tree, selfwrite, NULL); r = toku__lt_free_contents(tree, selfwrite, NULL, TRUE);
if (r!=0) return toku__lt_panic(tree, r); if (r!=0) return toku__lt_panic(tree, r);
} }
if (tree->table_lock_owner==txn) tree->table_is_locked = FALSE;
if (selfread || selfwrite) toku_rth_delete(tree->rth, txn); if (selfread || selfwrite) toku_rth_delete(tree->rth, txn);
...@@ -2086,14 +2166,14 @@ static inline void toku__lt_clear(toku_lock_tree* tree) { ...@@ -2086,14 +2166,14 @@ static inline void toku__lt_clear(toku_lock_tree* tree) {
r = toku_rt_get_size(forest->self_read, &size); r = toku_rt_get_size(forest->self_read, &size);
assert(r==0); assert(r==0);
ranges += size; ranges += size;
r = toku__lt_free_contents(tree, forest->self_read, NULL); r = toku__lt_free_contents(tree, forest->self_read, NULL, TRUE);
assert(r==0); assert(r==0);
} }
if (forest->self_write) { if (forest->self_write) {
r = toku_rt_get_size(forest->self_write, &size); r = toku_rt_get_size(forest->self_write, &size);
assert(r==0); assert(r==0);
ranges += size; ranges += size;
r = toku__lt_free_contents(tree, forest->self_write, NULL); r = toku__lt_free_contents(tree, forest->self_write, NULL, TRUE);
assert(r==0); assert(r==0);
} }
...@@ -2102,6 +2182,7 @@ static inline void toku__lt_clear(toku_lock_tree* tree) { ...@@ -2102,6 +2182,7 @@ static inline void toku__lt_clear(toku_lock_tree* tree) {
toku_rth_clear(tree->txns_to_unlock); toku_rth_clear(tree->txns_to_unlock);
/* tree->txns_still_locked is already empty, so we do not clear it. */ /* tree->txns_still_locked is already empty, so we do not clear it. */
toku__lt_lock_decr_per_db(tree, ranges); toku__lt_lock_decr_per_db(tree, ranges);
tree->table_is_locked = FALSE;
} }
int toku_lt_unlock(toku_lock_tree* tree, TXNID txn) { int toku_lt_unlock(toku_lock_tree* tree, TXNID txn) {
......
...@@ -122,6 +122,8 @@ struct __toku_lock_tree { ...@@ -122,6 +122,8 @@ struct __toku_lock_tree {
u_int32_t ref_count; u_int32_t ref_count;
/** db_id associated with the lock tree */ /** db_id associated with the lock tree */
toku_db_id* db_id; toku_db_id* db_id;
TXNID table_lock_owner;
BOOL table_is_locked;
}; };
struct __toku_ltm { struct __toku_ltm {
......
...@@ -2721,6 +2721,20 @@ int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, cons ...@@ -2721,6 +2721,20 @@ int toku_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, cons
return r; return r;
} }
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
HANDLE_PANICKED_DB(db);
if (!db->i->lt || !txn) return EINVAL;
DB_TXN* txn_anc = toku_txn_ancestor(txn);
int r;
if ((r=toku_txn_add_lt(txn_anc, db->i->lt))) return r;
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_range_write_lock(db->i->lt, db, id_anc,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity, toku_lt_infinity);
return r;
}
//TODO: DB_AUTO_COMMIT. //TODO: DB_AUTO_COMMIT.
//TODO: Nowait only conditionally? //TODO: Nowait only conditionally?
...@@ -2811,6 +2825,13 @@ int locked_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, co ...@@ -2811,6 +2825,13 @@ int locked_db_pre_acquire_read_lock(DB *db, DB_TXN *txn, const DBT *key_left, co
return r; return r;
} }
int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
toku_ydb_lock();
int r = toku_db_pre_acquire_table_lock(db, txn);
toku_ydb_unlock();
return r;
}
static inline int autotxn_db_pget(DB* db, DB_TXN* txn, DBT* key, DBT* pkey, static inline int autotxn_db_pget(DB* db, DB_TXN* txn, DBT* key, DBT* pkey,
DBT* data, u_int32_t flags) { DBT* data, u_int32_t flags) {
BOOL changed; int r; BOOL changed; int r;
...@@ -2953,6 +2974,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -2953,6 +2974,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
// SDB(stat); // SDB(stat);
SDB(fd); SDB(fd);
SDB(pre_acquire_read_lock); SDB(pre_acquire_read_lock);
SDB(pre_acquire_table_lock);
#undef SDB #undef SDB
result->dbt_pos_infty = toku_db_dbt_pos_infty; result->dbt_pos_infty = toku_db_dbt_pos_infty;
result->dbt_neg_infty = toku_db_dbt_neg_infty; result->dbt_neg_infty = toku_db_dbt_neg_infty;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment