Commit bf807a8a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Fixes #853. {{{svn merge -r5009:5043 https://svn.tokutek.com/tokudb/tokudb.853}}}.

git-svn-id: file:///svn/tokudb@5046 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8d432d4b
......@@ -41,6 +41,7 @@ int prelock = 0;
int prelockflag = 0;
int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION;
int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION;
int singlex = 0; // Do a single transaction
int do_transactions = 0;
int n_insertions_since_txn_began=0;
int env_open_flags = DB_CREATE|DB_PRIVATE|DB_INIT_MPOOL;
......@@ -108,7 +109,8 @@ void setup (void) {
assert(r == 0);
if (do_transactions) {
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
if (singlex) do_prelock(db, tid);
}
if (pagesize && db->set_pagesize) {
r = db->set_pagesize(db, pagesize);
......@@ -121,7 +123,7 @@ void setup (void) {
r = db->open(db, tid, dbfilename, NULL, DB_BTREE, DB_CREATE, 0644);
if (r!=0) fprintf(stderr, "errno=%d, %s\n", errno, strerror(errno));
assert(r == 0);
if (do_transactions) {
if (do_transactions && !singlex) {
r=tid->commit(tid, 0); assert(r==0);
}
......@@ -130,6 +132,10 @@ void setup (void) {
void shutdown (void) {
int r;
if (do_transactions && singlex) {
r=tid->commit(tid, 0); assert(r==0);
}
r = db->close(db, 0);
assert(r == 0);
r = dbenv->close(dbenv, 0);
......@@ -159,7 +165,7 @@ void insert (long long v) {
int r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags);
CKERR(r);
if (do_transactions) {
if (n_insertions_since_txn_began>=items_per_transaction) {
if (n_insertions_since_txn_began>=items_per_transaction && !singlex) {
n_insertions_since_txn_began=0;
r = tid->commit(tid, 0); assert(r==0);
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
......@@ -172,7 +178,7 @@ void insert (long long v) {
void serial_insert_from (long long from) {
long long i;
if (do_transactions) {
if (do_transactions && !singlex) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
{
......@@ -185,7 +191,7 @@ void serial_insert_from (long long from) {
for (i=0; i<items_per_iteration; i++) {
insert((from+i)*SERIAL_SPACING);
}
if (do_transactions) {
if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0);
tid=0;
}
......@@ -197,14 +203,14 @@ long long llrandom (void) {
void random_insert_below (long long below) {
long long i;
if (do_transactions) {
if (do_transactions && !singlex) {
int r = dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
}
for (i=0; i<items_per_iteration; i++) {
insert(llrandom()%below);
}
if (do_transactions) {
if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0);
tid=0;
}
......@@ -251,6 +257,7 @@ int print_usage (const char *argv0) {
fprintf(stderr, " --pagesize PAGESIZE sets the database page size\n");
fprintf(stderr, " --noserial causes the serial insertions to be skipped\n");
fprintf(stderr, " --xcount N how many insertions per transaction (default=%d)\n", DEFAULT_ITEMS_PER_TRANSACTION);
fprintf(stderr, " --singlex Run the whole job as a single transaction. (Default don't run as a single transaction.)\n");
fprintf(stderr, " --periter N how many insertions per iteration (default=%d)\n", DEFAULT_ITEMS_TO_INSERT_PER_ITERATION);
fprintf(stderr, " --DB_INIT_TXN (1|0) turn on or turn off the DB_INIT_TXN env_open_flag\n");
fprintf(stderr, " --DB_INIT_LOG (1|0) turn on or turn off the DB_INIT_LOG env_open_flag\n");
......@@ -273,7 +280,6 @@ int main (int argc, const char *argv[]) {
verbose--; if (verbose<0) verbose=0;
} else if (strcmp(arg, "-x") == 0) {
do_transactions = 1;
env_open_flags += DB_INIT_TXN | DB_INIT_LOG | DB_INIT_LOCK;
} else if (strcmp(arg, "--DB_INIT_TXN") == 0) {
if (i+1 >= argc) return print_usage(argv[0]);
if (atoi(argv[++i]))
......@@ -294,6 +300,9 @@ int main (int argc, const char *argv[]) {
noserial=1;
} else if (strcmp(arg, "--norandom") == 0) {
norandom=1;
} else if (strcmp(arg, "--singlex") == 0) {
do_transactions = 1;
singlex = 1;
} else if (strcmp(arg, "--xcount") == 0) {
if (i+1 >= argc) return print_usage(argv[0]);
items_per_transaction = strtoll(argv[++i], 0, 10);
......@@ -327,6 +336,9 @@ int main (int argc, const char *argv[]) {
return print_usage(argv[0]);
}
}
if (do_transactions) {
env_open_flags |= DB_INIT_TXN | DB_INIT_LOG | DB_INIT_LOCK;
}
if (do_transactions && prelockflag) {
put_flags |= DB_PRELOCKED_WRITE;
}
......
......@@ -151,6 +151,7 @@ struct brt {
void *skey,*sval; /* Used for DBT return values. */
OMT txns; // transactions that are using this OMT (note that the transaction checks the cf also)
u_int64_t txn_that_created; // which txn created it. Use 0 if no such txn.
};
/* serialization code */
......
......@@ -570,8 +570,10 @@ static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int child
u_int32_t new_fingerprint = old_fingerprint + fdiff;
//printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, (long long)node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid);
*fingerprint = new_fingerprint;
int r = toku_log_brtenq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs);
if (r!=0) return r;
if (t->txn_that_created != xid) {
int r = toku_log_brtenq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs);
if (r!=0) return r;
}
return 0;
}
......@@ -632,8 +634,10 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, &node->log_lsn, 0, fnum, node->thisnodename, n_children_in_a);
if (r!=0) return r;
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, fnum, node->thisnodename, n_children_in_a);
if (r!=0) return r;
}
r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
if (r!=0) return r;
......@@ -814,7 +818,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calccrc32_cmdstruct(cmd);
node->local_fingerprint = new_fingerprint;
{
if (t->txn_that_created != cmd->xid) {
int r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
......@@ -907,7 +911,10 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
({
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calccrc32_cmd(type, xid, skey, skeylen, sval, svallen);
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
if (t->txn_that_created != xid) {
r = toku_log_brtdeq(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum);
assert(r==0);
}
node->local_fingerprint = new_fingerprint;
}));
......@@ -1492,8 +1499,10 @@ static int brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER
//printf(" got "); print_leafentry(stdout, newdata); printf("\n");
if (le && newdata) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
......@@ -1518,7 +1527,9 @@ static int brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER
if (le) {
// It's there, note that it's gone and remove it from the mempool
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
}
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r;
......@@ -1534,7 +1545,9 @@ static int brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER
memcpy(new_le, newdata, newlen);
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata);
......@@ -2129,6 +2142,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
}
t->database_name = malloced_name;
t->db = db;
t->txn_that_created = 0; // Uses 0 for no transaction.
{
int fd = open(fname, O_RDWR, 0777);
if (fd==-1) {
......@@ -2139,6 +2153,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
goto died0a;
}
fd = open(fname, O_RDWR | O_CREAT, 0777);
t->txn_that_created = toku_txn_get_txnid(txn);
if (fd==-1) {
r = errno;
t->database_name=0;
......@@ -2571,10 +2586,10 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
if (txn) {
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
toku_cachefile_refup(brt->cf);
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
......@@ -2603,7 +2618,7 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
if (txn) {
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
......@@ -2620,7 +2635,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
if (txn) {
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)};
toku_cachefile_refup(brt->cf);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment