Commit e6125177 authored by Rich Prohaska's avatar Rich Prohaska

fix cursor->get_both in a dupsort db. closes #185

git-svn-id: file:///svn/tokudb@1135 c7de825b-a66e-492c-adef-691d508d4ae1
parent c380755d
...@@ -2670,7 +2670,7 @@ static int brtcurs_set_key(BRT_CURSOR cursor, DISKOFF off, DBT *key, DBT *val, i ...@@ -2670,7 +2670,7 @@ static int brtcurs_set_key(BRT_CURSOR cursor, DISKOFF off, DBT *key, DBT *val, i
if (node->height > 0) { if (node->height > 0) {
cursor->path_len += 1; cursor->path_len += 1;
for (;;) { for (;;) {
childnum = brtnode_which_child(node, key, brt); childnum = brtnode_left_child(node, key, flag == DB_GET_BOTH ? val : 0, brt);
cursor->path[cursor->path_len-1] = node; cursor->path[cursor->path_len-1] = node;
cursor->pathcnum[cursor->path_len-1] = childnum; cursor->pathcnum[cursor->path_len-1] = childnum;
brt_node_add_cursor(node, childnum, cursor); brt_node_add_cursor(node, childnum, cursor);
......
...@@ -823,7 +823,11 @@ int toku_pma_cursor_set_key(PMA_CURSOR c, DBT *key) { ...@@ -823,7 +823,11 @@ int toku_pma_cursor_set_key(PMA_CURSOR c, DBT *key) {
int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val) { int toku_pma_cursor_set_both(PMA_CURSOR c, DBT *key, DBT *val) {
PMA pma = c->pma; PMA pma = c->pma;
unsigned int here = toku_pmainternal_find(pma, key); unsigned int here; int found;
if (pma->dup_mode & TOKU_DB_DUPSORT)
here = __pma_dup_search(pma, key, val, 0, pma->N, &found);
else
here = toku_pmainternal_find(pma, key);
assert(here<=toku_pma_index_limit(pma)); assert(here<=toku_pma_index_limit(pma));
int r = DB_NOTFOUND; int r = DB_NOTFOUND;
if (here < pma->N) { if (here < pma->N) {
......
...@@ -15,8 +15,6 @@ ...@@ -15,8 +15,6 @@
int errors; int errors;
void db_put(DB *db, int k, int v, u_int32_t put_flags, int rexpect) { void db_put(DB *db, int k, int v, u_int32_t put_flags, int rexpect) {
DBT key, val; DBT key, val;
int r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), put_flags); int r = db->put(db, 0, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), put_flags);
...@@ -26,6 +24,56 @@ void db_put(DB *db, int k, int v, u_int32_t put_flags, int rexpect) { ...@@ -26,6 +24,56 @@ void db_put(DB *db, int k, int v, u_int32_t put_flags, int rexpect) {
} }
} }
void test_dup_key(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdupdup) {
if (verbose) printf("test_dup_key: %d, %u, %d, %d\n", dup_mode, put_flags, rexpect, rexpectdupdup);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = DIR "/" "test_insert.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode);
#if USE_TDB
if (r != 0 && dup_mode == DB_DUP) {
printf("%s:%d:WARNING: tokudb does not support DB_DUP\n", __FILE__, __LINE__);
r = db->close(db, 0); assert(r == 0);
return;
}
#endif
assert(r == 0);
r = db->set_pagesize(db, 4096); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
db_put(db, 0, 0, put_flags, rexpect);
db_put(db, 0, 1, put_flags, rexpectdupdup);
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0);
for (;;) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
if (r != 0) break;
assert(key.size == sizeof (int));
assert(val.size == sizeof (int));
int kk, vv;
memcpy(&kk, key.data, key.size);
memcpy(&vv, val.data, val.size);
if (verbose) printf("kk %d vv %d\n", kk, vv);
free(key.data);
free(val.data);
}
r = cursor->c_close(cursor); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
}
void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdupdup) { void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdupdup) {
if (verbose) printf("test_dup_dup: %d, %u, %d, %d\n", dup_mode, put_flags, rexpect, rexpectdupdup); if (verbose) printf("test_dup_dup: %d, %u, %d, %d\n", dup_mode, put_flags, rexpect, rexpectdupdup);
...@@ -38,8 +86,7 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup ...@@ -38,8 +86,7 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup
unlink(fname); unlink(fname);
/* create the dup database file */ /* create the dup database file */
r = db_create(&db, null_env, 0); r = db_create(&db, null_env, 0); assert(r == 0);
assert(r == 0);
r = db->set_flags(db, dup_mode); r = db->set_flags(db, dup_mode);
#if USE_TDB #if USE_TDB
if (r != 0 && dup_mode == DB_DUP) { if (r != 0 && dup_mode == DB_DUP) {
...@@ -49,17 +96,14 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup ...@@ -49,17 +96,14 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup
} }
#endif #endif
assert(r == 0); assert(r == 0);
r = db->set_pagesize(db, 4096); r = db->set_pagesize(db, 4096); assert(r == 0);
assert(r == 0); r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
db_put(db, 0, 0, put_flags, rexpect); db_put(db, 0, 0, put_flags, rexpect);
db_put(db, 0, 0, put_flags, rexpectdupdup); db_put(db, 0, 0, put_flags, rexpectdupdup);
DBC *cursor; DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0); r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0);
assert(r == 0);
for (;;) { for (;;) {
DBT key, val; DBT key, val;
...@@ -75,11 +119,68 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup ...@@ -75,11 +119,68 @@ void test_dup_dup(int dup_mode, u_int32_t put_flags, int rexpect, int rexpectdup
free(val.data); free(val.data);
} }
r = cursor->c_close(cursor); r = cursor->c_close(cursor); assert(r == 0);
assert(r == 0);
r = db->close(db, 0); assert(r == 0);
}
void test_put_00_01_01(int dup_mode, u_int32_t put_flags) {
if (verbose) printf("test_put_00_01_01: %d, %u\n", dup_mode, put_flags);
r = db->close(db, 0); DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = DIR "/" "test_insert.brt";
int r, expectr;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode);
#if USE_TDB
if (r != 0 && dup_mode == DB_DUP) {
printf("%s:%d:WARNING: tokudb does not support DB_DUP\n", __FILE__, __LINE__);
r = db->close(db, 0); assert(r == 0);
return;
}
#endif
assert(r == 0); assert(r == 0);
r = db->set_pagesize(db, 4096); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
expectr = 0;
db_put(db, 0, 0, put_flags, expectr);
expectr = put_flags == DB_NOOVERWRITE ? DB_KEYEXIST : 0;
db_put(db, 0, 1, put_flags, expectr);
expectr = (put_flags == DB_NOOVERWRITE || dup_mode & DB_DUPSORT) ? DB_KEYEXIST : 0;
#if USE_TDB
if (put_flags == DB_YESOVERWRITE) expectr = 0;
#endif
db_put(db, 0, 1, put_flags, expectr);
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0);
for (;;) {
DBT key, val;
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
if (r != 0) break;
assert(key.size == sizeof (int));
assert(val.size == sizeof (int));
int kk, vv;
memcpy(&kk, key.data, key.size);
memcpy(&vv, val.data, val.size);
if (verbose) printf("kk %d vv %d\n", kk, vv);
free(key.data);
free(val.data);
}
r = cursor->c_close(cursor); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
} }
int main(int argc, const char *argv[]) { int main(int argc, const char *argv[]) {
...@@ -89,17 +190,52 @@ int main(int argc, const char *argv[]) { ...@@ -89,17 +190,52 @@ int main(int argc, const char *argv[]) {
system("rm -rf " DIR); system("rm -rf " DIR);
mkdir(DIR, 0777); mkdir(DIR, 0777);
test_dup_dup(0, 0, 0, 0); test_put_00_01_01(0, 0);
test_dup_dup(0, DB_NODUPDATA, EINVAL, EINVAL); test_put_00_01_01(0, DB_NOOVERWRITE);
test_dup_dup(0, DB_NOOVERWRITE, 0, DB_KEYEXIST);
test_put_00_01_01(DB_DUP | DB_DUPSORT, 0);
test_put_00_01_01(DB_DUP | DB_DUPSORT, DB_NOOVERWRITE);
#if USE_TDB
test_put_00_01_01(DB_DUP | DB_DUPSORT, DB_YESOVERWRITE);
#endif
/* dup key uniq data */
test_dup_key(0, 0, 0, 0);
test_dup_key(0, DB_NODUPDATA, EINVAL, EINVAL);
test_dup_key(0, DB_NOOVERWRITE, 0, DB_KEYEXIST);
test_dup_key(DB_DUP, 0, 0, 0);
test_dup_key(DB_DUP, DB_NODUPDATA, EINVAL, EINVAL);
test_dup_key(DB_DUP, DB_NOOVERWRITE, 0, DB_KEYEXIST);
test_dup_dup(DB_DUP, 0, 0, 0); #if USE_TDB
test_dup_dup(DB_DUP, DB_NODUPDATA, EINVAL, EINVAL); // test_dup_key(DB_DUP | DB_DUPSORT, 0, EINVAL, EINVAL);
test_dup_dup(DB_DUP, DB_NOOVERWRITE, 0, DB_KEYEXIST); test_dup_key(DB_DUP | DB_DUPSORT, 0, 0, 0);
test_dup_key(DB_DUP | DB_DUPSORT, DB_YESOVERWRITE, 0, 0);
#else
test_dup_key(DB_DUP | DB_DUPSORT, 0, 0, 0);
#endif
test_dup_key(DB_DUP | DB_DUPSORT, DB_NODUPDATA, 0, 0);
test_dup_key(DB_DUP | DB_DUPSORT, DB_NOOVERWRITE, 0, DB_KEYEXIST);
/* dup key dup data */
test_dup_dup(0, 0, 0, 0);
test_dup_dup(0, DB_NODUPDATA, EINVAL, EINVAL);
test_dup_dup(0, DB_NOOVERWRITE, 0, DB_KEYEXIST);
test_dup_dup(DB_DUP | DB_DUPSORT, 0, 0, DB_KEYEXIST); test_dup_dup(DB_DUP, 0, 0, 0);
test_dup_dup(DB_DUP | DB_DUPSORT, DB_NODUPDATA, 0, DB_KEYEXIST); test_dup_dup(DB_DUP, DB_NODUPDATA, EINVAL, EINVAL);
test_dup_dup(DB_DUP | DB_DUPSORT, DB_NOOVERWRITE, 0, DB_KEYEXIST); test_dup_dup(DB_DUP, DB_NOOVERWRITE, 0, DB_KEYEXIST);
#if USE_TDB
// test_dup_dup(DB_DUP | DB_DUPSORT, 0, EINVAL, EINVAL);
test_dup_dup(DB_DUP | DB_DUPSORT, 0, 0, DB_KEYEXIST);
test_dup_dup(DB_DUP | DB_DUPSORT, DB_YESOVERWRITE, 0, 0);
#else
test_dup_dup(DB_DUP | DB_DUPSORT, 0 , 0, DB_KEYEXIST);
#endif
test_dup_dup(DB_DUP | DB_DUPSORT, DB_NODUPDATA, 0, DB_KEYEXIST);
test_dup_dup(DB_DUP | DB_DUPSORT, DB_NOOVERWRITE, 0, DB_KEYEXIST);
return errors; return errors;
} }
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
void db_put(DB *db, int k, int v) {
DB_TXN * const null_txn = 0;
DBT key, val;
int r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
void db_get(DB *db, int k) {
DB_TXN * const null_txn = 0;
DBT key, val;
int r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
printf("do_search %d\n", htonl(vv));
free(val.data);
}
void db_del(DB *db, int k) {
DB_TXN * const null_txn = 0;
DBT key;
int r = db->del(db, null_txn, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
}
void expect_db_get(DB *db, int k, int v) {
DB_TXN * const null_txn = 0;
DBT key, val;
int r = db->get(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), 0);
assert(r == 0);
int vv;
assert(val.size == sizeof vv);
memcpy(&vv, val.data, val.size);
assert(vv == v);
free(val.data);
}
void expect_cursor_get(DBC *cursor, int k, int v) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_NEXT);
assert(r == 0);
assert(key.size == sizeof k);
int kk;
memcpy(&kk, key.data, key.size);
assert(val.size == sizeof v);
int vv;
memcpy(&vv, val.data, val.size);
if (kk != k || vv != v) printf("expect key %d got %d - %d %d\n", htonl(k), htonl(kk), htonl(v), htonl(vv));
assert(kk == k);
assert(vv == v);
free(key.data);
free(val.data);
}
void expect_cursor_set(DBC *cursor, int k) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init(&key, &k, sizeof k), dbt_init_malloc(&val), DB_SET);
assert(r == 0);
free(val.data);
}
void expect_cursor_get_both(DBC *cursor, int k, int v) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), DB_GET_BOTH);
assert(r == 0);
}
void expect_cursor_get_current(DBC *cursor, int k, int v) {
DBT key, val;
int r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_CURRENT);
assert(r == 0);
int kk, vv;
assert(key.size == sizeof kk); memcpy(&kk, key.data, key.size); assert(kk == k);
assert(val.size == sizeof vv); memcpy(&vv, val.data, val.size); assert(vv == v);
free(key.data); free(val.data);
}
/* insert, close, delete, insert, search */
void test_icdi_search(int n, int dup_mode) {
if (verbose) printf("test_icdi_search:%d %d\n", n, dup_mode);
DB_ENV * const null_env = 0;
DB *db;
DB_TXN * const null_txn = 0;
const char * const fname = DIR "/" "test_icdi_search.brt";
int r;
unlink(fname);
/* create the dup database file */
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode); assert(r == 0);
r = db->set_pagesize(db, 4096); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
/* insert n duplicates */
int i;
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(i);
db_put(db, k, v);
expect_db_get(db, k, htonl(0));
}
/* reopen the database to force nonleaf buffering */
r = db->close(db, 0); assert(r == 0);
r = db_create(&db, null_env, 0); assert(r == 0);
r = db->set_flags(db, dup_mode); assert(r == 0);
r = db->set_pagesize(db, 4096); assert(r == 0);
r = db->open(db, null_txn, fname, "main", DB_BTREE, 0, 0666); assert(r == 0);
db_del(db, htonl(n/2));
/* insert n duplicates */
for (i=0; i<n; i++) {
int k = htonl(n/2);
int v = htonl(n+i);
db_put(db, k, v);
DBC *cursor;
r = db->cursor(db, 0, &cursor, 0); assert(r == 0);
expect_cursor_get_both(cursor, k, v);
expect_cursor_get_current(cursor, k, v);
r = cursor->c_close(cursor); assert(r == 0);
}
DBC *cursor;
r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0);
for (i=0; i<n; i++) {
expect_cursor_get(cursor, htonl(n/2), htonl(n+i));
}
r = cursor->c_close(cursor); assert(r == 0);
r = db->close(db, 0); assert(r == 0);
}
int main(int argc, const char *argv[]) {
int i;
parse_args(argc, argv);
system("rm -rf " DIR);
mkdir(DIR, 0777);
for (i=1; i<65537; i *= 2) {
test_icdi_search(i, DB_DUP + DB_DUPSORT);
}
return 0;
}
...@@ -12,6 +12,15 @@ ...@@ -12,6 +12,15 @@
#include "test.h" #include "test.h"
#if USE_BDB
#define DB_YESOVERWRITE 0
#endif
int db_put(DB *db, DB_TXN *txn, int k, int v) {
DBT key, val;
int r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), DB_YESOVERWRITE);
return r;
}
/* create a tree with 15 of 16 leaf nodes /* create a tree with 15 of 16 leaf nodes
each of the leaves should be about 1/2 full each of the leaves should be about 1/2 full
...@@ -47,28 +56,23 @@ void test_hsoc(int pagesize, int dup_mode) { ...@@ -47,28 +56,23 @@ void test_hsoc(int pagesize, int dup_mode) {
r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0); r = db->open(db, null_txn, fname, "main", DB_BTREE, DB_CREATE, 0666); assert(r == 0);
int i; int i;
DBT key, val;
int k, v;
/* force 15 leaves (14 splits) */ /* force 15 leaves (14 splits) */
if (verbose) printf("force15\n"); if (verbose) printf("force15\n");
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
k = htonl(i); v = i; r = db_put(db, null_txn, htonl(i), i); assert(r == 0);
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
/* almost fill leaf 0 */ /* almost fill leaf 0 */
if (verbose) printf("fill0\n"); if (verbose) printf("fill0\n");
for (i=0; i<(npp/2)-4; i++) { for (i=0; i<(npp/2)-4; i++) {
k = htonl(0); v = n+i; r = db_put(db, null_txn, htonl(0), n+i); assert(r == 0);
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
/* almost fill leaf 15 */ /* almost fill leaf 15 */
if (verbose) printf("fill15\n"); if (verbose) printf("fill15\n");
for (i=0; i<111; i++) { // for (i=0; i<(npp/2)-4; i++) { for (i=0; i<111; i++) { // for (i=0; i<(npp/2)-4; i++) {
k = htonl(n); v = i; r = db_put(db, null_txn, htonl(n), i); assert(r == 0);
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
/* reopen the database to force nonleaf buffering */ /* reopen the database to force nonleaf buffering */
...@@ -82,20 +86,20 @@ void test_hsoc(int pagesize, int dup_mode) { ...@@ -82,20 +86,20 @@ void test_hsoc(int pagesize, int dup_mode) {
/* do a cursor get k=0 to pull in leaf 0 */ /* do a cursor get k=0 to pull in leaf 0 */
DBC *cursor; DBC *cursor;
r = db->cursor(db,null_txn, &cursor, 0); assert(r == 0); r = db->cursor(db, null_txn, &cursor, 0); assert(r == 0);
DBT key, val;
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); assert(r == 0); r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST); assert(r == 0);
free(key.data); free(val.data); free(key.data); free(val.data);
/* fill up buffer 2 in the root node */ /* fill up buffer 2 in the root node */
for (i=0; i<216; i++) { for (i=0; i<216; i++) {
k = htonl(npp); v = i; r = db_put(db, null_txn, htonl(npp), i); assert(r == 0);
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
/* push a cmd to leaf 0 to cause it to split */ /* push a cmd to leaf 0 to cause it to split */
for (i=0; i<3; i++) { for (i=0; i<3; i++) {
k = htonl(0); v = 2*n+i; r = db_put(db, null_txn, htonl(0), 2*n+i); assert(r == 0);
r = db->put(db, null_txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0); assert(r == 0);
} }
r = cursor->c_close(cursor); assert(r == 0); r = cursor->c_close(cursor); assert(r == 0);
......
...@@ -835,18 +835,23 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, ...@@ -835,18 +835,23 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
int r; int r;
unsigned int brtflags; unsigned int brtflags;
assert(flags == 0); // We aren't ready to handle flags such as DB_GET_BOTH or DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
toku_brt_get_flags(db->i->brt, &brtflags); toku_brt_get_flags(db->i->brt, &brtflags);
if (brtflags & TOKU_DB_DUPSORT) { if (brtflags & TOKU_DB_DUPSORT) {
if (flags != 0 && flags != DB_GET_BOTH)
assert(flags == 0); // We aren't ready to handle flags such as DB_GET_BOTH or DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
DBC *dbc; DBC *dbc;
r = db->cursor(db, txn, &dbc, 0); r = db->cursor(db, txn, &dbc, 0);
if (r!=0) return r; if (r!=0) return r;
r = toku_c_get_noassociate(dbc, key, data, DB_SET); r = toku_c_get_noassociate(dbc, key, data, flags == DB_GET_BOTH ? DB_GET_BOTH : DB_SET);
int r2 = dbc->c_close(dbc); int r2 = dbc->c_close(dbc);
if (r!=0) return r; if (r!=0) return r;
return r2; return r2;
} else {
assert(flags == 0);
return toku_brt_lookup(db->i->brt, key, data);
} }
else return toku_brt_lookup(db->i->brt, key, data);
} }
static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { static int toku_db_del_noassociate(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
...@@ -1114,10 +1119,11 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) { ...@@ -1114,10 +1119,11 @@ static int toku_db_del(DB * db, DB_TXN * txn, DBT * key, u_int32_t flags) {
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r; int r;
assert(flags == 0); // We aren't ready to handle flags such as DB_GET_BOTH or DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
if (db->i->primary==0) r = toku_db_get_noassociate(db, txn, key, data, flags); if (db->i->primary==0) r = toku_db_get_noassociate(db, txn, key, data, flags);
else { else {
// It's a get on a secondary. // It's a get on a secondary.
assert(flags == 0); // We aren't ready to handle flags such as DB_GET_BOTH or DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
DBT primary_key; DBT primary_key;
memset(&primary_key, 0, sizeof(primary_key)); memset(&primary_key, 0, sizeof(primary_key));
r = db->pget(db, txn, key, &primary_key, data, 0); r = db->pget(db, txn, key, &primary_key, data, 0);
...@@ -1264,12 +1270,12 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -1264,12 +1270,12 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r; int r;
unsigned int brtflags;
unsigned int nodesize;
if (flags != 0) return EINVAL;
unsigned int brtflags;
r = toku_brt_get_flags(db->i->brt, &brtflags); assert(r == 0); r = toku_brt_get_flags(db->i->brt, &brtflags); assert(r == 0);
/* limit the size of key and data */
unsigned int nodesize;
r = toku_brt_get_nodesize(db->i->brt, &nodesize); assert(r == 0); r = toku_brt_get_nodesize(db->i->brt, &nodesize); assert(r == 0);
if (brtflags & TOKU_DB_DUPSORT) { if (brtflags & TOKU_DB_DUPSORT) {
unsigned int limit = nodesize / (2*BRT_FANOUT-1); unsigned int limit = nodesize / (2*BRT_FANOUT-1);
...@@ -1280,6 +1286,26 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data, ...@@ -1280,6 +1286,26 @@ static int toku_db_put_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
if (key->size >= limit || data->size >= limit) if (key->size >= limit || data->size >= limit)
return EINVAL; return EINVAL;
} }
if (flags == DB_YESOVERWRITE) {
/* tokudb does insert or replace */
;
} else if (flags == DB_NOOVERWRITE) {
/* check if the key already exists */
DBT testfordata;
r = db->get(db, txn, key, toku_init_dbt(&testfordata), 0);
if (r == 0)
return DB_KEYEXIST;
} else if (flags != 0) {
/* no other flags are currently supported */
return EINVAL;
} else {
if (brtflags & TOKU_DB_DUPSORT) {
r = db->get(db, txn, key, data, DB_GET_BOTH);
if (r == 0)
return DB_KEYEXIST;
}
}
r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0); r = toku_brt_insert(db->i->brt, key, data, txn ? txn->i->tokutxn : 0);
//printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r); //printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r);
...@@ -1296,7 +1322,7 @@ static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondar ...@@ -1296,7 +1322,7 @@ static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondar
return EINVAL; // We aren't ready for this return EINVAL; // We aren't ready for this
} }
#endif #endif
r = toku_db_put_noassociate(secondary, txn, &idx, key, 0); r = toku_db_put_noassociate(secondary, txn, &idx, key, DB_YESOVERWRITE);
if (idx.flags & DB_DBT_APPMALLOC) { if (idx.flags & DB_DBT_APPMALLOC) {
free(idx.data); free(idx.data);
} }
...@@ -1306,7 +1332,6 @@ static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondar ...@@ -1306,7 +1332,6 @@ static int do_associated_inserts (DB_TXN *txn, DBT *key, DBT *data, DB *secondar
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) { static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
int r; int r;
if (flags != 0) return EINVAL;
//Cannot put directly into a secondary. //Cannot put directly into a secondary.
if (db->i->primary != 0) return EINVAL; if (db->i->primary != 0) return EINVAL;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment