Commit a26e3379 authored by Rich Prohaska

merge branch 838 to main. addresses #838

git-svn-id: file:///svn/tokudb@4493 c7de825b-a66e-492c-adef-691d508d4ae1
parent 595554cf
......@@ -2703,7 +2703,7 @@ static int bessel_from_search_t (OMTVALUE lev, void *extra) {
LESWITCHCALL(leafval, pair_leafval_bessel, search);
}
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, OMTCURSOR omtcursor) {
static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor) {
// Now we have to convert from brt_search_t to the bessel function with a direction. What a pain...
int direction;
switch (search->direction) {
......@@ -2723,12 +2723,35 @@ static int brt_search_leaf_node(BRT brt, BRTNODE node, brt_search_t *search, DBT
LEAFENTRY le = datav;
if (le_is_provdel(le)) {
TXNID xid = le_any_xid(le);
TOKUTXN txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
// Provisionally deleted stuff is gone.
// So we need to scan in the direction to see if we can find something
while (1) {
// see if the transaction is alive
TXNID newxid = le_any_xid(le);
if (newxid != xid) {
xid = newxid;
txn = 0;
toku_txn_find_by_xid(brt, xid, &txn);
}
switch (search->direction) {
case BRT_SEARCH_LEFT:
idx++;
if (txn) {
// printf("xid %llu -> %p\n", (unsigned long long) xid, txn);
idx++;
} else {
// apply a commit message for this leafentry to the node
// printf("apply commit_both %llu\n", (unsigned long long) xid);
DBT key, val;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, .u.id= {toku_fill_dbt(&key, le_latest_key(le), le_latest_keylen(le)),
toku_fill_dbt(&val, le_latest_val(le), le_latest_vallen(le))} };
r = brt_leaf_apply_cmd_once(brt, node, &brtcmd, logger, idx, le);
assert(r == 0);
}
if (idx>=toku_omt_size(node->u.l.buffer)) return DB_NOTFOUND;
break;
case BRT_SEARCH_RIGHT:
......@@ -2760,7 +2783,7 @@ static int brt_search_node(BRT brt, BRTNODE node, brt_search_t *search, DBT *new
if (node->height > 0)
return brt_search_nonleaf_node(brt, node, search, newkey, newval, split, logger, omtcursor);
else
return brt_search_leaf_node(brt, node, search, newkey, newval, omtcursor);
return brt_search_leaf_node(brt, node, search, newkey, newval, logger, omtcursor);
}
int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOKULOGGER logger, OMTCURSOR omtcursor, uint64_t *root_put_counter)
......
......@@ -407,3 +407,24 @@ u_int32_t any_vallen_le_provpair (TXNID UU(xid), u_int32_t UU(klen), void *UU(kv
// Return the length of "any" value stored in the leafentry le.
// NOTE(review): LESWITCHCALL is defined elsewhere; it presumably dispatches to the
// any_vallen_le_* function matching the leafentry's type and returns its result -- confirm.
u_int32_t le_any_vallen (LEAFENTRY le) {
LESWITCHCALL(le, any_vallen);
}
// any_xid case for a committed leafentry: this form takes no xid argument,
// so the transaction id reported is always 0.
u_int64_t any_xid_le_committed (u_int32_t UU(keylen), void *UU(key), u_int32_t UU(vallen), void *UU(val)) {
    const u_int64_t no_xid = 0;
    return no_xid;
}
// any_xid case for a leafentry of the "both" form: report the xid passed in.
// All other fields are ignored.
u_int64_t any_xid_le_both (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval), u_int32_t UU(plen), void *UU(pval)) {
    u_int64_t result = xid;
    return result;
}
// any_xid case for a provisionally deleted leafentry: report the xid passed in.
// All other fields are ignored.
u_int64_t any_xid_le_provdel (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(clen), void *UU(cval)) {
    u_int64_t result = xid;
    return result;
}
// any_xid case for a leafentry of the "provpair" form: report the xid passed in.
// All other fields are ignored.
u_int64_t any_xid_le_provpair (TXNID xid, u_int32_t UU(klen), void *UU(kval), u_int32_t UU(plen), void *UU(pval)) {
    u_int64_t result = xid;
    return result;
}
// Return the transaction id associated with the leafentry le.
// The any_xid_le_* functions return 0 for a committed entry and the stored xid for
// the both/provdel/provpair forms.
// NOTE(review): LESWITCHCALL is defined elsewhere; it presumably selects the
// any_xid_le_* function by leafentry type and returns its result -- confirm.
u_int64_t le_any_xid (LEAFENTRY le) {
LESWITCHCALL(le, any_xid);
}
......@@ -132,6 +132,7 @@ void* le_any_key (LEAFENTRY le);
u_int32_t le_any_keylen (LEAFENTRY le);
void* le_any_val (LEAFENTRY le);
u_int32_t le_any_vallen (LEAFENTRY le);
u_int64_t le_any_xid (LEAFENTRY le);
#endif
......@@ -1001,11 +1001,10 @@ int toku_read_rollback_backwards(int fd, off_t at, struct roll_entry **item, off
return 0;
}
static int find_ptr (OMTVALUE v, void *vfind) {
if (v<vfind) return -1;
if (v>vfind) return +1;
return 0;
// OMT comparison callback: order the transaction stored in the OMT (v) against
// the transaction being searched for (txnv) by their txnid64 fields.
// Returns -1, 0, or +1 when v's id is less than, equal to, or greater than the
// searched id.
// The ids are compared explicitly rather than subtracted: they are 64-bit values
// (assumed unsigned, matching the u_int64_t xid accessors -- confirm), so the
// difference truncated to int can have the wrong sign, or be 0 for unequal ids.
static int find_xid (OMTVALUE v, void *txnv) {
    TOKUTXN txn = v;
    TOKUTXN txnfind = txnv;
    if (txn->txnid64 < txnfind->txnid64) return -1;
    if (txn->txnid64 > txnfind->txnid64) return +1;
    return 0;
}
static int find_filenum (OMTVALUE v, void *brtv) {
......@@ -1022,7 +1021,7 @@ static int find_filenum (OMTVALUE v, void *brtv) {
int toku_txn_note_brt (TOKUTXN txn, BRT brt) {
OMTVALUE txnv;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv, &index, NULL);
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv, &index, NULL);
if (r==0) {
// It's already there.
assert((TOKUTXN)txnv==txn);
......@@ -1060,7 +1059,7 @@ static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv) {
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_ptr, txn, &txnv_again, &index, NULL);
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index, NULL);
assert(r==0);
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
......@@ -1073,3 +1072,12 @@ static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
toku_omt_destroy(&txn->open_brts);
}
// Look up, among the transactions recorded in brt->txns, the one whose id is xid.
// On success returns 0 and stores the transaction in *txnptr.
// Otherwise returns the nonzero result of toku_omt_find_zero and leaves *txnptr untouched.
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
    // Search key: find_xid only reads txnid64, so no other field is filled in.
    struct tokutxn probe;
    probe.txnid64 = xid;

    OMTVALUE found;
    uint32_t position;
    int r = toku_omt_find_zero(brt->txns, find_xid, &probe, &found, &position, NULL);
    if (r != 0) return r;
    *txnptr = found;
    return r;
}
......@@ -169,4 +169,8 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt);
// find the TOKUTXN object with the given xid among the transactions recorded for this brt
// if found, return 0 and set *txnptr to the TOKUTXN object
// otherwise return nonzero and leave *txnptr unchanged
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <arpa/inet.h>
#include <db.h>
#include "test.h"
// the exit value of this program is nonzero when the test fails
int testresult = 0;
// number of timed DB_FIRST cursor lookups performed in each walk
int numexperiments = 20;
// maxt is set by test_838 to the longest DB_FIRST cursor lookup, in microseconds, measured without transactions
// the transactional tests compare their lookup times against it and set testresult when a lookup
// other than the first one takes more than 10 times maxt
unsigned long long maxt;
// Zero every byte of *dbt and hand the same pointer back to the caller.
DBT *dbt_init_static(DBT *dbt) {
    // memset returns its destination pointer, which is dbt itself
    return memset(dbt, 0, sizeof(*dbt));
}
// Baseline run without transactions: insert n keys, delete all of them, then time
// DB_FIRST on the emptied tree numexperiments times, first on the open database and
// again after closing and reopening it.  Every lookup must return DB_NOTFOUND.
// The slowest lookup (in microseconds) is left in the global maxt.
//   n: number of keys to insert and then delete
// Every setup step is now checked, so a failure to prepare ENVDIR or the environment
// stops the test at the failing call instead of surfacing later.
void test_838(int n) {
    if (verbose) printf("%s:%d\n", __FUNCTION__, n);
    int r;

    // setup test directory
    r = system("rm -rf " ENVDIR); assert(r == 0);
    r = mkdir(ENVDIR, 0777); assert(r == 0);

    // setup environment
    DB_ENV *env;
    {
        r = db_env_create(&env, 0); assert(r == 0);
        r = env->set_data_dir(env, ENVDIR); assert(r == 0);
        r = env->set_lg_dir(env, ENVDIR); assert(r == 0);
        env->set_errfile(env, stdout);
        r = env->open(env, 0, DB_INIT_MPOOL + DB_PRIVATE + DB_CREATE, 0777);
        assert(r == 0);
    }

    // setup database
    DB *db;
    {
        DB_TXN *txn = 0;
        r = db_create(&db, env, 0); assert(r == 0);
        r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
    }

    // insert, commit
    {
        DB_TXN *txn = 0;
        int i;
        for (i=0; i<n; i++) {
            int k = htonl(i);
            int v = 0;
            DBT key, val;
            r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
            assert(r == 0);
        }
    }

    // delete, commit
    {
        DB_TXN *txn = 0;
        int i;
        for (i=0; i<n; i++) {
            int k = htonl(i);
            DBT key;
            r = db->del(db, txn, dbt_init(&key, &k, sizeof k), 0);
            assert(r == 0);
        }
    }

    // walk
    maxt = 0;
    {
        DB_TXN *txn = 0;
        DBC *cursor;
        r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
        int i;
        for (i=0; i<numexperiments; i++) {
            struct timeval tstart, tnow;
            gettimeofday(&tstart, 0);
            DBT key, val;
            r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
            assert(r == DB_NOTFOUND);
            gettimeofday(&tnow, 0);
            // elapsed time of this lookup in microseconds
            unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
            t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
            if (verbose) printf("%d %llu\n", i, t);
            if (t > maxt) maxt = t;
        }
        r = cursor->c_close(cursor); assert(r == 0);
    }

    // close db
    r = db->close(db, 0); assert(r == 0);

    // reopen and walk
    {
        DB_TXN *txn = 0;
        r = db_create(&db, env, 0); assert(r == 0);
        r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
    }
    {
        DB_TXN *txn = 0;
        DBC *cursor;
        r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
        int i;
        for (i=0; i<numexperiments; i++) {
            struct timeval tstart, tnow;
            gettimeofday(&tstart, 0);
            DBT key, val;
            r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
            assert(r == DB_NOTFOUND);
            gettimeofday(&tnow, 0);
            // elapsed time of this lookup in microseconds
            unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
            t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
            if (verbose) printf("%d %llu\n", i, t);
            // maxt keeps accumulating across both walks
            if (t > maxt) maxt = t;
        }
        r = cursor->c_close(cursor); assert(r == 0);

        // close db
        r = db->close(db, 0); assert(r == 0);
    }

    // close env
    r = env->close(env, 0); assert(r == 0);
}
// Transactional variant: n keys are inserted and then deleted, each phase inside a
// child transaction of its own master transaction, with child and master committed.
// DB_FIRST is then timed numexperiments times inside a transaction, before and after
// closing and reopening the database.  Every lookup must return DB_NOTFOUND, and any
// lookup other than the first that takes more than 10*maxt sets testresult to 1.
// maxt is a global and is expected to have been set by an earlier call to test_838.
//   n: number of keys to insert and then delete
// Every setup step is now checked, so a failure to prepare ENVDIR or the environment
// stops the test at the failing call instead of surfacing later.
void test_838_txn(int n) {
    if (verbose) printf("%s:%d\n", __FUNCTION__, n);
    int r;

    // setup test directory
    r = system("rm -rf " ENVDIR); assert(r == 0);
    r = mkdir(ENVDIR, 0777); assert(r == 0);

    // setup environment
    DB_ENV *env;
    {
        r = db_env_create(&env, 0); assert(r == 0);
        r = env->set_data_dir(env, ENVDIR); assert(r == 0);
        r = env->set_lg_dir(env, ENVDIR); assert(r == 0);
        env->set_errfile(env, stdout);
        r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777);
        assert(r == 0);
    }

    // setup database
    DB *db;
    {
        DB_TXN *txn = 0;
        r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
        r = db_create(&db, env, 0); assert(r == 0);
        r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
        r = txn->commit(txn, 0); assert(r == 0);
    }

    // insert, commit
    {
        DB_TXN *txn_master;
        r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0);
        DB_TXN *txn;
        r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0);
        int i;
        for (i=0; i<n; i++) {
            int k = htonl(i);
            int v = 0;
            DBT key, val;
            r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
            assert(r == 0);
        }
        r = txn->commit(txn, 0); assert(r == 0);
        r = txn_master->commit(txn_master, 0); assert(r == 0);
    }

    // delete, commit
    {
        DB_TXN *txn_master;
        r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0);
        DB_TXN *txn;
        r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0);
        int i;
        for (i=0; i<n; i++) {
            int k = htonl(i);
            DBT key;
            r = db->del(db, txn, dbt_init(&key, &k, sizeof k), 0);
            assert(r == 0);
        }
        r = txn->commit(txn, 0); assert(r == 0);
        r = txn_master->commit(txn_master, 0); assert(r == 0);
    }

    // walk
    {
        DB_TXN *txn;
        r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
        DBC *cursor;
        r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
        int i;
        for (i=0; i<numexperiments; i++) {
            struct timeval tstart, tnow;
            gettimeofday(&tstart, 0);
            DBT key, val;
            r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
            assert(r == DB_NOTFOUND);
            gettimeofday(&tnow, 0);
            // elapsed time of this lookup in microseconds
            unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
            t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
            if (verbose) printf("%d %llu\n", i, t);
            // the first cursor op takes a long time as it needs to clean out the provisionally
            // deleted messages
            if (i > 0 && t > 10*maxt)
                testresult = 1;
        }
        r = cursor->c_close(cursor); assert(r == 0);
        r = txn->commit(txn, 0); assert(r == 0);
    }

    // close db
    r = db->close(db, 0); assert(r == 0);

    // reopen and walk
    {
        DB_TXN *txn = 0;
        r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
        r = db_create(&db, env, 0); assert(r == 0);
        r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
        r = txn->commit(txn, 0); assert(r == 0);
    }
    {
        DB_TXN *txn;
        r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
        DBC *cursor;
        r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
        int i;
        for (i=0; i<numexperiments; i++) {
            struct timeval tstart, tnow;
            gettimeofday(&tstart, 0);
            DBT key, val;
            r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
            assert(r == DB_NOTFOUND);
            gettimeofday(&tnow, 0);
            // elapsed time of this lookup in microseconds
            unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
            t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
            if (verbose) printf("%d %llu\n", i, t);
            if (i > 0 && t > 10*maxt)
                testresult = 1;
        }
        r = cursor->c_close(cursor); assert(r == 0);
        r = txn->commit(txn, 0); assert(r == 0);

        // close db
        r = db->close(db, 0); assert(r == 0);
    }

    // close env
    r = env->close(env, 0); assert(r == 0);
}
void test_838_defer_delete_commit(int n) {
if (verbose) printf("%s:%d\n", __FUNCTION__, n);
int r;
// setup test directory
system("rm -rf " ENVDIR);
mkdir(ENVDIR, 0777);
// setup environment
DB_ENV *env;
{
r = db_env_create(&env, 0); assert(r == 0);
r = env->set_data_dir(env, ENVDIR);
r = env->set_lg_dir(env, ENVDIR);
env->set_errfile(env, stdout);
r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777);
assert(r == 0);
}
// setup database
DB *db;
{
DB_TXN *txn = 0;
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
}
// insert, commit
{
DB_TXN *txn_master;
r = env->txn_begin(env, 0, &txn_master, 0); assert(r == 0);
DB_TXN *txn;
r = env->txn_begin(env, txn_master, &txn, 0); assert(r == 0);
int i;
for (i=0; i<n; i++) {
int k = htonl(i);
int v = 0;
DBT key, val;
r = db->put(db, txn, dbt_init(&key, &k, sizeof k), dbt_init(&val, &v, sizeof v), 0);
assert(r == 0);
}
r = txn->commit(txn, 0); assert(r == 0);
r = txn_master->commit(txn_master, 0); assert(r == 0);
}
// delete
DB_TXN *txn_master_delete;
r = env->txn_begin(env, 0, &txn_master_delete, 0); assert(r == 0);
DB_TXN *txn_delete;
r = env->txn_begin(env, txn_master_delete, &txn_delete, 0); assert(r == 0);
int i;
for (i=0; i<n; i++) {
int k = htonl(i);
DBT key;
r = db->del(db, txn_delete, dbt_init(&key, &k, sizeof k), 0);
assert(r == 0);
}
// walk
{
DB_TXN *txn;
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
int i;
for (i=0; i<numexperiments; i++) {
struct timeval tstart, tnow;
gettimeofday(&tstart, 0);
DBT key, val;
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
assert(r == DB_LOCK_NOTGRANTED);
gettimeofday(&tnow, 0);
unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
if (verbose) printf("%d %llu\n", i, t);
// the first cursor op takes a long time as it needs to clean out the provisionally
// deleted messages
if (i > 0 && t > 10*maxt)
testresult = 1;
}
r = cursor->c_close(cursor); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
}
// delete commit
r = txn_delete->commit(txn_delete, 0); assert(r == 0);
r = txn_master_delete->commit(txn_master_delete, 0); assert(r == 0);
// close db
r = db->close(db, 0); assert(r == 0);
// reopen and walk
{
DB_TXN *txn = 0;
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
}
{
DB_TXN *txn;
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
int i;
for (i=0; i<numexperiments; i++) {
struct timeval tstart, tnow;
gettimeofday(&tstart, 0);
DBT key, val;
r = cursor->c_get(cursor, dbt_init_malloc(&key), dbt_init_malloc(&val), DB_FIRST);
assert(r == DB_NOTFOUND);
gettimeofday(&tnow, 0);
unsigned long long t = tnow.tv_sec * 1000000ULL + tnow.tv_usec;
t -= tstart.tv_sec * 1000000ULL + tstart.tv_usec;
if (verbose) printf("%d %llu\n", i, t);
if (i > 0 && t > 10*maxt)
testresult = 1;
}
r = cursor->c_close(cursor); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
// close db
r = db->close(db, 0); assert(r == 0);
}
// close env
r = env->close(env, 0); assert(r == 0);
}
// Entry point: parse the command line, then run the three #838 scenarios for each
// problem size.  The size starts at 100000 and is multiplied by 10 while it does not
// exceed 100000, so exactly one size is exercised.  test_838 runs first because it
// sets the maxt baseline that the transactional tests compare against.
// Returns testresult, which is nonzero when any timing check failed.
int main(int argc, const char *argv[]) {
    parse_args(argc, argv);

    int n = 100000;
    while (n <= 100000) {
        test_838(n);
        test_838_txn(n);
        test_838_defer_delete_commit(n);
        n *= 10;
    }
    return testresult;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment