Commit 99b75cab authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Deal with the multithreaded cursor DBT memory allocaiton problem. Addresses...

Deal with the multithreaded cursor DBT memory allocaiton problem.  Addresses #538.  (I'm going to write a few more tests before calling it fixed.)

git-svn-id: file:///svn/tokudb@2854 c7de825b-a66e-492c-adef-691d508d4ae1
parent d3a49aaa
......@@ -173,6 +173,8 @@ struct brt_cursor {
BRT brt;
DBT key;
DBT val;
int is_temporary_cursor; // If it is a temporary cursor then use the following skey and sval to return tokudb-managed values in dbts. Otherwise use the brt's skey and skval.
void *skey, *sval;
};
int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logger);
......
......@@ -54,7 +54,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
}
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]);
r = toku_brt_cursor(brt, &cursors[i], 0);
assert(r == 0);
}
......
......@@ -34,7 +34,7 @@ static void assert_cursor_notfound(BRT brt, int position) {
int r;
DBT kbt, vbt;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
toku_init_dbt(&kbt); kbt.flags = DB_DBT_MALLOC;
......@@ -52,7 +52,7 @@ static void assert_cursor_value(BRT brt, int position, long long value) {
DBT kbt, vbt;
long long v;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -78,7 +78,7 @@ static void assert_cursor_first_last(BRT brt, long long firstv, long long lastv)
DBT kbt, vbt;
long long v;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
if (test_cursor_debug && verbose) printf("first key: ");
......@@ -285,7 +285,7 @@ static void assert_cursor_walk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -357,7 +357,7 @@ static void assert_cursor_rwalk(BRT brt, int n) {
int i;
int r;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -430,7 +430,7 @@ static void assert_cursor_walk_inorder(BRT brt, int n) {
int r;
char *prevkey;
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
prevkey = 0;
......@@ -542,7 +542,7 @@ static void test_brt_cursor_split(int n, DB *db) {
assert(r==0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
if (test_cursor_debug && verbose) printf("key: ");
......@@ -609,7 +609,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]);
r = toku_brt_cursor(brt, &cursors[i], 0);
assert(r == 0);
}
......@@ -659,7 +659,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
int c;
/* create the cursors */
for (c=0; c<ncursors; c++) {
r = toku_brt_cursor(brt, &cursors[c]);
r = toku_brt_cursor(brt, &cursors[c], 0);
assert(r == 0);
}
......@@ -751,7 +751,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
......@@ -824,7 +824,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor);
r = toku_brt_cursor(brt, &cursor, 0);
assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
......@@ -875,7 +875,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_open_brt(fname, 0, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor);
error = toku_brt_cursor(brt, &cursor, 0);
assert(error == 0);
DBT key, val;
......@@ -936,7 +936,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_open_brt(fname, 0, 1, &brt, 1<<12, ct, null_txn, test_brt_cursor_keycompare, db);
assert(error == 0);
error = toku_brt_cursor(brt, &cursor);
error = toku_brt_cursor(brt, &cursor, 0);
assert(error == 0);
DBT key, val;
......
......@@ -565,7 +565,7 @@ static void test_cursor_last_empty(void) {
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_open_brt(n, 0, 1, &brt, 1<<12, ct, null_txn, toku_default_compare_fun, null_db); assert(r==0);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
toku_init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
toku_init_dbt(&vbt);
......@@ -600,7 +600,7 @@ static void test_cursor_next (void) {
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "hello", 6), toku_fill_dbt(&vbt, "there", 6), null_txn);
r = toku_brt_insert(brt, toku_fill_dbt(&kbt, "byebye", 7), toku_fill_dbt(&vbt, "byenow", 7), null_txn);
if (verbose) printf("%s:%d calling toku_brt_cursor(...)\n", __FILE__, __LINE__);
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
toku_init_dbt(&kbt);
//printf("%s:%d %d alloced\n", __FILE__, __LINE__, toku_get_n_items_malloced()); toku_print_malloced_items();
toku_init_dbt(&vbt);
......@@ -691,7 +691,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
r = toku_brt_insert(brt, &kbt, &vbt, null_txn);
assert(r==0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
for (i=0; i<2; i++) {
toku_init_dbt(&kbt); toku_init_dbt(&vbt);
......@@ -728,7 +728,7 @@ static void test_wrongendian_compare (int wrong_p, unsigned int N) {
assert(r==0);
toku_cachetable_verify(ct);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
int prev=-1;
for (i=0; i<N; i++) {
......@@ -882,7 +882,7 @@ static void test_brt_delete_present(int n) {
/* cursor should not find anything */
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor);
r = toku_brt_cursor(t, &cursor, 0);
assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
......@@ -1009,7 +1009,7 @@ static void test_brt_delete_cursor_first(int n) {
/* cursor should find the last key: n-1 */
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor);
r = toku_brt_cursor(t, &cursor, 0);
assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
......@@ -1133,7 +1133,7 @@ static void test_brt_delete_both(int n) {
/* cursor should find only odd pairs */
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
for (i=1; ; i += 2) {
toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
......@@ -1180,7 +1180,7 @@ static void test_new_brt_cursor_create_close() {
int i;
for (i=0; i<n; i++) {
r = toku_brt_cursor(brt, &cursors[i]); assert(r == 0);
r = toku_brt_cursor(brt, &cursors[i], 0); assert(r == 0);
}
for (i=0; i<n; i++) {
......@@ -1216,7 +1216,7 @@ static void test_new_brt_cursor_first(int n, int dup_mode) {
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1271,7 +1271,7 @@ static void test_new_brt_cursor_last(int n, int dup_mode) {
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1329,7 +1329,7 @@ static void test_new_brt_cursor_next(int n, int dup_mode) {
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
for (i=0; ; i++) {
r = toku_brt_cursor_get(cursor, &key, &val, DB_NEXT, null_txn);
......@@ -1379,7 +1379,7 @@ static void test_new_brt_cursor_prev(int n, int dup_mode) {
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1432,7 +1432,7 @@ static void test_new_brt_cursor_current(int n, int dup_mode) {
BRT_CURSOR cursor;
r = toku_brt_cursor(t, &cursor); assert(r == 0);
r = toku_brt_cursor(t, &cursor, 0); assert(r == 0);
toku_init_dbt(&key); key.flags = DB_DBT_REALLOC;
toku_init_dbt(&val); val.flags = DB_DBT_REALLOC;
......@@ -1515,7 +1515,7 @@ static void test_new_brt_cursor_set_range(int n, int dup_mode) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
/* pick random keys v in 0 <= v < 10*n, the cursor should point
to the smallest key in the tree that is >= v */
......@@ -1572,7 +1572,7 @@ static void test_new_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_insert(brt, toku_fill_dbt(&key, &k, sizeof k), toku_fill_dbt(&val, &v, sizeof v), 0); assert(r == 0);
}
r = toku_brt_cursor(brt, &cursor); assert(r==0);
r = toku_brt_cursor(brt, &cursor, 0); assert(r==0);
/* set cursor to random keys in set { 0, 10, 20, .. 10*(n-1) } */
for (i=0; i<n; i++) {
......
......@@ -1733,7 +1733,7 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int r, rr;
BRT_CURSOR cursor;
rr = toku_brt_cursor(brt, &cursor);
rr = toku_brt_cursor(brt, &cursor, 1);
if (rr != 0) return rr;
int op = brt->flags & TOKU_DB_DUPSORT ? DB_GET_BOTH : DB_SET;
......@@ -2039,7 +2039,7 @@ static inline void brt_cursor_set_key_val(BRT_CURSOR cursor, DBT *newkey, DBT *n
cursor->val = *newval; memset(newval, 0, sizeof *newval);
}
int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) {
int toku_brt_cursor (BRT brt, BRT_CURSOR *cursorptr, int is_temporary_cursor) {
BRT_CURSOR cursor = toku_malloc(sizeof *cursor);
if (cursor == 0)
return ENOMEM;
......@@ -2047,12 +2047,16 @@ int toku_brt_cursor(BRT brt, BRT_CURSOR *cursorptr) {
toku_init_dbt(&cursor->key);
toku_init_dbt(&cursor->val);
list_push(&brt->cursors, &cursor->cursors_link);
cursor->is_temporary_cursor=is_temporary_cursor;
cursor->skey = cursor->sval = 0;
*cursorptr = cursor;
return 0;
}
int toku_brt_cursor_close(BRT_CURSOR cursor) {
brt_cursor_cleanup(cursor);
if (cursor->skey) toku_free(cursor->skey);
if (cursor->sval) toku_free(cursor->sval);
list_remove(&cursor->cursors_link);
toku_free_n(cursor, sizeof *cursor);
return 0;
......@@ -2076,9 +2080,9 @@ static inline int compare_kv_xy(BRT brt, DBT *k, DBT *v, DBT *x, DBT *y) {
static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
int r = 0;
if (key)
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, &cursor->brt->skey);
r = toku_dbt_set_value(key, cursor->key.data, cursor->key.size, cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey);
if (r == 0 && val)
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, &cursor->brt->sval);
r = toku_dbt_set_value(val, cursor->val.data, cursor->val.size, cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval);
return r;
}
......
......@@ -47,7 +47,7 @@ int toku_verify_brt (BRT brt);
//int show_brt_blocknumbers(BRT);
typedef struct brt_cursor *BRT_CURSOR;
int toku_brt_cursor (BRT, BRT_CURSOR*);
int toku_brt_cursor (BRT, BRT_CURSOR*, int is_temporary_cursor);
int toku_brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, TOKUTXN);
int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN);
int toku_brt_cursor_close (BRT_CURSOR curs);
......
......@@ -177,6 +177,7 @@ void *toku_tagmalloc(size_t size, int typtag) {
}
void *toku_realloc(void *p, size_t size) {
if (p==0) return toku_malloc(size);
void *newp;
note_did_free(p);
errno=0;
......
......@@ -26,6 +26,7 @@ void test_cursor() {
/* create the dup database file */
r = db_env_create(&env, 0); assert(r == 0);
env->set_errfile(env, stderr);
r = env->open(env, ENVDIR, DB_CREATE|DB_INIT_MPOOL|DB_THREAD|DB_PRIVATE, 0777); CKERR(r);
r = db_create(&db, env, 0); assert(r == 0);
db->set_errfile(db,stderr); // Turn off those annoying errors
......
......@@ -109,16 +109,16 @@ void insert_test() {
/* Try to get it from primary. */
r = dbp->get(dbp, null_txn, &key, &testdata, 0); CKERR(r);
r = dbtcmp(&data, &testdata); CKERR(r);
r = dbtcmp(&data, &testdata); assert(r == 0); // Not ckerr
/* Try to get it from secondary. */
r = sdbp->get(sdbp, null_txn, &skey, &testdata, 0); CKERR(r);
r = dbtcmp(&data, &testdata); CKERR(r);
r = dbtcmp(&data, &testdata); assert(r == 0); // Not ckerr
/* Try to pget from secondary */
r = sdbp->pget(sdbp, null_txn, &skey, &testkey, &testdata, 0); CKERR(r);
r = dbtcmp(&data, &testdata); CKERR(r);
r = dbtcmp(&testkey, &key); CKERR(r);
r = dbtcmp(&data, &testdata); assert(r == 0); // Not ckerr
r = dbtcmp(&testkey, &key); assert(r == 0); // Not ckerr
/* Make sure we fail 'pget' from primary */
r = dbp->pget(dbp, null_txn, &key, &testkey, &data, 0); assert(r == EINVAL);
......
......@@ -77,7 +77,7 @@ static inline int db_opened(DB *db) {
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor);
/* txn methods */
......@@ -889,7 +889,7 @@ int log_compare(const DB_LSN * a, const DB_LSN * b) {
static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) {
DBC *dbc;
int r = toku_db_cursor(secondary, txn, &dbc, 0);
int r = toku_db_cursor(secondary, txn, &dbc, 0, 0);
if (r!=0) return r;
DBT key,data;
r = toku_c_get(dbc, &key, &data, DB_FIRST);
......@@ -900,7 +900,7 @@ static int maybe_do_associate_create (DB_TXN*txn, DB*primary, DB*secondary) {
}
}
/* Now we know the secondary is empty. */
r = toku_db_cursor(primary, txn, &dbc, 0);
r = toku_db_cursor(primary, txn, &dbc, 0, 0);
if (r!=0) return r;
for (r = toku_c_get(dbc, &key, &data, DB_FIRST); r==0; r = toku_c_get(dbc, &key, &data, DB_NEXT)) {
r = do_associated_inserts(txn, &key, &data, secondary);
......@@ -1457,7 +1457,7 @@ static int toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
r = toku_c_get(cursor, &currentkey, &currentval, DB_CURRENT_BINDING);
if (r != 0) goto finish;
r = toku_db_cursor(cursor->dbp, 0, &count_cursor, 0);
r = toku_db_cursor(cursor->dbp, 0, &count_cursor, 0, 0);
if (r != 0) goto finish;
*count = 0;
......@@ -1489,7 +1489,7 @@ static int toku_db_get_noassociate(DB * db, DB_TXN * txn, DBT * key, DBT * data,
if (flags!=0 && flags!=DB_GET_BOTH) return EINVAL;
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, 0);
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_get_noassociate(dbc, key, data,
(flags == 0) ? DB_SET : DB_GET_BOTH);
......@@ -1538,7 +1538,7 @@ static int do_associated_deletes(DB_TXN *txn, DBT *key, DBT *data, DB *secondary
if (brtflags & TOKU_DB_DUPSORT) {
//If the secondary has duplicates we need to use cursor deletes.
DBC *dbc;
r = toku_db_cursor(secondary, txn, &dbc, 0);
r = toku_db_cursor(secondary, txn, &dbc, 0, 0);
if (r!=0) goto cursor_cleanup;
r = toku_c_get_noassociate(dbc, &idx, key, DB_GET_BOTH);
if (r!=0) goto cursor_cleanup;
......@@ -1698,7 +1698,7 @@ static int locked_c_put(DBC *dbc, DBT *key, DBT *data, u_int32_t flags) {
toku_ydb_lock(); int r = toku_c_put(dbc, key, data, flags); toku_ydb_unlock(); return r;
}
static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporary_cursor) {
HANDLE_PANICKED_DB(db);
if (flags != 0)
return EINVAL;
......@@ -1716,7 +1716,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags) {
assert(result->i);
result->dbp = db;
result->i->txn = txn;
int r = toku_brt_cursor(db->i->brt, &result->i->c);
int r = toku_brt_cursor(db->i->brt, &result->i->c, is_temporary_cursor);
assert(r == 0);
*c = result;
return 0;
......@@ -1747,7 +1747,7 @@ static int toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
* We have to make certain we cascade all the deletes. */
assert(db->i->primary!=0); //Primary cannot have duplicates.
r = toku_db_cursor(db, txn, &dbc, 0);
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_get_noassociate(dbc, key, &data, DB_SET);
while (r==0) {
......@@ -1830,7 +1830,7 @@ static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t
// We aren't ready to handle flags such as DB_READ_COMMITTED or DB_READ_UNCOMMITTED or DB_RMW
DBC *dbc;
r = toku_db_cursor(db, txn, &dbc, 0);
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_get(dbc, key, data, (flags == 0) ? DB_SET : DB_GET_BOTH);
int r2 = toku_c_close(dbc);
......@@ -1850,7 +1850,7 @@ static int toku_db_pget (DB *db, DB_TXN *txn, DBT *key, DBT *pkey, DBT *data, u_
if ((db->i->open_flags & DB_THREAD) && (db_thread_need_flags(pkey) || db_thread_need_flags(data)))
return EINVAL;
r = toku_db_cursor(db, txn, &dbc, 0);
r = toku_db_cursor(db, txn, &dbc, 0, 1);
if (r!=0) return r;
r = toku_c_pget(dbc, key, pkey, data, DB_SET);
if (r==DB_KEYEMPTY) r = DB_NOTFOUND;
......@@ -2335,7 +2335,7 @@ static inline int autotxn_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flag
return toku_ydb_do_error(db->dbenv, EINVAL,
"Cursors in a transaction environment must have transactions.\n");
}
return toku_db_cursor(db, txn, c, flags);
return toku_db_cursor(db, txn, c, flags, 0);
}
static int locked_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment