Commit 10aab449 authored by Yoni Fogel

Addresses #611

Fixed a bug in tokudb.
In a NO_DUP database, if a cursor points to an element and we then insert
(overwrite) an element with the same key but larger data (as per the cmp func),
DB_NEXT should NOT return the newly inserted element; it should
return the element with the next larger key.

The opposite error also existed for DB_PREV.

We rewrite the flag of DB_NEXT/DB_PREV to DB_NEXT_NODUP and DB_PREV_NODUP
when there are no duplicates.

git-svn-id: file:///svn/tokudb@3155 c7de825b-a66e-492c-adef-691d508d4ae1
parent bc4b3893
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
/* Handles shared by every helper below; each one is NULL whenever the
 * corresponding object is not currently open (the setup_ and close_ helpers
 * assert on and maintain that invariant). */
static DBC* cursor = NULL;
static DB* db = NULL;
static DB_ENV* env = NULL;
/* Holds the return code of the most recent library call so CKERR can check it. */
static int r = 0;
/* Set by run_test(): TRUE while the database is opened with duplicate flags.
 * Only read by c_get() for its diagnostic output. */
static BOOL dups = FALSE;
/* Passed as the transaction argument to db->open and db->put in this file. */
static DB_TXN* null_txn = NULL;
/*
 * Recreate ENVDIR from scratch and open a new environment into the global
 * `env`.
 *
 * Precondition: no environment, database, or cursor is currently open.
 * Any failing step aborts the test through CKERR()/assert().
 */
static void setup_env(void) {
    assert(!env && !db && !cursor);

    /* Start from an empty directory so leftovers from a previous test cannot
     * influence this one.  Both steps are checked: if the directory cannot be
     * reset, fail here rather than with a misleading error from env->open. */
    r = system("rm -rf " ENVDIR);
    CKERR(r);
    r = mkdir(ENVDIR, 0777);
    CKERR(r);

    r = db_env_create(&env, 0);
    CKERR(r);
    assert(env);
    env->set_errfile(env, stderr);

    r = env->open(env, ENVDIR, DB_CREATE|DB_INIT_MPOOL|DB_THREAD|DB_PRIVATE, 0777);
    CKERR(r);
    assert(env);
}
/*
 * Close the global environment and reset `env` to NULL.
 * Requires an open environment with no database or cursor still open.
 */
static void close_env(void) {
    assert(env != NULL);
    assert(db == NULL);
    assert(cursor == NULL);

    r = env->close(env, 0);
    CKERR(r);

    env = NULL;
}
/*
 * Create the global database handle inside `env` and open "foo.db"/"main"
 * as a B-tree, creating it if necessary.
 *
 * dup_flags: flags handed to db->set_flags before opening; 0 skips the call.
 *
 * Requires an open environment and no database or cursor open.
 */
static void setup_db(u_int32_t dup_flags) {
    assert(env != NULL);
    assert(db == NULL);
    assert(cursor == NULL);

    r = db_create(&db, env, 0);
    CKERR(r);
    assert(db != NULL);
    db->set_errfile(db, stderr);

    /* Flags are applied before the open call below. */
    if (dup_flags != 0) {
        r = db->set_flags(db, dup_flags);
        CKERR(r);
    }

    r = db->open(db, null_txn, "foo.db", "main", DB_BTREE, DB_CREATE, 0666);
    CKERR(r);
    assert(db != NULL);
}
/*
 * Close the global database handle and reset `db` to NULL.
 * Requires an open environment and database, with the cursor already closed.
 */
static void close_db(void) {
    assert(env != NULL);
    assert(db != NULL);
    assert(cursor == NULL);

    r = db->close(db, 0);
    CKERR(r);

    db = NULL;
}
/*
 * Open the global cursor on `db` (no transaction, no flags).
 * Requires an open environment and database, and no cursor open yet.
 */
static void setup_cursor(void) {
    assert(env != NULL);
    assert(db != NULL);
    assert(cursor == NULL);

    r = db->cursor(db, NULL, &cursor, 0);
    CKERR(r);

    assert(cursor != NULL);
}
/*
 * Close the global cursor and reset `cursor` to NULL.
 * Requires the environment, database, and cursor to all be open.
 */
static void close_cursor(void) {
    assert(env != NULL);
    assert(db != NULL);
    assert(cursor != NULL);

    r = cursor->c_close(cursor);
    CKERR(r);

    cursor = NULL;
}
/* When building with USE_BDB, define DB_YESOVERWRITE as 0 so that insert()
 * passes no flag to db->put.
 * NOTE(review): presumably Berkeley DB does not provide this flag -- confirm. */
#ifdef USE_BDB
#define DB_YESOVERWRITE 0
#endif
/*
 * Store the one-byte pair (k, d) in the global database using
 * DB_YESOVERWRITE and no transaction.  Aborts through CKERR on failure.
 */
static void insert(char k, char d) {
    DBT key_dbt;
    DBT data_dbt;
    DBT *key_arg  = dbt_init(&key_dbt, &k, sizeof(k));
    DBT *data_arg = dbt_init(&data_dbt, &d, sizeof(d));

    r = db->put(db, null_txn, key_arg, data_arg, DB_YESOVERWRITE);
    CKERR(r);
}
/*
 * Perform one c_get on the global cursor with the given flag and require
 * that it yields exactly the one-byte pair (key_expect, data_expect).
 *
 * In verbose mode a mismatch is printed before the assertions fire, so the
 * log shows what was actually returned.
 */
static void c_get(u_int32_t flag, char key_expect, char data_expect) {
    DBT found_key;
    DBT found_data;

    r = cursor->c_get(cursor,
                      dbt_init(&found_key, 0, 0),
                      dbt_init(&found_data, 0, 0),
                      flag);
    CKERR(r);

    /* Both sides of every pair in this test are single bytes. */
    assert(found_key.size == sizeof(key_expect));
    assert(found_data.size == sizeof(data_expect));

    char got_key = *(char*)found_key.data;
    char got_data = *(char*)found_data.data;

    int mismatch = (got_key != key_expect) || (got_data != data_expect);
    if (verbose && mismatch) {
        printf("DUPS [%d] c_get(%u) Expect (%c,%c)\n"
               " Got (%c,%c)\n",
               (int)dups, flag, key_expect, data_expect, got_key, got_data);
    }

    assert(got_key == key_expect);
    assert(got_data == data_expect);
}
/*
 * Scenario in which the cursor must move on to the neighbouring KEY.
 *
 * Two pairs are stored: ('g','g') and a neighbour key one step away in the
 * direction of travel.  The cursor is positioned on ('g','g'), then a pair
 * with key 'g' and data one step further in the same direction is inserted.
 * The next c_get with `flag` must return the neighbour key, not the newly
 * inserted pair.
 *
 * dup_flags: flags for setup_db().
 * flag:      cursor flag under test, used for both c_get calls.
 * is_next:   TRUE for forward traversal (+1), FALSE for backward (-1).
 */
static void test_skip_key(u_int32_t dup_flags, u_int32_t flag, BOOL is_next) {
    setup_env();
    setup_db(dup_flags);
    setup_cursor();

    const char base_key = 'g';
    const char base_data = 'g';
    const int step = is_next ? 1 : -1;
    const char neighbour_key = (char)(base_key + step);
    const char shifted_data = (char)(base_data + step);

    insert(base_key, base_data);
    insert(neighbour_key, base_data);
    c_get(flag, base_key, base_data);

    /* Add a pair under the current key while the cursor sits on it. */
    insert(base_key, shifted_data);
    c_get(flag, neighbour_key, base_data);

    close_cursor();
    close_db();
    close_env();
}
/*
 * Scenario in which the cursor must stay on the same KEY and return the
 * newly inserted pair.
 *
 * Two pairs are stored: ('g','g') and a neighbour key one step away in the
 * direction of travel.  The cursor is positioned on ('g','g'), then a pair
 * with key 'g' and data one step further in the same direction is inserted.
 * The next c_get with `flag` must return that new pair.
 *
 * dup_flags: flags for setup_db().
 * flag:      cursor flag under test, used for both c_get calls.
 * is_next:   TRUE for forward traversal (+1), FALSE for backward (-1).
 */
static void test_do_not_skip_key(u_int32_t dup_flags, u_int32_t flag, BOOL is_next) {
    setup_env();
    setup_db(dup_flags);
    setup_cursor();

    const char base_key = 'g';
    const char base_data = 'g';
    const int step = is_next ? 1 : -1;
    const char neighbour_key = (char)(base_key + step);
    const char shifted_data = (char)(base_data + step);

    insert(base_key, base_data);
    insert(neighbour_key, base_data);
    c_get(flag, base_key, base_data);

    /* Add a pair under the current key while the cursor sits on it. */
    insert(base_key, shifted_data);
    c_get(flag, base_key, shifted_data);

    close_cursor();
    close_db();
    close_env();
}
static void run_test(u_int32_t dup_flags) {
dups = dup_flags != 0;
/* ********************************************************************** */
/* Test DB_NEXT works properly. */
if (dups) {
test_do_not_skip_key(dup_flags, DB_NEXT, TRUE);
}
else {
test_skip_key(dup_flags, DB_NEXT, TRUE);
}
/* ********************************************************************** */
/* Test DB_PREV works properly. */
if (dups) {
test_do_not_skip_key(dup_flags, DB_PREV, FALSE);
}
else {
test_skip_key(dup_flags, DB_PREV, FALSE);
}
/* ********************************************************************** */
/* Test DB_PREV_NODUP works properly. */
test_skip_key(dup_flags, DB_PREV_NODUP, FALSE);
/* ********************************************************************** */
/* Test DB_NEXT_NODUP works properly. */
test_skip_key(dup_flags, DB_NEXT_NODUP, TRUE);
/* ********************************************************************** */
}
/*
 * Entry point: parse the command-line arguments, then run the whole suite
 * once with no duplicate flags and once with DB_DUP | DB_DUPSORT.
 */
int main(int argc, const char *argv[]) {
    const u_int32_t dup_modes[] = { 0, DB_DUP | DB_DUPSORT };
    const size_t mode_count = sizeof(dup_modes) / sizeof(dup_modes[0]);

    parse_args(argc, argv);

    for (size_t i = 0; i < mode_count; i++) {
        run_test(dup_modes[i]);
    }
    return 0;
}
...@@ -1098,6 +1098,40 @@ static inline DB_TXN* toku_txn_ancestor(DB_TXN* txn) { ...@@ -1098,6 +1098,40 @@ static inline DB_TXN* toku_txn_ancestor(DB_TXN* txn) {
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt); static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt);
/*
 * Normalise a cursor-get flag for a database whose BRT does not have
 * TOKU_DB_DUPSORT set.
 *
 * In that case toku_swap_flag() is asked to substitute:
 *   DB_NEXT           -> DB_NEXT_NODUP
 *   DB_PREV           -> DB_PREV_NODUP
 *   DB_GET_BOTH_RANGE -> DB_GET_BOTH
 * Any other main flag, and every flag when TOKU_DB_DUPSORT is set, is left
 * untouched.
 *
 * c:    cursor whose database (c->dbp) is inspected; must not be NULL.
 * flag: in/out flag word handed to toku_swap_flag(); must not be NULL.
 */
static void toku_c_get_fix_flags(DBC* c, u_int32_t* flag) {
    assert(c && flag);

    DB* owner = c->dbp;
    u_int32_t main_flag = get_main_cursor_flag(*flag);

    unsigned int brtflags;
    toku_brt_get_flags(owner->i->brt, &brtflags);

    /* With TOKU_DB_DUPSORT set, none of the substitutions apply. */
    if ((brtflags & TOKU_DB_DUPSORT) != 0) {
        return;
    }

    if (main_flag == DB_NEXT) {
        toku_swap_flag(flag, &main_flag, DB_NEXT_NODUP);
    } else if (main_flag == DB_PREV) {
        toku_swap_flag(flag, &main_flag, DB_PREV_NODUP);
    } else if (main_flag == DB_GET_BOTH_RANGE) {
        toku_swap_flag(flag, &main_flag, DB_GET_BOTH);
    }
}
static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag, static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
DBT* saved_key, DBT* saved_data) { DBT* saved_key, DBT* saved_data) {
assert(saved_key && saved_data && flag); assert(saved_key && saved_data && flag);
...@@ -1124,7 +1158,6 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag, ...@@ -1124,7 +1158,6 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
break; break;
} }
case (DB_GET_BOTH): { case (DB_GET_BOTH): {
get_both:
txn_anc = toku_txn_ancestor(txn); txn_anc = toku_txn_ancestor(txn);
r = toku_txn_add_lt(txn_anc, db->i->lt); r = toku_txn_add_lt(txn_anc, db->i->lt);
if (r!=0) return r; if (r!=0) return r;
...@@ -1137,8 +1170,7 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag, ...@@ -1137,8 +1170,7 @@ static int toku_c_get_pre_lock(DBC* c, DBT* key, DBT* data, u_int32_t* flag,
break; break;
} }
case (DB_GET_BOTH_RANGE): { case (DB_GET_BOTH_RANGE): {
if (!duplicates) { assert(duplicates);
toku_swap_flag(flag, &get_flag, DB_GET_BOTH); goto get_both; }
r = toku_save_original_data(saved_data, data); r = toku_save_original_data(saved_data, data);
break; break;
} }
...@@ -1302,6 +1334,7 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag ...@@ -1302,6 +1334,7 @@ static int toku_c_get_noassociate(DBC * c, DBT * key, DBT * data, u_int32_t flag
DBT saved_data; DBT saved_data;
int r; int r;
toku_c_get_fix_flags(c, &flag);
r = toku_c_get_pre_lock(c, key, data, &flag, &saved_key, &saved_data); r = toku_c_get_pre_lock(c, key, data, &flag, &saved_key, &saved_data);
if (r!=0) return r; if (r!=0) return r;
TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL; TOKUTXN txn = c->i->txn ? c->i->txn->i->tokutxn : NULL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment