Commit f8f97b92 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #293

More implementation of write lock

git-svn-id: file:///svn/tokudb@2005 c7de825b-a66e-492c-adef-691d508d4ae1
parent 764ebe50
...@@ -13,9 +13,8 @@ ...@@ -13,9 +13,8 @@
/* TODO: During integration, create a db panic function to take care of this. /* TODO: During integration, create a db panic function to take care of this.
The panic function will go in ydb.c. The panic function will go in ydb.c.
We may have to return the panic return code DB_RUNRECOVERY. */ We may have to return the panic return code DB_RUNRECOVERY. */
static int __toku_lt_panic(toku_lock_tree *tree, int r) { static int __toku_lt_panic(toku_lock_tree *tree) {
tree->panic(tree->db); return tree->panic(tree->db);
return r;
} }
const unsigned __toku_default_buflen = 2; const unsigned __toku_default_buflen = 2;
...@@ -679,7 +678,7 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) { ...@@ -679,7 +678,7 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) {
} }
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates, int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
void (*panic)(DB*), size_t payload_capacity, int (*panic)(DB*), size_t payload_capacity,
int (*compare_fun)(DB*,const DBT*,const DBT*), int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*), int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t), void* (*user_malloc) (size_t),
...@@ -855,8 +854,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -855,8 +854,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
BOOL dominated; BOOL dominated;
toku_range_tree* mainread; toku_range_tree* mainread;
__toku_init_point(&endpoint, tree, key, data); __toku_init_point(&endpoint, tree, key, data);
__toku_init_query(&query, &endpoint, &endpoint); __toku_init_query(&query, &endpoint, &endpoint);
/* if 'K' is dominated by selfwrite('txn') then return success. */ /* if 'K' is dominated by selfwrite('txn') then return success. */
r = __toku_lt_dominated(tree, &query, r = __toku_lt_dominated(tree, &query,
...@@ -865,7 +864,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -865,7 +864,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
/* else if K meets mainread at 'txn2' then return failure */ /* else if K meets mainread at 'txn2' then return failure */
BOOL met; BOOL met;
mainread = tree->mainread; assert(mainread); mainread = tree->mainread; assert(mainread);
r = __toku_lt_meets_peer(tree, &query, mainread, txn, &met); r = __toku_lt_meets_peer(tree, &query, mainread, txn, &met);
if (r!=0) return r; if (r!=0) return r;
if (met) return DB_LOCK_NOTGRANTED; if (met) return DB_LOCK_NOTGRANTED;
...@@ -877,10 +876,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -877,10 +876,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
r = __toku_lt_check_borderwrite_conflict(tree, txn, &query); r = __toku_lt_check_borderwrite_conflict(tree, txn, &query);
if (r!=0) return r; if (r!=0) return r;
/* Now need to copy the memory and insert. */ /* Now need to copy the memory and insert.
assert(FALSE); //Not implemented yet.
/*
No merging required in selfwrite. No merging required in selfwrite.
This is a point, and if merging was possible it would have been This is a point, and if merging was possible it would have been
dominated by selfwrite. dominated by selfwrite.
...@@ -920,53 +916,34 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -920,53 +916,34 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
This is a point, and if merging was possible it would have been This is a point, and if merging was possible it would have been
dominated by selfwrite. dominated by selfwrite.
*/ */
BOOL dummy = TRUE; toku_range to_insert;
toku_range to_insert;
__toku_init_insert(&to_insert, &endpoint, &endpoint, txn); __toku_init_insert(&to_insert, &endpoint, &endpoint, txn);
BOOL dummy = TRUE;
r = __toku_lt_alloc_extreme(tree, &to_insert, TRUE, &dummy) r = __toku_lt_alloc_extreme(tree, &to_insert, TRUE, &dummy)
if (0) { if (0) { died1: __toku_p_free(tree, to_insert->left); return r; }
died1:
__toku_p_free(tree, to_insert->left);
return r;
}
if (r!=0) return r; if (r!=0) return r;
toku_range_tree* selfwrite; toku_range_tree* selfwrite;
r = __toku_lt_selfwrite(tree, txn, &selfwrite); r = __toku_lt_selfwrite(tree, txn, &selfwrite);
if (r!=0) goto died1; if (r!=0) goto died1;
assert(selfwrite); assert(selfwrite);
/* TODO: We are inserting here, but maybe this should be later. */
r = toku_rt_insert(selfwrite, &to_insert); r = toku_rt_insert(selfwrite, &to_insert);
if (0) {
died2:
int r2;
r2 = toku_rt_delete(selfwrite, &to_insert);
if (r2!=0) r = __toku_lt_panic(tree, r);
goto died1;
}
if (r!=0) goto died1; if (r!=0) goto died1;
/* Need to update borderwrite. */ /* Need to update borderwrite. */
toku_range_tree* borderwrite = tree->borderwrite; toku_range_tree* borderwrite = tree->borderwrite; assert(borderwrite);
assert(borderwrite);
unsigned numfound; unsigned numfound;
r = toku_rt_find(borderwrite, &query, 1, &tree->buf, &tree->buflen, r = toku_rt_find(borderwrite, &query, 1, &tree->buf, &tree->buflen,
&numfound); &numfound);
/* If find fails, there is no way we can run the algorithm, so we panic! */ if (r!=0) return __toku_lt_panic(tree, r);
if (r!=0) { r = __toku_lt_panic(tree, r); goto died2; }
assert(numfound <= 1); assert(numfound <= 1);
/* No updated needed in borderwrite: we return right away. */ /* No updated needed in borderwrite: we return right away. */
if (numfound == 1 && tree->buf[0].data == txn) return 0; if (numfound == 1 && tree->buf[0].data == txn) return 0;
/* The range we insert in borderwrite may differ (bigger) than the
to_insert=point that we inserted before. We need a new one because
the old one may be needed for error recovery. */
toku_range border_insert;
memcpy(&border_insert, &to_insert, sizeof(toku_range));
/* Find predecessor and successors */ /* Find predecessor and successors */
toku_range pred; toku_range pred;
toku_range succ; toku_range succ;
...@@ -976,44 +953,47 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -976,44 +953,47 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
range_tree* rt; range_tree* rt;
rt = numfound == 0 ? borderwrite : rt = numfound == 0 ? borderwrite :
__toku_lt_ifexist_selfwrite(tree, tree->buf[0].data); __toku_lt_ifexist_selfwrite(tree, tree->buf[0].data);
if (!rt) { r = __toku_lt_panic(tree, EINVAL); goto died2; } if (!rt) return __toku_lt_panic(tree);
r = toku_rt_predecessor(rt, to_insert.left, &pred, &found_p); r = toku_rt_predecessor(rt, to_insert.left, &pred, &found_p);
if (r!=0) { r = __toku_lt_panic(tree, r); goto died2; } if (r!=0) return __toku_lt_panic(tree);
r = toku_rt_successor (rt, to_insert.right, &succ, &found_s); r = toku_rt_successor (rt, to_insert.right, &succ, &found_s);
if (r!=0) { r = __toku_lt_panic(tree, r); goto died2; } if (r!=0) return __toku_lt_panic(tree);
if (found_p && found_s && pred.data == succ.data) { if (found_p && found_s && pred.data == succ.data) {
r = __toku_lt_panic(tree, EINVAL); goto died2; } return __toku_lt_panic(tree); }
if (numfound == 0) { if (numfound == 0) {
if (found_p && pred.data == txn) { if (found_p && pred.data == txn) {
r = toku_rt_delete(borderwrite, &pred); r = toku_rt_delete(borderwrite, &pred);
if (r!=0) { r = __toku_lt_panic(tree, r); goto died2; } if (r!=0) return __toku_lt_panic(tree);
border_insert.left = pred.left; to_insert.left = pred.left;
} }
else if (found_s && succ.data == txn) { else if (found_s && succ.data == txn) {
r = toku_rt_delete(borderwrite, &succ); r = toku_rt_delete(borderwrite, &succ);
if (r!=0) { r = __toku_lt_panic(tree, r); goto died2; } if (r!=0) return __toku_lt_panic(tree);
border_insert.right = succ.right; to_insert.right = succ.right;
} }
} }
else { else {
assert(tree->buf[0].data != txn); assert(tree->buf[0].data != txn);
if (!found_s || !found_p) { if (!found_s || !found_p) return __toku_lt_panic(tree);
r = __toku_lt_panic(tree, EINVAL); goto died2; }
r = toku_rt_delete(borderwrite, &tree->buf[0]); r = toku_rt_delete(borderwrite, &tree->buf[0]);
if (r!=0) return __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree);
pred.right = tree->buf[0].right;
succ.left = tree->buf[0].left; pred.left = tree->buf[0].left;
succ.right = tree->buf[0].right;
if (tree->end_cmp(pred.left, pred.right) > 0 ||
tree->end_cmp(succ.left, succ.right) > 0) {
return __toku_lt_panic(tree);}
r = toku_rt_insert(borderwrite, &pred); r = toku_rt_insert(borderwrite, &pred);
if (r!=0) return __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree);
r = toku_rt_insert(borderwrite, &succ); r = toku_rt_insert(borderwrite, &succ);
if (r!=0) return __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree);
}
if (numfound == 0 || tree->buf[0].data != txn) {
r = toku_rt_insert(borderwrite, &to_insert);
if (r!=0) return __toku_lt_panic(tree, r);
} }
r = toku_rt_insert(borderwrite, &to_insert);
if (r!=0) return __toku_lt_panic(tree);
return 0; return 0;
} }
...@@ -1027,6 +1007,8 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -1027,6 +1007,8 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
__toku_lt_is_infinite(key_left)) return EINVAL; __toku_lt_is_infinite(key_left)) return EINVAL;
if (tree->duplicates && key_right != data_right && if (tree->duplicates && key_right != data_right &&
__toku_lt_is_infinite(key_right)) return EINVAL; __toku_lt_is_infinite(key_right)) return EINVAL;
/* Verify that NULL keys have payload and size that are mutually
consistent*/
__toku_lt_verify_null_key(key_left); __toku_lt_verify_null_key(key_left);
__toku_lt_verify_null_key(data_left); __toku_lt_verify_null_key(data_left);
__toku_lt_verify_null_key(key_right); __toku_lt_verify_null_key(key_right);
......
...@@ -73,7 +73,7 @@ typedef struct { ...@@ -73,7 +73,7 @@ typedef struct {
/** The data compare function */ /** The data compare function */
int (*dup_compare)(DB*,const DBT*,const DBT*); int (*dup_compare)(DB*,const DBT*,const DBT*);
/** The panic function */ /** The panic function */
void (*panic)(DB*); int (*panic)(DB*);
/** The user malloc function */ /** The user malloc function */
void* (*malloc) (size_t); void* (*malloc) (size_t);
/** The user free function */ /** The user free function */
...@@ -146,7 +146,7 @@ int toku_lt_point_cmp(void* a, void* b); ...@@ -146,7 +146,7 @@ int toku_lt_point_cmp(void* a, void* b);
instead. instead.
*/ */
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates, int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
void(*panic)(DB*), size_t payload_capacity, int (*panic)(DB*), size_t payload_capacity,
int (*compare_fun)(DB*,const DBT*,const DBT*), int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*), int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t), void* (*user_malloc) (size_t),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment