Commit 84e7f1bb authored by Yoni Fogel's avatar Yoni Fogel

Addresses #293

- Lock tree takes parameter of a db panic function, defers to it in panic
  situations.
- Code reorganization due to code reviews
- DBT consistency is assumed (should be EINVAL at higher level)




git-svn-id: file:///svn/tokudb@1971 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0264be19
...@@ -7,9 +7,10 @@ ...@@ -7,9 +7,10 @@
#include <ydb-internal.h> #include <ydb-internal.h>
#include <brt-internal.h> #include <brt-internal.h>
//TODO: Modify toku_lt_panic to use the panic thingy from DB /* TODO: During integration, create a db panic function to take care of this.
The panic function will go in ydb.c */
static int __toku_lt_panic(toku_lock_tree *tree, int r) { static int __toku_lt_panic(toku_lock_tree *tree, int r) {
tree->panicked = TRUE; tree->panic(tree->db);
return r; return r;
} }
...@@ -27,14 +28,21 @@ static int __toku_infinite_compare(void* a, void* b) { ...@@ -27,14 +28,21 @@ static int __toku_infinite_compare(void* a, void* b) {
if (a == toku_lt_infinity) return 1; if (a == toku_lt_infinity) return 1;
if (b == toku_lt_infinity) return -1; if (b == toku_lt_infinity) return -1;
if (a == toku_lt_neg_infinity) return -1; if (a == toku_lt_neg_infinity) return -1;
if (b == toku_lt_neg_infinity) return 1; assert(b == toku_lt_neg_infinity);
assert(FALSE); return 1;
} }
static BOOL __toku_lt_is_infinite(const void* p) { static BOOL __toku_lt_is_infinite(const void* p) {
return (p == toku_lt_infinity) || (p == toku_lt_neg_infinity); return (p == toku_lt_infinity) || (p == toku_lt_neg_infinity);
} }
/* Verifies that NULL data and size are consistent.
i.e. The size is 0 if and only if the data is NULL. */
static void __toku_lt_verify_null_key(const DBT* key) {
assert(!key || __toku_lt_is_infinite(key) ||
(key->size > 0) == (key->data != NULL));
}
static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) { static DBT* __toku_recreate_DBT(DBT* dbt, void* payload, u_int32_t length) {
memset(dbt, 0, sizeof(DBT)); memset(dbt, 0, sizeof(DBT));
dbt->data = payload; dbt->data = payload;
...@@ -107,11 +115,13 @@ static int __toku_payload_copy(toku_lock_tree* tree, ...@@ -107,11 +115,13 @@ static int __toku_payload_copy(toku_lock_tree* tree,
*payload_out = payload_in; *payload_out = payload_in;
*len_out = 0; *len_out = 0;
} }
else if (!len_in || !payload_in) { else if (!len_in) {
assert(!payload_in);
*payload_out = NULL; *payload_out = NULL;
*len_out = 0; *len_out = 0;
} }
else { else {
assert(payload_in);
if (tree->payload_used + len_in > tree->payload_capacity) return ENOMEM; if (tree->payload_used + len_in > tree->payload_capacity) return ENOMEM;
*payload_out = tree->malloc(len_in); *payload_out = tree->malloc(len_in);
if (!*payload_out) return errno; if (!*payload_out) return errno;
...@@ -128,8 +138,8 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) { ...@@ -128,8 +138,8 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
toku_point* temp_point = NULL; toku_point* temp_point = NULL;
int r; int r;
if (tree->payload_used + sizeof(toku_point) > if (tree->payload_used + sizeof(toku_point) > tree->payload_capacity) {
tree->payload_capacity) return ENOMEM; return ENOMEM;}
temp_point = (toku_point*)tree->malloc(sizeof(toku_point)); temp_point = (toku_point*)tree->malloc(sizeof(toku_point));
if (0) { if (0) {
died1: died1:
...@@ -138,8 +148,8 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) { ...@@ -138,8 +148,8 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
return r; return r;
} }
if (!temp_point) return errno; if (!temp_point) return errno;
memcpy(temp_point, point, sizeof(toku_point));
tree->payload_used += sizeof(toku_point); tree->payload_used += sizeof(toku_point);
memcpy(temp_point, point, sizeof(toku_point));
r = __toku_payload_copy(tree, r = __toku_payload_copy(tree,
&temp_point->key_payload, &temp_point->key_len, &temp_point->key_payload, &temp_point->key_len,
...@@ -375,11 +385,13 @@ static void __toku_payload_from_dbt(void** payload, u_int32_t* len, ...@@ -375,11 +385,13 @@ static void __toku_payload_from_dbt(void** payload, u_int32_t* len,
*payload = (void*)dbt; *payload = (void*)dbt;
*len = 0; *len = 0;
} }
else if (!dbt->data || !dbt->size) { else if (!dbt->size) {
assert(!dbt->data);
*len = 0; *len = 0;
*payload = NULL; *payload = NULL;
} }
else { else {
assert(dbt->data);
*len = dbt->size; *len = dbt->size;
*payload = dbt->data; *payload = dbt->data;
} }
...@@ -491,6 +503,13 @@ static void __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree, ...@@ -491,6 +503,13 @@ static void __toku_lt_delete_overlapping_ranges(toku_lock_tree* tree,
} }
} }
/* Returns whether the point already exists
as an endpoint of the given range. */
static BOOL __toku_lt_p_independent(toku_point* point, toku_range* range) {
assert(point && range);
return point != range->left && point != range->right;
}
static void __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, static void __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
unsigned numfound) { unsigned numfound) {
assert(tree && to_insert); assert(tree && to_insert);
...@@ -504,12 +523,10 @@ static void __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert, ...@@ -504,12 +523,10 @@ static void __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
*/ */
/* Do not double-free. */ /* Do not double-free. */
if (tree->buf[i].right != tree->buf[i].left && if (tree->buf[i].right != tree->buf[i].left &&
tree->buf[i].right != to_insert->left && __toku_lt_p_independent(tree->buf[i].right, to_insert)) {
tree->buf[i].right != to_insert->right) {
__toku_p_free(tree, tree->buf[i].right); __toku_p_free(tree, tree->buf[i].right);
} }
if (tree->buf[i].left != to_insert->left && if (__toku_lt_p_independent(tree->buf[i].left, to_insert)) {
tree->buf[i].left != to_insert->right) {
__toku_p_free(tree, tree->buf[i].left); __toku_p_free(tree, tree->buf[i].left);
} }
} }
...@@ -630,7 +647,7 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) { ...@@ -630,7 +647,7 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) {
unsigned numfound; unsigned numfound;
r = toku_rt_find(rt, &query, 0, &tree->buf, &tree->buflen, &numfound); r = toku_rt_find(rt, &query, 0, &tree->buf, &tree->buflen, &numfound);
if (r==0) __toku_lt_free_points(tree, &query, numfound); if (r==0) __toku_lt_free_points(tree, &query, numfound);
else { else if (r==ENOMEM) {
/* /*
To free space the fast way, we need to allocate more space. To free space the fast way, we need to allocate more space.
Since we can't, we free the slow way. Since we can't, we free the slow way.
...@@ -648,19 +665,22 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) { ...@@ -648,19 +665,22 @@ static void __toku_lt_free_contents(toku_lock_tree* tree, toku_range_tree* rt) {
__toku_lt_free_points(tree, &query, numfound); __toku_lt_free_points(tree, &query, numfound);
} while (TRUE); } while (TRUE);
} }
else __toku_lt_panic(tree, r);
r = toku_rt_close(rt); r = toku_rt_close(rt);
assert(r == 0); assert(r == 0);
} }
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates, int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
size_t payload_capacity, void (*panic)(DB*), size_t payload_capacity,
int (*compare_fun)(DB*,const DBT*,const DBT*), int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*), int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t), void* (*user_malloc) (size_t),
void (*user_free) (void*), void (*user_free) (void*),
void* (*user_realloc)(void*, size_t)) { void* (*user_realloc)(void*, size_t)) {
if (!ptree || !db || !compare_fun || !dup_compare || !payload_capacity || if (!ptree || !db || !compare_fun || !dup_compare || !panic ||
!user_malloc || !user_free || !user_realloc) return EINVAL; !payload_capacity || !user_malloc || !user_free || !user_realloc) {
return EINVAL;
}
int r; int r;
toku_lock_tree* temp_tree =(toku_lock_tree*)user_malloc(sizeof(*temp_tree)); toku_lock_tree* temp_tree =(toku_lock_tree*)user_malloc(sizeof(*temp_tree));
...@@ -673,12 +693,13 @@ int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates, ...@@ -673,12 +693,13 @@ int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
memset(temp_tree, 0, sizeof(*temp_tree)); memset(temp_tree, 0, sizeof(*temp_tree));
temp_tree->db = db; temp_tree->db = db;
temp_tree->duplicates = duplicates; temp_tree->duplicates = duplicates;
temp_tree->panic = panic;
temp_tree->payload_capacity = payload_capacity;
temp_tree->compare_fun = compare_fun; temp_tree->compare_fun = compare_fun;
temp_tree->dup_compare = dup_compare; temp_tree->dup_compare = dup_compare;
temp_tree->malloc = user_malloc; temp_tree->malloc = user_malloc;
temp_tree->free = user_free; temp_tree->free = user_free;
temp_tree->realloc = user_realloc; temp_tree->realloc = user_realloc;
temp_tree->payload_capacity = payload_capacity;
r = toku_rt_create(&temp_tree->mainread, r = toku_rt_create(&temp_tree->mainread,
toku_lt_point_cmp, __toku_lt_txn_cmp, TRUE, toku_lt_point_cmp, __toku_lt_txn_cmp, TRUE,
user_malloc, user_free, user_realloc); user_malloc, user_free, user_realloc);
...@@ -745,6 +766,12 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -745,6 +766,12 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
if (tree->duplicates && key_right != data_right && if (tree->duplicates && key_right != data_right &&
__toku_lt_is_infinite(key_right)) return EINVAL; __toku_lt_is_infinite(key_right)) return EINVAL;
/* Verify that NULL keys have payload and size that are mutually
consistent*/
__toku_lt_verify_null_key(key_left);
__toku_lt_verify_null_key(data_left);
__toku_lt_verify_null_key(key_right);
__toku_lt_verify_null_key(data_right);
int r; int r;
toku_point left; toku_point left;
...@@ -755,7 +782,10 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -755,7 +782,10 @@ int toku_lt_acquire_range_read_lock(toku_lock_tree* tree, DB_TXN* txn,
__toku_init_point(&left, tree, key_left, data_left); __toku_init_point(&left, tree, key_left, data_left);
__toku_init_point(&right, tree, key_right, data_right); __toku_init_point(&right, tree, key_right, data_right);
/* Verify left <= right. */ /*
Optimization: if all the pointers are equal, clearly left == right.
Verify left <= right, otherwise return EDOM.
*/
if ((key_left != key_right || data_left != data_right) && if ((key_left != key_right || data_left != data_right) &&
toku_lt_point_cmp(&left, &right) > 0) return EDOM; toku_lt_point_cmp(&left, &right) > 0) return EDOM;
...@@ -805,6 +835,9 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -805,6 +835,9 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
if (tree->duplicates && !data) return EINVAL; if (tree->duplicates && !data) return EINVAL;
if (tree->duplicates && key != data && if (tree->duplicates && key != data &&
__toku_lt_is_infinite(key)) return EINVAL; __toku_lt_is_infinite(key)) return EINVAL;
__toku_lt_verify_null_key(key);
__toku_lt_verify_null_key(data);
int r; int r;
toku_point left; toku_point left;
toku_point right; toku_point right;
...@@ -895,7 +928,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -895,7 +928,7 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
unsigned numfound; unsigned numfound;
r = toku_rt_find(borderwrite, &query, 1, &tree->buf, &tree->buflen, r = toku_rt_find(borderwrite, &query, 1, &tree->buf, &tree->buflen,
&numfound); &numfound);
if (r!=0) __toku_lt_panic(tree, r); if (r!=0) return __toku_lt_panic(tree, r);
assert(numfound == 0 || numfound == 1); assert(numfound == 0 || numfound == 1);
toku_range pred; toku_range pred;
...@@ -959,6 +992,10 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn, ...@@ -959,6 +992,10 @@ int toku_lt_acquire_range_write_lock(toku_lock_tree* tree, DB_TXN* txn,
__toku_lt_is_infinite(key_left)) return EINVAL; __toku_lt_is_infinite(key_left)) return EINVAL;
if (tree->duplicates && key_right != data_right && if (tree->duplicates && key_right != data_right &&
__toku_lt_is_infinite(key_right)) return EINVAL; __toku_lt_is_infinite(key_right)) return EINVAL;
__toku_lt_verify_null_key(key_left);
__toku_lt_verify_null_key(data_left);
__toku_lt_verify_null_key(key_right);
__toku_lt_verify_null_key(data_right);
toku_point left; toku_point left;
toku_point right; toku_point right;
......
...@@ -9,6 +9,11 @@ ...@@ -9,6 +9,11 @@
Lock trees are toku-struct's for granting long-lived locks to transactions. Lock trees are toku-struct's for granting long-lived locks to transactions.
See more details on the design document. See more details on the design document.
TODO: If the various range trees are inconsistent with
each other, due to some system error like failed malloc,
we defer to the db panic handler
TODO: Pass in another parameter to do this.
*/ */
#include <assert.h> #include <assert.h>
...@@ -32,11 +37,6 @@ typedef struct { ...@@ -32,11 +37,6 @@ typedef struct {
the range trees that this lock tree owns */ the range trees that this lock tree owns */
toku_range* buf; toku_range* buf;
unsigned buflen; /**< The length of buf */ unsigned buflen; /**< The length of buf */
/** It is true only if the various range trees are inconsistent with
each other, due to some system error like failed malloc.
If the lock tree implementation panicks, it defers to the db
panic handler */
BOOL panicked;
/** The maximum amount of memory to be used for DBT payloads. */ /** The maximum amount of memory to be used for DBT payloads. */
size_t payload_capacity; size_t payload_capacity;
/** The current amount of memory used for DBT payloads. */ /** The current amount of memory used for DBT payloads. */
...@@ -45,6 +45,8 @@ typedef struct { ...@@ -45,6 +45,8 @@ typedef struct {
int (*compare_fun)(DB*,const DBT*,const DBT*); int (*compare_fun)(DB*,const DBT*,const DBT*);
/** The data compare function */ /** The data compare function */
int (*dup_compare)(DB*,const DBT*,const DBT*); int (*dup_compare)(DB*,const DBT*,const DBT*);
/** The panic function */
void (*panic)(DB*);
/** The user malloc function */ /** The user malloc function */
void* (*malloc) (size_t); void* (*malloc) (size_t);
/** The user free function */ /** The user free function */
...@@ -94,10 +96,11 @@ int toku_lt_point_cmp(void* a, void* b); ...@@ -94,10 +96,11 @@ int toku_lt_point_cmp(void* a, void* b);
\param duplicates Whether the db supports duplicates. \param duplicates Whether the db supports duplicates.
\param compare_fun The key compare function. \param compare_fun The key compare function.
\param dup_compare The data compare function. \param dup_compare The data compare function.
\param panic The function to cause the db to panic. i.e. godzilla_rampage()
\param payload_capacity The maximum amount of memory to use for dbt payloads.
\param user_malloc A user provided malloc(3) function. \param user_malloc A user provided malloc(3) function.
\param user_free A user provided free(3) function. \param user_free A user provided free(3) function.
\param user_realloc A user provided realloc(3) function. \param user_realloc A user provided realloc(3) function.
\param payload_capacity The maximum amount of memory to use for dbt payloads.
\return \return
- 0: Success - 0: Success
...@@ -113,7 +116,7 @@ int toku_lt_point_cmp(void* a, void* b); ...@@ -113,7 +116,7 @@ int toku_lt_point_cmp(void* a, void* b);
instead. instead.
*/ */
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates, int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
size_t payload_capacity, void(*panic)(DB*), size_t payload_capacity,
int (*compare_fun)(DB*,const DBT*,const DBT*), int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*), int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t), void* (*user_malloc) (size_t),
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment