Commit 58d57cb1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge 2948 changes to main: Start putting upsert into brt layer (Refs #2948). ...

Merge 2948 changes to main: Start putting upsert into brt layer (Refs #2948).  Also add unused attributes (Closes #2950). [t:2948] close[t:2950] 
{{{
svn merge -c 24081 https://svn.tokutek.com/tokudb/toku/tokudb.2948
}}}
.


git-svn-id: file:///svn/toku/tokudb@24083 c7de825b-a66e-492c-adef-691d508d4ae1
parent ca87a619
......@@ -201,6 +201,24 @@ struct brt {
BOOL did_set_descriptor;
DESCRIPTOR_S temp_descriptor;
int (*compare_fun)(DB*,const DBT*,const DBT*);
// When an upsert message arrives it contains a key, a value (upserted_val), and an extra DBT (upserted_extra).
// If there is no such key in the database, then the upserted value is inserted.
// If there is such a key in the database, then there is associated with that key another value (prev_val).
// The system calls upsert_fun(DB, upserted_value, upserted_extra, prev_val, set_val, set_extra)
// where set_val and set_extra are provided by the system.
// The upsert_fun can look at the DBTs and the DB (to get the db descriptor) and perform one of the following actions:
// a) It can return DB_DELETE_UPSERT (which is defined in db.h). In this case, the system deletes the key-value pair.
// b) OR it can return call set_val(new_val, set_extra),
// where new_val is the dbt that was passed in. The set_val function will copy anything it needs out of new_val, so the memory pointed
// to by new_val may be stack allocated by upsert_fun (or it may be malloced, in which case upsert_fun should free that memory).
// Notes:
// 1) The DBTs passed to upsert_fun may point to memory that will be freed after the upsert_fun returns.
// 2) Furtheremore, there is likely to be some sort of lock held when upsert_fun is called.
// Those notes should not matter, since the upsert_fun should essentially be a pure function of the DBTs and DB descriptor passed in.
int (*upsert_fun)(DB*, const DBT*key, const DBT *upserted_val, const DBT *upserted_extra, const DBT *prev_val,
void (*set_val)(const DBT *new_val, void*set_extra), void *set_extra);
DB *db; // To pass to the compare fun, and close once transactions are done.
OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also)
......
......@@ -516,7 +516,6 @@ toku_cmd_leafval_heaviside (OMTVALUE lev, void *extra) {
be);
}
// If you pass in data==0 then it only compares the key, not the data (even if is a DUPSORT database)
static int
brt_compare_pivot(BRT brt, DBT *key, bytevec ck)
{
......@@ -2844,8 +2843,8 @@ int toku_open_brt (const char *fname, int is_create, BRT *newbrt, int nodesize,
r = toku_brt_create(&brt);
if (r != 0)
return r;
toku_brt_set_nodesize(brt, nodesize);
toku_brt_set_bt_compare(brt, compare_fun);
r = toku_brt_set_nodesize(brt, nodesize); assert(r==0);
r = toku_brt_set_bt_compare(brt, compare_fun); assert(r==0);
r = toku_brt_open(brt, fname, is_create, only_create, cachetable, txn, db);
if (r != 0) {
......@@ -3619,7 +3618,7 @@ int toku_brt_set_flags(BRT brt, unsigned int flags) {
int toku_brt_get_flags(BRT brt, unsigned int *flags) {
*flags = brt->flags;
assert(brt->flags==(brt->flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extranious flags
assert(brt->flags==(brt->flags&TOKU_DB_KEYCMP_BUILTIN)); // make sure there are no extraneous flags
return 0;
}
......@@ -3639,6 +3638,12 @@ int toku_brt_set_bt_compare(BRT brt, int (*bt_compare)(DB *, const DBT*, const D
return 0;
}
int toku_brt_set_upsert(BRT brt, int (*upsert)(DB *, const DBT *key, const DBT *upserted_val, const DBT *upserted_extra, const DBT *prev_val,
void (*set_val)(const DBT *new_val, void *set_extra), void *set_extra)) {
brt->upsert_fun = upsert;
return 0;
}
brt_compare_func toku_brt_get_bt_compare (BRT brt) {
return brt->compare_fun;
}
......
This diff is collapsed.
......@@ -201,7 +201,8 @@ toku_logger_close_rollback(TOKULOGGER logger, BOOL recovery_failed) {
struct brt_header *h = toku_cachefile_get_userdata(cf);
toku_brtheader_lock(h);
if (!h->panic && recovery_failed) {
toku_brt_header_set_panic(h, EINVAL, "Recovery failed");
r = toku_brt_header_set_panic(h, EINVAL, "Recovery failed");
assert(r==0);
}
//Verify it is safe to close it.
if (!h->panic) { //If paniced, it is safe to close.
......
......@@ -107,7 +107,8 @@ static void file_map_close_dictionaries(struct file_map *fmap, BOOL recovery_suc
assert(tuple->brt);
if (!recovery_succeeded) {
// don't update the brt on close
toku_brt_set_panic(tuple->brt, DB_RUNRECOVERY, "recovery failed");
r = toku_brt_set_panic(tuple->brt, DB_RUNRECOVERY, "recovery failed");
assert(r==0);
}
//Logging is already back on. No need to pass LSN into close.
char *error_string = NULL;
......@@ -245,11 +246,14 @@ static int internal_recover_fopen_or_fcreate (RECOVER_ENV renv, BOOL must_create
r = toku_brt_create(&brt);
assert(r == 0);
toku_brt_set_flags(brt, treeflags);
r = toku_brt_set_flags(brt, treeflags);
assert(r==0);
// set the key compare functions
if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare)
toku_brt_set_bt_compare(brt, renv->bt_compare);
if (!(treeflags & TOKU_DB_KEYCMP_BUILTIN) && renv->bt_compare) {
r = toku_brt_set_bt_compare(brt, renv->bt_compare);
assert(r==0);
}
// TODO mode (FUTURE FEATURE)
mode = mode;
......
......@@ -18,9 +18,15 @@ static void dummy_set_brt(DB *db UU(), BRT brt UU()) {}
int
main(int argc, const char *const argv[]) {
toku_brt_init(dummy, dummy, dummy_set_brt);
{
int rr = toku_brt_init(dummy, dummy, dummy_set_brt);
assert(rr==0);
}
int r = recovery_main(argc, argv);
toku_brt_destroy();
{
int rr = toku_brt_destroy();
assert(rr=0);
}
return r;
}
......
......@@ -19,7 +19,7 @@ static void test_dump_empty_db (void) {
unlink(fname);
r = toku_open_brt(fname, 1, &t, 1024, ct, null_txn, toku_builtin_compare_fun, null_db);
assert(r==0);
if (verbose) toku_dump_brt(stdout, t);
if (verbose) { r=toku_dump_brt(stdout, t); assert(r==0); }
r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
......@@ -44,15 +44,15 @@ static void test_multiple_files_of_size (int size) {
DBT k,v;
snprintf(key, 100, "key%d", i);
snprintf(val, 100, "val%d", i);
toku_brt_insert(t0, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t0, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); assert(r==0);
snprintf(val, 100, "Val%d", i);
toku_brt_insert(t1, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t1, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); assert(r==0);
}
//toku_verify_brt(t0);
//dump_brt(t0);
//dump_brt(t1);
toku_verify_brt(t0);
toku_verify_brt(t1);
r = toku_verify_brt(t0); assert(r==0);
r = toku_verify_brt(t1); assert(r==0);
r = toku_close_brt(t0, 0); assert(r==0);
r = toku_close_brt(t1, 0); assert(r==0);
......@@ -105,7 +105,8 @@ static void test_multiple_brts_one_db_one_file (void) {
DBT kb, vb;
snprintf(k, 20, "key%d", i);
snprintf(v, 20, "val%d", i);
toku_brt_insert(trees[i], toku_fill_dbt(&kb, k, strlen(k)+1), toku_fill_dbt(&vb, v, strlen(v)+1), null_txn);
r = toku_brt_insert(trees[i], toku_fill_dbt(&kb, k, strlen(k)+1), toku_fill_dbt(&vb, v, strlen(v)+1), null_txn);
assert(r==0);
}
for (i=0; i<MANYN; i++) {
char k[20],vexpect[20];
......@@ -148,7 +149,8 @@ static void test_read_what_was_written (void) {
/* See if we can put something in it. */
{
DBT k,v;
toku_brt_insert(brt, toku_fill_dbt(&k, "hello", 6), toku_fill_dbt(&v, "there", 6), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&k, "hello", 6), toku_fill_dbt(&v, "there", 6), null_txn);
assert(r==0);
}
r = toku_close_brt(brt, 0); assert(r==0);
......@@ -176,11 +178,13 @@ static void test_read_what_was_written (void) {
int verify_result=toku_verify_brt(brt);;
assert(verify_result==0);
}
toku_brt_insert(brt, toku_fill_dbt(&k, key, strlen(key)+1), toku_fill_dbt(&v, val, strlen(val)+1), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&k, key, strlen(key)+1), toku_fill_dbt(&v, val, strlen(val)+1), null_txn);
assert(r==0);
if (i<600) {
int verify_result=toku_verify_brt(brt);
if (verify_result) {
toku_dump_brt(stdout, brt);
r = toku_dump_brt(stdout, brt);
assert(r==0);
assert(0);
}
{
......@@ -198,7 +202,8 @@ static void test_read_what_was_written (void) {
if (verbose) printf("Now read them out\n");
//show_brt_blocknumbers(brt);
toku_verify_brt(brt);
r = toku_verify_brt(brt);
assert(r==0);
//dump_brt(brt);
/* See if we can read them all out again. */
......
......@@ -20,7 +20,8 @@ static void test1 (void) {
unlink(fname);
r = toku_open_brt(fname, 1, &t, 1024, ct, null_txn, toku_builtin_compare_fun, null_db);
assert(r==0);
toku_brt_insert(t, toku_fill_dbt(&k, "hello", 6), toku_fill_dbt(&v, "there", 6), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, "hello", 6), toku_fill_dbt(&v, "there", 6), null_txn);
assert(r==0);
{
struct check_pair pair = {6, "hello", 6, "there", 0};
r = toku_brt_lookup(t, toku_fill_dbt(&k, "hello", 6), lookup_checkf, &pair);
......
......@@ -26,11 +26,13 @@ static void test2 (int memcheck, int limit) {
char key[100],val[100];
snprintf(key,100,"hello%d",i);
snprintf(val,100,"there%d",i);
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
assert(r==0);
r = toku_verify_brt(t); assert(r==0);
//printf("%s:%d did insert %d\n", __FILE__, __LINE__, i);
if (0) {
toku_brt_flush(t);
r = toku_brt_flush(t);
assert(r==0);
{
int n = toku_get_n_items_malloced();
if (verbose) printf("%s:%d i=%d n_items_malloced=%d\n", __FILE__, __LINE__, i, n);
......
......@@ -28,7 +28,8 @@ static void test3 (int nodesize, int count, int memcheck) {
DBT k,v;
snprintf(key,100,"hello%d",i);
snprintf(val,100,"there%d",i);
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
assert(r==0);
}
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
......
......@@ -28,7 +28,8 @@ static void test4 (int nodesize, int count, int memcheck) {
DBT k,v;
snprintf(key,100,"hello%d",rv);
snprintf(val,100,"there%d",i);
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
assert(r==0);
}
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
......
......@@ -30,7 +30,8 @@ static void test5 (void) {
snprintf(key, 100, "key%d", rk);
snprintf(val, 100, "val%d", rv);
DBT k,v;
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
assert(r==0);
}
if (verbose) printf("\n");
for (i=0; i<limit/2; i++) {
......
......@@ -51,7 +51,8 @@ static void verify_dbfile(int n, const char *name) {
TOKUTXN const null_txn = NULL;
BRT t = NULL;
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints);
assert(r==0);
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0);
BRT_CURSOR cursor = NULL;
......
......@@ -250,7 +250,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
TOKUTXN const null_txn = NULL;
BRT t = NULL;
r = toku_brt_create(&t); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints); assert(r == 0);
r = toku_brt_set_bt_compare(t, compare_ints); assert(r==0);
r = toku_brt_open(t, name, 0, 0, ct, null_txn, 0); assert(r==0);
BRT_CURSOR cursor = NULL;
......
......@@ -28,7 +28,8 @@ test_overflow (void) {
int i;
for (i=0; i<8; i++) {
char key[]={(char)('a'+i), 0};
toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn);
assert(r=0);
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
......
......@@ -24,7 +24,8 @@ test_main(int argc, const char *argv[]) {
DBT k,v;
snprintf(key, 100, "key%d", i);
snprintf(val, 100, "val%d", i);
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
r = toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
assert(r==0);
}
r = toku_dump_brt(f, t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
......
......@@ -35,7 +35,10 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
int r = toku_os_get_file_size(fd, &file_size);
assert(r==0);
}
maybe_preallocate_in_file(fd, 1000);
{
int r = maybe_preallocate_in_file(fd, 1000);
assert(r==0);
}
int64_t file_size2;
{
int r = toku_os_get_file_size(fd, &file_size2);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment