Commit 6b8ee583 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #337

Addresses #293
Addresses #307
Adds range count to limit lock system memory from the dbenv, and correspondingly changes range trees and test files

git-svn-id: file:///svn/tokudb@2103 c7de825b-a66e-492c-adef-691d508d4ae1
parent aa9e2e14
......@@ -5,8 +5,8 @@ LIBNAME=libdb
OPTFLAGS = -O2
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
CFLAGS = -W -Wall -Werror -g -fPIC $(OPTFLAGS) $(GCOV_FLAGS)
CPPFLAGS = -I../include -I../newbrt
CFLAGS = -W -Wall -Werror -g3 -ggdb3 -fPIC $(OPTFLAGS) $(GCOV_FLAGS)
CPPFLAGS = -I../include -I../newbrt -I./lock_tree/ -I./range_tree/
CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
......@@ -20,13 +20,19 @@ SHARED=-shared
endif
.PHONY: install logformat
install: logformat $(LIBNAME).$(LIBEXT) $(LIBNAME).a
install: logformat rangetree locktree $(LIBNAME).$(LIBEXT) $(LIBNAME).a
cp $(LIBNAME).$(LIBEXT) ../lib/
cp $(LIBNAME).a ../lib
logformat:
(cd ../newbrt;make)
locktree:
cd lock_tree;make OSX=$(OSX) OPT=$(OPT) GCOV=$(GCOV)
rangetree:
cd range_tree;make OSX=$(OSX) OPT=$(OPT) GCOV=$(GCOV)
check: $(LIBNAME).$(LIBEXT)
python tokuglobals.py $(LIBNAME).$(LIBEXT)
......@@ -36,9 +42,16 @@ strip: $(LIBNAME).$(LIBEXT)
clean:
rm -rf $(LIBNAME).$(LIBEXT) $(LIBNAME).a *.o *.gcno *.gcda *.gcov
cd tests;make clean
cd lock_tree;make clean
cd range_tree;make clean
ydb.o: ../include/db.h ../newbrt/cachetable.h ../newbrt/brt.h ../newbrt/log.c
DBBINS = ydb.o ../newbrt/brt.o ../newbrt/brt-serialize.o ../newbrt/brt-verify.o ../newbrt/cachetable.o ../newbrt/fifo.o ../newbrt/key.o ../newbrt/memory.o ../newbrt/mempool.o ../newbrt/pma.o ../newbrt/ybt.o ../newbrt/primes.o ../newbrt/log.o ../newbrt/fingerprint.o ../newbrt/log_code.o ../newbrt/roll.o
RANGETREE_BINS = range_tree/linear.o
LOCKTREE_BINS = lock_tree/rth.o lock_tree/locktree.o
DBBINS = ydb.o ../newbrt/brt.o ../newbrt/brt-serialize.o ../newbrt/brt-verify.o ../newbrt/cachetable.o ../newbrt/fifo.o ../newbrt/key.o ../newbrt/memory.o ../newbrt/mempool.o ../newbrt/pma.o ../newbrt/ybt.o ../newbrt/primes.o ../newbrt/log.o ../newbrt/fingerprint.o ../newbrt/log_code.o ../newbrt/roll.o $(RANGETREE_BINS) $(LOCKTREE_BINS)
$(LIBNAME).$(LIBEXT): $(DBBINS)
cc $(CPPFLAGS) $(DBBINS) $(SHARED) -o $@ $(CFLAGS) -lz
......
......@@ -4,9 +4,9 @@
LIBNAME=liblocktree
ifneq ($(OPT),)
OPTFLAGS = -O4
OPTFLAGS = -O3
else
OPTFLAGS = -O0 -g3 -ggdb3
OPTFLAGS = -O2 -g3 -ggdb3
endif
ifneq ($(GCOV),)
......@@ -17,7 +17,7 @@ endif
CFLAGS = -W -Wall -Wextra -Werror -fPIC $(OPTFLAGS) $(GCOV_FLAGS)
CFLAGS += -Wbad-function-cast -Wcast-align -Wconversion -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute -Wunreachable-code
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS = -I. -I.. -I../range_tree -I../../include -I../../newbrt -L../range_tree
CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
......@@ -37,19 +37,21 @@ install: $(LIBNAME).$(LIBEXT) $(LIBNAME).a
cp $(LIBNAME).a ../../lib
clean:
rm -rf $(LIBNAMELINEAR).$(LIBEXT) $(LIBNAMELINEAR).a
rm -rf $(LIBNAME).$(LIBEXT) $(LIBNAME).a *.o *.gcno *.gcda *.gcov
rm -rf $(LIBNAME_LINEAR).$(LIBEXT) $(LIBNAME_LINEAR).a
rm -rf $(LIBNAME_TLOG).$(LIBEXT) $(LIBNAME_TLOG).a
rm -rf $(LIBNAME_LOG).$(LIBEXT) $(LIBNAME_LOG).a
rm -rf *.gcno *.gcda *.gcov
cd tests;make clean
locktree.o: locktree.c locktree.h
gcc $(CFLAGS) $(CPPFLAGS) -c -DTOKU_LINEAR_RANGE_TREE $< -o $@
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_LT_LINEAR -c $< -o $@
rth.o: rth.c rth.h
gcc $(CFLAGS) $(CPPFLAGS) -c -DTOKU_LINEAR_RANGE_TREE $< -o $@
gcc $(CFLAGS) $(CPPFLAGS) -c $< -o $@
$(LIBNAME).$(LIBEXT): locktree.o rth.o
cc $(CPPFLAGS) $^ $(SHARED) -o $@ $(CFLAGS) -lrangetreelinear
cc $(CPPFLAGS) $^ $(SHARED) -o $@ $(CFLAGS) -lrangetree_linear
$(LIBNAME).a: locktree.o rth.o
$(AR) rv $@ $^
......@@ -20,8 +20,8 @@
*/
/* TODO: During integration, make sure we first verify the NULL CONSISTENCY,
(return EINVAL if necessary) before making lock tree calls. */
static int __toku_lt_panic(toku_lock_tree *tree) {
return tree->panic(tree->db);
inline static int __toku_lt_panic(toku_lock_tree *tree, int r) {
return tree->panic(tree->db, r);
}
const u_int32_t __toku_default_buflen = 2;
......@@ -32,6 +32,13 @@ static const DBT __toku_lt_neg_infinity;
const DBT* const toku_lt_infinity = &__toku_lt_infinity;
const DBT* const toku_lt_neg_infinity = &__toku_lt_neg_infinity;
char* toku_lt_strerror(TOKU_LT_ERROR r) {
if (r >= 0) return strerror(r);
if (r == TOKU_LT_INCONSISTENT) {
return "Locking data structures have become inconsistent.\n";
}
return "Unknown error in locking data structures.\n";
}
/* Compare two payloads assuming that at least one of them is infinite */
static int __toku_infinite_compare(void* a, void* b) {
if (a == b) return 0;
......@@ -105,11 +112,30 @@ int __toku_lt_point_cmp(void* a, void* b) {
__toku_recreate_DBT(&point_2, y->data_payload, y->data_len));
}
/* Functions to update the range count and compare it with the
maximum number of ranges */
static BOOL __toku_lt_range_test_incr(toku_lock_tree* tree, u_int32_t replace) {
assert(tree);
assert(tree->num_ranges);
assert(replace <= *tree->num_ranges);
return *tree->num_ranges - replace < tree->max_ranges;
}
static void __toku_lt_range_incr(toku_lock_tree* tree, u_int32_t replace) {
assert(__toku_lt_range_test_incr(tree, replace));
*tree->num_ranges -= replace;
*tree->num_ranges += 1;
}
static void __toku_lt_range_decr(toku_lock_tree* tree, u_int32_t ranges) {
assert(tree);
assert(tree->num_ranges);
assert(*tree->num_ranges >= ranges);
*tree->num_ranges -= ranges;
}
static void __toku_p_free(toku_lock_tree* tree, toku_point* point) {
assert(point);
tree->payload_used -= point->key_len;
tree->payload_used -= point->data_len;
tree->payload_used -= sizeof(toku_point);
if (!__toku_lt_is_infinite(point->key_payload)) {
tree->free(point->key_payload);
}
......@@ -133,10 +159,8 @@ static int __toku_payload_copy(toku_lock_tree* tree,
}
else {
assert(payload_in);
if (tree->payload_used + len_in > tree->payload_capacity) return ENOMEM;
*payload_out = tree->malloc((size_t)len_in);
if (!*payload_out) return errno;
tree->payload_used += len_in;
*len_out = len_in;
memcpy(*payload_out, payload_in, (size_t)len_in);
}
......@@ -149,14 +173,10 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
toku_point* temp_point = NULL;
int r;
if (tree->payload_used + sizeof(toku_point) > tree->payload_capacity) {
return ENOMEM;}
temp_point = (toku_point*)tree->malloc(sizeof(toku_point));
if (0) {
died1: tree->free(temp_point);
tree->payload_used -= sizeof(toku_point); return r; }
died1: tree->free(temp_point); return r; }
if (!temp_point) return errno;
tree->payload_used += sizeof(toku_point);
memcpy(temp_point, point, sizeof(toku_point));
r = __toku_payload_copy(tree,
......@@ -166,7 +186,7 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
died2:
if (!__toku_lt_is_infinite(temp_point->key_payload)) {
tree->free(temp_point->key_payload); }
tree->payload_used -= temp_point->key_len; goto died1; }
goto died1; }
if (r!=0) goto died1;
__toku_payload_copy(tree,
&temp_point->data_payload, &temp_point->data_len,
......@@ -176,7 +196,6 @@ static int __toku_p_makecopy(toku_lock_tree* tree, void** ppoint) {
return 0;
}
/* Provides access to a selfread tree for a particular transaction.
Returns NULL if it does not exist yet. */
toku_range_tree* __toku_lt_ifexist_selfread(toku_lock_tree* tree, DB_TXN* txn) {
......@@ -398,7 +417,7 @@ static int __toku_lt_check_borderwrite_conflict(toku_lock_tree* tree,
if (conflict == TOKU_MAYBE_CONFLICT) {
assert(peer);
peer_selfwrite = __toku_lt_ifexist_selfwrite(tree, peer);
if (!peer_selfwrite) return __toku_lt_panic(tree);
if (!peer_selfwrite) return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);
BOOL met;
r = __toku_lt_meets(tree, query, peer_selfwrite, &met);
......@@ -475,7 +494,7 @@ static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert,
<= 0) {
if ((!*alloc_left && c == 0) ||
!__toku_lt_p_independent(tree->buf[i].left, to_insert)) {
return __toku_lt_panic(tree); }
return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT); }
*alloc_left = FALSE;
to_insert->left = tree->buf[i].left;
}
......@@ -486,7 +505,7 @@ static int __toku_lt_extend_extreme(toku_lock_tree* tree,toku_range* to_insert,
(tree->buf[i].right == to_insert->left &&
tree->buf[i].left != to_insert->left) ||
tree->buf[i].right == to_insert->right) {
return __toku_lt_panic(tree); }
return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT); }
*alloc_right = FALSE;
to_insert->right = tree->buf[i].right;
}
......@@ -548,7 +567,7 @@ static int __toku_lt_free_points(toku_lock_tree* tree, toku_range* to_insert,
for (i = 0; i < numfound; i++) {
if (rt != NULL) {
r = toku_rt_delete(rt, to_insert);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
}
/*
We will maintain the invariant: (separately for read and write
......@@ -592,6 +611,7 @@ static int __toku_consolidate(toku_lock_tree* tree,
r = __toku_lt_extend_extreme(tree, to_insert, &alloc_left, &alloc_right,
numfound);
if (r!=0) return r;
if (!__toku_lt_range_test_incr(tree, numfound)) return ENOMEM;
/* Allocate the consolidated range */
r = __toku_lt_alloc_extreme(tree, to_insert, alloc_left, &alloc_right);
if (0) { died1:
......@@ -601,30 +621,34 @@ static int __toku_consolidate(toku_lock_tree* tree,
/* From this point on we have to panic if we cannot finish. */
/* Delete overlapping ranges from selfread ... */
r = __toku_lt_delete_overlapping_ranges(tree, selfread, numfound);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
/* ... and mainread.
Growth direction: if we had no overlaps, the next line
should be commented out */
r = __toku_lt_delete_overlapping_ranges(tree, mainread, numfound);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
/* Free all the points from ranges in tree->buf[0]..tree->buf[numfound-1] */
__toku_lt_free_points(tree, to_insert, numfound, NULL);
/* We don't necessarily need to panic after here unless numfound > 0
Which indicates we deleted something. */
/* Insert extreme range into selfread. */
/* VL */
r = toku_rt_insert(selfread, to_insert);
int r2;
if (0) { died2: r2 = toku_rt_delete(selfread, to_insert);
if (r2!=0) return __toku_lt_panic(tree); goto died1; }
if (r2!=0) return __toku_lt_panic(tree, r2); goto died1; }
if (r!=0) {
/* If we deleted/merged anything, this is a panic situation. */
if (numfound) return __toku_lt_panic(tree); goto died1; }
if (numfound) return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);
goto died1; }
/* Insert extreme range into mainread. */
assert(tree->mainread);
r = toku_rt_insert(tree->mainread, to_insert);
if (r!=0) {
/* If we deleted/merged anything, this is a panic situation. */
if (numfound) return __toku_lt_panic(tree); goto died2; }
if (numfound) return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);
goto died2; }
__toku_lt_range_incr(tree, numfound);
return 0;
}
......@@ -725,6 +749,7 @@ static int __toku_lt_preprocess(toku_lock_tree* tree, DB_TXN* txn,
__toku_init_query(query, left, right);
/* Verify left <= right, otherwise return EDOM. */
if (__toku_r_backwards(query)) return EDOM;
tree->dups_final = TRUE;
return 0;
}
......@@ -737,7 +762,7 @@ static int __toku_lt_get_border(toku_lock_tree* tree, BOOL in_borderwrite,
toku_range_tree* rt;
rt = in_borderwrite ? tree->borderwrite :
__toku_lt_ifexist_selfwrite(tree, tree->buf[0].data);
if (!rt) return __toku_lt_panic(tree);
if (!rt) return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);
r = toku_rt_predecessor(rt, to_insert->left, pred, found_p);
if (r!=0) return r;
r = toku_rt_successor (rt, to_insert->right, succ, found_s);
......@@ -769,20 +794,20 @@ static int __toku_lt_split_border(toku_lock_tree* tree, toku_range* to_insert,
assert(tree && to_insert && pred && succ);
int r;
assert(tree->buf[0].data != to_insert->data);
if (!found_s || !found_p) return __toku_lt_panic(tree);
if (!found_s || !found_p) return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);
r = toku_rt_delete(tree->borderwrite, &tree->buf[0]);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
pred->left = tree->buf[0].left;
succ->right = tree->buf[0].right;
if (__toku_r_backwards(pred) || __toku_r_backwards(succ)) {
return __toku_lt_panic(tree);}
return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT);}
r = toku_rt_insert(tree->borderwrite, pred);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
r = toku_rt_insert(tree->borderwrite, succ);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
return 0;
}
......@@ -826,7 +851,7 @@ static int __toku_lt_borderwrite_insert(toku_lock_tree* tree,
u_int32_t numfound;
r = toku_rt_find(borderwrite, query, query_size, &tree->buf, &tree->buflen,
&numfound);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
assert(numfound <= query_size);
/* No updated needed in borderwrite: we return right away. */
......@@ -840,36 +865,36 @@ static int __toku_lt_borderwrite_insert(toku_lock_tree* tree,
r = __toku_lt_get_border(tree, numfound == 0, &pred, &succ,
&found_p, &found_s, to_insert);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
if (numfound == 0) {
if (found_p && found_s && pred.data == succ.data) {
return __toku_lt_panic(tree); }
return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT); }
r = __toku_lt_expand_border(tree, to_insert, &pred, &succ,
found_p, found_s);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
}
else {
r = __toku_lt_split_border( tree, to_insert, &pred, &succ,
found_p, found_s);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
}
r = toku_rt_insert(borderwrite, to_insert);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
return 0;
}
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
int (*panic)(DB*), size_t payload_capacity,
int (*panic)(DB*, int), u_int32_t max_ranges,
u_int32_t* num_ranges,
int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t),
void (*user_free) (void*),
void* (*user_realloc)(void*, size_t)) {
if (!ptree || !db || !compare_fun || !dup_compare || !panic ||
!payload_capacity || !user_malloc || !user_free || !user_realloc) {
return EINVAL;
}
!max_ranges || !num_ranges || !user_malloc || !user_free ||
!user_realloc) { return EINVAL; }
int r;
toku_lock_tree* tmp_tree = (toku_lock_tree*)user_malloc(sizeof(*tmp_tree));
......@@ -879,7 +904,8 @@ int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
tmp_tree->db = db;
tmp_tree->duplicates = duplicates;
tmp_tree->panic = panic;
tmp_tree->payload_capacity = payload_capacity;
tmp_tree->max_ranges = max_ranges;
tmp_tree->num_ranges = num_ranges;
tmp_tree->compare_fun = compare_fun;
tmp_tree->dup_compare = dup_compare;
tmp_tree->malloc = user_malloc;
......@@ -1020,6 +1046,8 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
*/
toku_range to_insert;
__toku_init_insert(&to_insert, &endpoint, &endpoint, txn);
if (!__toku_lt_range_test_incr(tree, 0)) return ENOMEM;
BOOL dummy = TRUE;
r = __toku_lt_alloc_extreme(tree, &to_insert, TRUE, &dummy);
if (0) { died1: __toku_p_free(tree, to_insert.left); return r; }
......@@ -1028,11 +1056,13 @@ int toku_lt_acquire_write_lock(toku_lock_tree* tree, DB_TXN* txn,
r = __toku_lt_selfwrite(tree, txn, &selfwrite);
if (r!=0) goto died1;
assert(selfwrite);
/* VL change this: r = toku_rt_insert(selfwrite, &to_insert); */
r = toku_rt_insert(selfwrite, &to_insert);
if (r!=0) goto died1;
/* Need to update borderwrite. */
r = __toku_lt_borderwrite_insert(tree, &query, &to_insert);
if (r!=0) return __toku_lt_panic(tree);
if (r!=0) return __toku_lt_panic(tree, r);
__toku_lt_range_incr(tree, 0);
return 0;
}
......@@ -1094,7 +1124,8 @@ static int __toku_sweep_border(toku_lock_tree* tree, toku_range* range) {
&buf[0]);
if (r!=0) return r;
if (found_p && found_s && pred.data == succ.data &&
pred.data == buf[0].data) { return __toku_lt_panic(tree); }
pred.data == buf[0].data) {
return __toku_lt_panic(tree, TOKU_LT_INCONSISTENT); }
/* If both found and pred.data=succ.data, merge pred and succ (expand?)
free_points */
......@@ -1149,18 +1180,34 @@ static int __toku_lt_border_delete(toku_lock_tree* tree, toku_range_tree* rt) {
}
int toku_lt_unlock(toku_lock_tree* tree, DB_TXN* txn) {
assert(tree && txn);
if (!tree || !txn) return EINVAL;
int r;
toku_range_tree *selfwrite = __toku_lt_ifexist_selfwrite(tree, txn);
toku_range_tree *selfread = __toku_lt_ifexist_selfread (tree, txn);
r = __toku_lt_free_contents(tree, selfread, tree->mainread);
if (r!=0) return __toku_lt_panic(tree);
u_int32_t ranges = 0;
if (selfwrite) {
ranges = toku_rt_get_size(selfwrite);
r = __toku_lt_free_contents(tree, selfread, tree->mainread);
if (r!=0) return __toku_lt_panic(tree, r);
}
r = __toku_lt_border_delete(tree, selfwrite);
if (r!=0) return __toku_lt_panic(tree);
if (selfread) {
ranges += toku_rt_get_size(selfread);
r = __toku_lt_border_delete(tree, selfwrite);
if (r!=0) return __toku_lt_panic(tree, r);
}
if (selfread || selfwrite) toku_rth_delete(tree->rth, txn);
__toku_lt_range_decr(tree, ranges);
return 0;
}
int toku_lt_set_dups(toku_lock_tree* tree, BOOL duplicates) {
if (!tree) return EINVAL;
if (tree->dups_final) return EDOM;
tree->duplicates = duplicates;
return 0;
}
......@@ -46,15 +46,34 @@
#include <assert.h>
#include <db.h>
#include <brttypes.h>
#if defined(TOKU_LT_LINEAR)
#define TOKU_RT_LINEAR
#elif defined(TOKU_LT_TLOG)
#define TOKU_RT_TLOG
#elif defined(TOKU_LT_LOG)
#define TOKU_RT_LOG
#else
#error Using an undefined LOCK TREE TYPE.
#endif
#include <rangetree.h>
#include <rth.h>
typedef enum {
TOKU_LT_INCONSISTENT=-1,
} TOKU_LT_ERROR;
char* toku_lt_strerror(TOKU_LT_ERROR r) __attribute__((const,pure));
/** \brief The lock tree structure */
typedef struct {
typedef struct __toku_lock_tree {
/** The database for which this locktree will be handling locks */
DB* db;
/** Whether the db supports duplicate */
BOOL duplicates;
/** Whether the duplicates flag can no longer be changed. */
BOOL dups_final;
toku_range_tree* mainread; /**< See design document */
toku_range_tree* borderwrite; /**< See design document */
toku_rth* rth;
......@@ -62,16 +81,16 @@ typedef struct {
the range trees that this lock tree owns */
toku_range* buf;
u_int32_t buflen; /**< The length of buf */
/** The maximum amount of memory to be used for DBT payloads. */
size_t payload_capacity;
/** The current amount of memory used for DBT payloads. */
size_t payload_used;
/** The maximum number of ranges allowed. */
u_int32_t max_ranges;
/** The current number of ranges. */
u_int32_t* num_ranges;
/** The key compare function */
int (*compare_fun)(DB*,const DBT*,const DBT*);
/** The data compare function */
int (*dup_compare)(DB*,const DBT*,const DBT*);
/** The panic function */
int (*panic)(DB*);
int (*panic)(DB*, int);
/** The user malloc function */
void* (*malloc) (size_t);
/** The user free function */
......@@ -135,13 +154,23 @@ typedef struct {
instead.
*/
int toku_lt_create(toku_lock_tree** ptree, DB* db, BOOL duplicates,
int (*panic)(DB*), size_t payload_capacity,
int (*panic)(DB*, int), u_int32_t max_locks,
u_int32_t* num_locks,
int (*compare_fun)(DB*,const DBT*,const DBT*),
int (*dup_compare)(DB*,const DBT*,const DBT*),
void* (*user_malloc) (size_t),
void (*user_free) (void*),
void* (*user_realloc)(void*, size_t));
/**
Set whether duplicates are allowed.
This can be called after create, but NOT after any locks or unlocks have
occurred.
Return: 0 on success.
Return: EINVAL if tree is NULL
Return: EDOM if it is too late to change.
*/
int toku_lt_set_dups(toku_lock_tree* tree, BOOL duplicates);
/**
Closes and frees a lock tree.
......
......@@ -39,31 +39,35 @@ CPPFLAGS += -I. -I../ -I../../range_tree -I../../../newbrt -I../../../include -l
SRCS = $(wildcard *.c)
LOG_TESTS = $(patsubst %.c,%.log,$(SRCS))
TLOG_TESTS = $(patsubst %.c,%.tlog,$(SRCS))
LIN_TESTS = $(patsubst %.c,%.lin,$(SRCS))
ALL_TESTS = $(LOG_TESTS) $(LIN_TESTS)
ALL_TESTS = $(LIN_TESTS) $(TLOG_TESTS) $(LOG_TESTS)
RUN_LOG_TESTS = $(patsubst %.log,%.logrun,$(LOG_TESTS))
RUN_LIN_TESTS = $(patsubst %.lin,%.linrun,$(LIN_TESTS))
RUN_ALL_TESTS = $(RUN_LOG_TESTS) $(RUN_LIN_TESTS)
RUN_LOG_TESTS = $(patsubst %.log,%.logrun,$(LOG_TESTS))
RUN_TLOG_TESTS = $(patsubst %.tlog,%.tlogrun,$(TLOG_TESTS))
RUN_LIN_TESTS = $(patsubst %.lin,%.linrun,$(LIN_TESTS))
RUN_ALL_TESTS = $(RUN_LIN_TESTS) $(RUN_TLOG_TESTS) $(RUN_LOG_TESTS)
all: make_libs $(ALL_TESTS)
.PHONY: check check.lin check.log tests.lin tests.log
check: check.lin check.log
.PHONY: check check.lin check.log tests.lin tests.log check.tlog tests.tlog
check: check.lin check.tlog check.log
@ echo ok
tests.lin: make_libs $(LIN_TESTS)
check.lin: make_libs $(RUN_LIN_TESTS)
tests.tlog: make_libs $(TLOG_TESTS)
check.tlog: make_libs $(RUN_TLOG_TESTS)
tests.log: make_libs $(LOG_TESTS)
check.log: make_libs $(RUN_LOG_TESTS)
# Need these rule so that Make knows about all the file names
.PHONY: %.linrun %.logrun %.run
.PHONY: %.linrun %.logrun %.run %.tlogrun
$(RUN_ALL_TESTS):
$(ALL_TESTS):
%.run: %.linrun %.logrun
%.run: %.linrun %.tlogrun %.logrun
@ echo ok
ifeq ($(VERBOSE),2)
......@@ -87,14 +91,18 @@ endif
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
%.logrun: %.log
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
%.tlogrun: %.tlog
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
libs:
cd ..;make
%.lin: %.c ../locktree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.lin\" -DTOKU_LINEAR_RANGE_TREE $(CPPFLAGS) $(CFLAGS) $< -lrangetreelinear -llocktree -o $@
$(SETENV) cc -DDIR=\"dir.$<.lin\" -DTOKU_LT_LINEAR $(CPPFLAGS) $(CFLAGS) $< -lrangetree_linear -llocktree_linear -o $@
%.tlog: %.c ../locktree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_LT_TLOG $(CPPFLAGS) $(CFLAGS) $< -lrangetree_tlog -llocktree_tlog -o $@
%.log: %.c ../locktree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_LOG_RANGE_TREE $(CPPFLAGS) $(CFLAGS) $< -lrangetree -llocktre -o $@
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_LT_LOG $(CPPFLAGS) $(CFLAGS) $< -lrangetree_log -llocktree_log -o $@
.PHONY: make_libs
......
......@@ -146,6 +146,12 @@ static void do_point_test(int (*acquire)(toku_lock_tree*, DB_TXN*,
CKERR(r);
assert(lt);
r = toku_lt_unlock(NULL, (DB_TXN*)1);
CKERR2(r, EINVAL);
r = toku_lt_unlock(lt, NULL);
CKERR2(r, EINVAL);
r = acquire(NULL, txn, key, data);
CKERR2(r, EINVAL);
......
# On OSX do:
# make OSX=OSX
LIBNAMELINEAR=librangetreelinear
LIBNAME=librangetree
LIBNAME_LINEAR = librangetree_linear
LIBNAME_TLOG = librangetree_tlog
LIBNAME_LOG = librangetree_log
ifneq ($(OPT),)
OPTFLAGS = -O4
OPTFLAGS = -O3
else
OPTFLAGS = -O0 -g3 -ggdb3
OPTFLAGS = -O2 -g3 -ggdb3
endif
ifneq ($(GCOV),)
......@@ -18,7 +19,7 @@ endif
CFLAGS = -W -Wall -Wextra -Werror -fPIC $(OPTFLAGS) $(GCOV_FLAGS)
CFLAGS += -Wbad-function-cast -Wcast-align -Wconversion -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute -Wunreachable-code
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS = -I../../include -I../../newbrt
CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
......@@ -34,32 +35,44 @@ SHARED=-shared
endif
.PHONY: install logformat
install: $(LIBNAMELINEAR).$(LIBEXT) $(LIBNAMELINEAR).a #$(LIBNAME).$(LIBEXT) $(LIBNAME).a
#cp $(LIBNAME).$(LIBEXT) ../../lib/
#cp $(LIBNAME).a ../../lib
cp $(LIBNAMELINEAR).$(LIBEXT) ../../lib/
cp $(LIBNAMELINEAR).a ../../lib
install: $(LIBNAME_LINEAR).$(LIBEXT) $(LIBNAME_LINEAR).a #$(LIBNAME_TLOG).$(LIBEXT) $(LIBNAME_TLOG).a $(LIBNAME_LOG).$(LIBEXT) $(LIBNAME_LOG).a
#cp $(LIBNAME_LOG).$(LIBEXT) ../../lib/
#cp $(LIBNAME_LOG).a ../../lib
#cp $(LIBNAME_TLOG).$(LIBEXT) ../../lib/
#cp $(LIBNAME_TLOG).a ../../lib
cp $(LIBNAME_LINEAR).$(LIBEXT) ../../lib/
cp $(LIBNAME_LINEAR).a ../../lib
clean:
rm -rf $(LIBNAMELINEAR).$(LIBEXT) $(LIBNAMELINEAR).a
rm -rf $(LIBNAME).$(LIBEXT) $(LIBNAME).a *.o *.gcno *.gcda *.gcov
rm -rf $(LIBNAME_LINEAR).$(LIBEXT) $(LIBNAME_LINEAR).a
rm -rf $(LIBNAME_TLOG).$(LIBEXT) $(LIBNAME_TLOG).a
rm -rf $(LIBNAME_LOG).$(LIBEXT) $(LIBNAME_LOG).a
cd tests;make clean
linear.o: linear.c rangetree.h
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_LINEAR_RANGE_TREE -c $< -o $@
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_RT_LINEAR -c $< -o $@
log.o: log.c rangetree.h
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_LOG_RANGE_TREE -c $< -o $@
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_RT_LOG -c $< -o $@
$(LIBNAME).$(LIBEXT): log.o
tlog.o: tlog.c rangetree.h
gcc $(CFLAGS) $(CPPFLAGS) -DTOKU_RT_TLOG -c $< -o $@
$(LIBNAME_TLOG).$(LIBEXT): tlog.o
cc $(CPPFLAGS) $< $(SHARED) -o $@ $(CFLAGS)
$(LIBNAME_LOG).$(LIBEXT): log.o
cc $(CPPFLAGS) $< $(SHARED) -o $@ $(CFLAGS)
$(LIBNAMELINEAR).$(LIBEXT): linear.o
$(LIBNAME_LINEAR).$(LIBEXT): linear.o
cc $(CPPFLAGS) $< $(SHARED) -o $@ $(CFLAGS)
$(LIBNAME).a: log.o
$(LIBNAME_TLOG).a: tlog.o
$(AR) rv $@ $<
$(LIBNAME_LOG).a: log.o
$(AR) rv $@ $<
$(LIBNAMELINEAR).a: linear.o
$(LIBNAME_LINEAR).a: linear.o
$(AR) rv $@ $<
......@@ -33,7 +33,7 @@ static int __toku_rt_p_cmp(toku_range_tree* tree,
}
static int __toku_rt_decrease_capacity(toku_range_tree* tree, u_int32_t _num) {
assert(tree);
//TODO: SOME ATTRIBUTE TO REMOVE NEVER EXECUTABLE ERROR: assert(tree);
u_int32_t num = _num < minlen ? minlen : _num;
if (tree->ranges_len >= num * 2) {
......@@ -50,7 +50,7 @@ static int __toku_rt_decrease_capacity(toku_range_tree* tree, u_int32_t _num) {
}
static int __toku_rt_increase_capacity(toku_range_tree* tree, u_int32_t num) {
assert(tree);
//TODO: SOME ATTRIBUTE TO REMOVE NEVER EXECUTABLE ERROR: assert(tree);
if (tree->ranges_len < num) {
u_int32_t temp_len = tree->ranges_len;
while (temp_len < num) temp_len *= 2;
......@@ -66,7 +66,7 @@ static int __toku_rt_increase_capacity(toku_range_tree* tree, u_int32_t num) {
static int __toku_rt_increase_buffer(toku_range_tree* tree, toku_range** buf,
u_int32_t* buflen, u_int32_t num) {
assert(buf);
assert(buflen);
//TODO: SOME ATTRIBUTE TO REMOVE NEVER EXECUTABLE ERROR: assert(buflen);
if (*buflen < num) {
u_int32_t temp_len = *buflen;
while (temp_len < num) temp_len *= 2;
......@@ -250,3 +250,8 @@ int toku_rt_get_allow_overlaps(toku_range_tree* tree, BOOL* allowed) {
*allowed = tree->allow_overlaps;
return 0;
}
u_int32_t toku_rt_get_size(toku_range_tree *rt) {
assert(rt);
return rt->numelements;
}
......@@ -53,14 +53,20 @@ struct __toku_range_tree_internal {
void (*free) (void*);
/** The user realloc function */
void* (*realloc)(void*, size_t);
#if defined(TOKU_LINEAR_RANGE_TREE)
#if defined(TOKU_LOG_RANGE_TREE)
#if defined(TOKU_RT_LINEAR)
#if defined(TOKU_RT_TLOG) || defined(TOKU_RT_LOG)
#error Choose just one range tree type.
#endif
//Linear version only fields:
toku_range* ranges;
u_int32_t ranges_len;
#elif defined(TOKU_LOG_RANGE_TREE)
#elif defined(TOKU_RT_TLOG)
#if defined(TOKU_RT_LOG)
#error Choose just one range tree type.
#endif
#error Not defined yet.
//TLog version only fields:
#elif defined(TOKU_RT_LOG)
#error Not defined yet.
//Log version only fields:
#else
......@@ -225,4 +231,7 @@ int toku_rt_predecessor(toku_range_tree* tree, void* point, toku_range* pred,
*/
int toku_rt_successor(toku_range_tree* tree, void* point, toku_range* succ,
BOOL* wasfound);
u_int32_t toku_rt_get_size(toku_range_tree *);
#endif /* #if !defined(TOKU_RANGE_TREE_H) */
......@@ -31,31 +31,35 @@ CPPFLAGS += -L../ -I../ -I../../../newbrt -I../../../include -lpthread
SRCS = $(wildcard *.c)
LOG_TESTS = $(patsubst %.c,%.log,$(SRCS))
TLOG_TESTS = $(patsubst %.c,%.tlog,$(SRCS))
LIN_TESTS = $(patsubst %.c,%.lin,$(SRCS))
ALL_TESTS = $(LOG_TESTS) $(LIN_TESTS)
ALL_TESTS = $(LIN_TESTS) $(LOG_TESTS) $(TLOG_TESTS)
RUN_LOG_TESTS = $(patsubst %.log,%.logrun,$(LOG_TESTS))
RUN_LIN_TESTS = $(patsubst %.lin,%.linrun,$(LIN_TESTS))
RUN_ALL_TESTS = $(RUN_LOG_TESTS) $(RUN_LIN_TESTS)
RUN_LOG_TESTS = $(patsubst %.log,%.logrun,$(LOG_TESTS))
RUN_TLOG_TESTS = $(patsubst %.log,%.logrun,$(TLOG_TESTS))
RUN_LIN_TESTS = $(patsubst %.lin,%.linrun,$(LIN_TESTS))
RUN_ALL_TESTS = $(RUN_LIN_TESTS) $(RUN_TLOG_TESTS) $(RUN_LOG_TESTS)
all: make_libs $(ALL_TESTS)
.PHONY: check check.lin check.log tests.lin tests.log
check: check.lin check.log
.PHONY: check check.lin check.log tests.lin tests.log tests.tlog
check: check.lin check.tlog check.log
@ echo ok
tests.lin: make_libs $(LIN_TESTS)
check.lin: make_libs $(RUN_LIN_TESTS)
tests.log: make_libs $(LOG_TESTS)
check.log: make_libs $(RUN_LOG_TESTS)
tests.lin: make_libs $(LIN_TESTS)
check.lin: make_libs $(RUN_LIN_TESTS)
tests.tlog: make_libs $(TLOG_TESTS)
check.tlog: make_libs $(RUN_TLOG_TESTS)
tests.log: make_libs $(LOG_TESTS)
check.log: make_libs $(RUN_LOG_TESTS)
# Need these rule so that Make knows about all the file names
.PHONY: %.linrun %.logrun %.run
.PHONY: %.linrun %.logrun %.run %.tlogrun
$(RUN_ALL_TESTS):
$(ALL_TESTS):
%.run: %.linrun %.logrun
%.run: %.linrun %.tlogrun %.logrun
@ echo ok
ifeq ($(VERBOSE),2)
......@@ -79,14 +83,18 @@ endif
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
%.logrun: %.log
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
%.tlogrun: %.tlog
$(MAYBEATSIGN) $(SETENV) $(VGRIND) ./$< $(VERBVERBOSE)
libs:
cd ..;make
%.lin: %.c ../rangetree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.lin\" -DTOKU_LINEAR_RANGE_TREE $(CPPFLAGS) $(CFLAGS) $< -lrangetreelinear -o $@
$(SETENV) cc -DDIR=\"dir.$<.lin\" -DTOKU_RT_LINEAR $(CPPFLAGS) $(CFLAGS) $< -lrangetree_linear -o $@
%.log: %.c ../rangetree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_LOG_RANGE_TREE $(CPPFLAGS) $(CFLAGS) $< -lrangetree -o $@
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_RT_LOG $(CPPFLAGS) $(CFLAGS) $< -lrangetree_log -o $@
%.tlog: %.c ../rangetree.h test.h
$(SETENV) cc -DDIR=\"dir.$<.log\" -DTOKU_RT_TLOG $(CPPFLAGS) $(CFLAGS) $< -lrangetree_tlog -o $@
.PHONY: make_libs
......
......@@ -72,3 +72,14 @@ void toku_free(void* p) {
void* toku_realloc(void *ptr, size_t size) {
return realloc(ptr, size);
}
int mallocced = 0;
int failon = -1;
void* fail_malloc(size_t size) {
if (++mallocced == failon) {
errno = ENOMEM;
return NULL;
}
return malloc(size);
}
......@@ -2,17 +2,6 @@
#include "test.h"
int mallocced = 0;
int failon = -1;
void* fail_malloc(size_t size) {
if (++mallocced == failon) {
errno = ENOMEM;
return NULL;
}
return malloc(size);
}
int main(int argc, const char *argv[]) {
int r;
toku_range_tree *tree;
......
......@@ -14,6 +14,8 @@ struct db_header {
BRT *database_brts; // These
};
struct __toku_lock_tree;
struct __toku_db_internal {
DB *db; // A pointer back to the DB.
int freed;
......@@ -30,6 +32,7 @@ struct __toku_db_internal {
DB *primary; // For secondary (associated) databases, what is the primary? NULL if not a secondary.
int(*associate_callback)(DB*, const DBT*, const DBT*, DBT*); // For secondary, the callback function for associate. NULL if not secondary
int associate_is_immutable; // If this DB is a secondary then this field indicates that the index never changes due to updates.
struct __toku_lock_tree* lt;
};
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR == 1
......@@ -57,6 +60,8 @@ struct __toku_db_env_internal {
unsigned long cachetable_size;
CACHETABLE cachetable;
TOKULOGGER logger;
u_int32_t max_locks;
u_int32_t num_locks;
};
struct __toku_db_txn_internal {
......
......@@ -28,6 +28,11 @@ const char *toku_copyright_string = "Copyright (c) 2007, 2008 Tokutek Inc. All
#include "log.h"
#include "memory.h"
#define TOKU_LT_LINEAR
#include <locktree.h>
const u_int32_t __toku_env_default_max_locks = 1000;
/* the ydb big lock serializes access to the tokudb
every call (including methods) into the tokudb library gets the lock
no internal function should invoke a method through an object */
......@@ -148,6 +153,9 @@ void toku_do_error_all_cases(const DB_ENV * env, int error, int include_stderrst
}
}
static int do_error (DB_ENV *dbenv, int error, const char *string, ...)
__attribute__((__format__(__printf__, 3, 4)));
// Handle all the error cases (but don't do the default thing.)
static int do_error (DB_ENV *dbenv, int error, const char *string, ...) {
if (toku_logger_panicked(dbenv->i->logger)) dbenv->i->is_panicked=1;
......@@ -607,6 +615,10 @@ void toku_default_errcall(const DB_ENV *env, const char *errpfx, const char *msg
#if _THREAD_SAFE
static void locked_env_err(const DB_ENV * env, int error, const char *fmt, ...)
__attribute__((__format__(__printf__, 3, 4)));
static void locked_env_err(const DB_ENV * env, int error, const char *fmt, ...) {
ydb_lock();
va_list ap;
......@@ -725,6 +737,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
result->i->errcall = 0;
result->i->errpfx = 0;
result->i->errfile = 0;
result->i->max_locks = __toku_env_default_max_locks;
{
int r = toku_logger_create(&result->i->logger);
......@@ -1616,6 +1629,31 @@ static int find_db_file(DB_ENV* dbenv, const char *fname, char** full_name_out)
return 0;
}
static int toku_env_set_lk_max_locks(DB_ENV *dbenv, u_int32_t max) {
HANDLE_PANICKED_ENV(dbenv);
if (env_opened(dbenv)) return EINVAL;
if (!max) return EINVAL;
dbenv->i->max_locks = max;
return 0;
}
static int toku_env_get_lk_max_locks(DB_ENV *dbenv, u_int32_t *lk_maxp) {
HANDLE_PANICKED_ENV(dbenv);
if (!lk_maxp) return EINVAL;
*lk_maxp = dbenv->i->max_locks;
return 0;
}
static int toku_db_lt_panic(DB* db, int r) {
assert(db && db->i && db->dbenv && db->dbenv->i);
DB_ENV* env = db->dbenv;
env->i->is_panicked = 1;
if (r < 0) do_error(env, 0, toku_lt_strerror(r));
else do_error(env, r, "Error in locktree.\n");
return EINVAL;
}
static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYPE dbtype, u_int32_t flags, int mode) {
HANDLE_PANICKED_DB(db);
// Warning. Should check arguments. Should check return codes on malloc and open and so forth.
......@@ -1679,6 +1717,14 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
db->i->open_flags = flags;
db->i->open_mode = mode;
/* TODO: Only create lock tree if necessary! (lock subsystem?) */
r = toku_lt_create(&db->i->lt, db, FALSE,
toku_db_lt_panic, db->dbenv->i->max_locks,
&db->dbenv->i->num_locks,
db->i->brt->compare_fun, db->i->brt->dup_compare,
toku_malloc, toku_free, toku_realloc);
if (r!=0) goto error_cleanup;
r = toku_brt_open(db->i->brt, db->i->full_fname, fname, dbname,
is_db_create, is_db_excl, is_db_unknown,
db->dbenv->i->cachetable,
......@@ -1687,6 +1733,15 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
if (r != 0)
goto error_cleanup;
unsigned int brtflags;
BOOL dups;
/* Whether we have dups is only known starting now. */
toku_brt_get_flags(db->i->brt, &brtflags);
dups = (brtflags & TOKU_DB_DUPSORT || brtflags & TOKU_DB_DUP);
r = toku_lt_set_dups(db->i->lt, dups);
/* toku_lt_set_dups cannot return an error here. */
assert(r==0);
return 0;
error_cleanup:
......@@ -1698,6 +1753,10 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
toku_free(db->i->full_fname);
db->i->full_fname = NULL;
}
if (db->i->lt) {
toku_lt_close(db->i->lt);
db->i->lt = NULL;
}
return r;
}
......@@ -1903,6 +1962,14 @@ static int toku_db_fd(DB *db, int *fdp) {
#if _THREAD_SAFE
static int __attribute__((unused)) locked_env_set_lk_max_locks(DB_ENV *dbenv, u_int32_t max) {
ydb_lock(); int r = toku_env_set_lk_max_locks(dbenv, max); ydb_unlock(); return r;
}
static int __attribute__((unused)) locked_env_get_lk_max_locks(DB_ENV *dbenv, u_int32_t *lk_maxp) {
ydb_lock(); int r = toku_env_get_lk_max_locks(dbenv, lk_maxp); ydb_unlock(); return r;
}
static int locked_db_associate (DB *primary, DB_TXN *txn, DB *secondary,
int (*callback)(DB *secondary, const DBT *key, const DBT *data, DBT *result), u_int32_t flags) {
ydb_lock(); int r = toku_db_associate(primary, txn, secondary, callback, flags); ydb_unlock(); return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment