Commit 110754f5 authored by Yoni Fogel

Addresses #1125 Merged nested transactions from temporary merge branch into main.

The following tests currently fail (these are not regressions; they already fail as of r13461):
 * {{{x1.tdbrun}}}
 * {{{test_log(2,3,4,5,6,7,8,9,10).recover}}}
 * {{{test-recover(1,2,3).tdbrun}}}
 * {{{test1324.tdbrun}}}
ULE_DEBUG is disabled (defined to 0). It can be re-enabled for test purposes (set to 1).
refs [t:1125]
Merging into the temp branch (tokudb.main_13461+1125)
{{{svn merge --accept=postpone -r 12527:13461 ../tokudb.1125 ./}}}

Merging into main
{{{svn merge --accept=postpone -r13462:13463 ../tokudb.main_13461+1125/ ./}}}


git-svn-id: file:///svn/toku/tokudb@13464 c7de825b-a66e-492c-adef-691d508d4ae1
parent 363e3717
......@@ -182,9 +182,20 @@ QUIET_BENCH_ARG=
rm -rf $@
$(BENCH_TIME) ./$< --env $@ $(QUIET_BENCH_ARG) $(EXTRA_BENCH_ARGS)
no-txn.benchmark.dir: EXTRA_BENCH_ARGS=
txn.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock
abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort
# Benchmark variants. Each name must match the stem of a <name>.benchmark.dir rule
# (hyphenated, e.g. child-abort), because the names are expanded with patsubst.
DB_TYPES = no-txn txn abort child child-abort child-abortfirst txn1 abort1 child1 child-abort1 child-abortfirst1
no-txn.benchmark.dir: EXTRA_BENCH_ARGS=
txn.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock
abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort
child.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --singlex-child
child-abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child
child-abortfirst.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --finish-child-first
txn1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --insert1first
abort1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --insert1first
child1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --singlex-child --insert1first
child-abort1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --insert1first
child-abortfirst1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --finish-child-first --insert1first
QUIET_SCAN_ARG=
SCANSCAN_ARGS=--lwc --prelock --prelockflag --cachesize 268435456 # scanscan default, cache of windows (256MB)
......@@ -210,6 +221,7 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF)
%.flattenedscan.dir: QUIET_SCAN_ARGS=-q
%.flattenedscan.dir: BENCH_TIME=
%.flattenedscan.dir: $(SCANSCAN_TDB) %.benchmark.dir
rm -rf $@
cp -R $*.benchmark.dir $@
./$< --env $@ $(SCANSCAN_ARGS) $(QUIET_SCAN_ARG)
......@@ -217,3 +229,5 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF)
%.flattenedscan: $(SCANSCAN_TDB) %.flattenedscan.dir
$(SCAN_TIME) ./$< --env $@.dir $(SCANSCAN_ARGS)$(QUIET_SCAN_ARG)
.SECONDARY: $(patsubst %,%.flattenedscan.dir, $(DB_TYPES))
......@@ -45,6 +45,8 @@ int prelock = 0;
int prelockflag = 0;
int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION;
int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION;
int finish_child_first = 0; // Commit or abort child first (before doing so to the parent). No effect if child does not exist.
int singlex_child = 0; // Do a single transaction, but do all work with a child
int singlex = 0; // Do a single transaction
int singlex_create = 0; // Create the db using the single transaction (only valid if singlex)
int insert1first = 0; // insert 1 before doing the rest
......@@ -79,6 +81,7 @@ char *dbname;
DB_ENV *dbenv;
DB *db;
DB_TXN *parenttid=0;
DB_TXN *tid=0;
......@@ -152,27 +155,37 @@ static void benchmark_setup (void) {
if (do_transactions) {
r=tid->commit(tid, 0);
assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
}
insert(-1);
if (singlex) {
r=tid->commit(tid, 0);
assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
}
}
else if (singlex && !singlex_create) {
r=tid->commit(tid, 0);
assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
}
if (do_transactions) {
if (singlex) do_prelock(db, tid);
if (singlex)
do_prelock(db, tid);
else {
r=tid->commit(tid, 0);
assert(r==0);
tid = NULL;
}
}
if (singlex_child) {
parenttid = tid;
tid = NULL;
r=dbenv->txn_begin(dbenv, parenttid, &tid, 0); CKERR(r);
}
}
......@@ -187,15 +200,34 @@ static void benchmark_shutdown (void) {
#endif
if (do_transactions && singlex && !insert1first && (singlex_create || prelock)) {
#if defined(TOKUDB)
//There should be a single 'truncate' in the rolltmp instead of many 'insert' entries.
struct txn_stat *s;
r = tid->txn_stat(tid, &s);
assert(r==0);
assert(s->rolltmp_raw_count < 100);
//TODO: #1125 Always do the test after performance testing is done.
if (singlex_child) fprintf(stderr, "SKIPPED 'small rolltmp' test for child txn\n");
else
assert(s->rolltmp_raw_count < 100); // gross test, not worth investigating details
os_free(s);
//system("ls -l bench.tokudb");
#endif
r = (do_abort ? tid->abort(tid) : tid->commit(tid, 0)); assert(r==0);
}
if (do_transactions && singlex) {
if (!singlex_child || finish_child_first) {
assert(tid);
r = (do_abort ? tid->abort(tid) : tid->commit(tid, 0)); assert(r==0);
tid = NULL;
}
if (singlex_child) {
assert(parenttid);
r = (do_abort ? parenttid->abort(parenttid) : parenttid->commit(parenttid, 0)); assert(r==0);
parenttid = NULL;
}
else
assert(!parenttid);
}
assert(!tid);
assert(!parenttid);
r = db->close(db, 0);
assert(r == 0);
......@@ -240,6 +272,7 @@ static void insert (long long v) {
if (n_insertions_since_txn_began>=items_per_transaction && !singlex) {
n_insertions_since_txn_began=0;
r = tid->commit(tid, 0); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid);
n_insertions_since_txn_began=0;
......@@ -265,7 +298,7 @@ static void serial_insert_from (long long from) {
}
if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0);
tid=0;
tid=NULL;
}
}
......@@ -284,7 +317,7 @@ static void random_insert_below (long long below) {
}
if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0);
tid=0;
tid=NULL;
}
}
......@@ -328,6 +361,8 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --compressibility C creates data that should compress by about a factor C. Default C is large. C is an float.\n");
fprintf(stderr, " --xcount N how many insertions per transaction (default=%d)\n", DEFAULT_ITEMS_PER_TRANSACTION);
fprintf(stderr, " --singlex (implies -x) Run the whole job as a single transaction. (Default don't run as a single transaction.)\n");
fprintf(stderr, " --singlex-child (implies -x) Run the whole job as a single transaction, do all work a child of that transaction.\n");
fprintf(stderr, " --finish-child-first Commit/abort child before doing so to parent (no effect if no child).\n");
fprintf(stderr, " --singlex-create (implies --singlex) Create the file using the single transaction (Default is to use a different transaction to create.)\n");
fprintf(stderr, " --check_small_rolltmp (Only valid in --singlex mode) Verify that very little data was saved in the rollback logs.\n");
fprintf(stderr, " --prelock Prelock the database.\n");
......@@ -405,6 +440,12 @@ int main (int argc, const char *argv[]) {
do_transactions = 1;
singlex = 1;
singlex_create = 1;
} else if (strcmp(arg, "--finish-child-first") == 0) {
finish_child_first = 1;
} else if (strcmp(arg, "--singlex-child") == 0) {
do_transactions = 1;
singlex = 1;
singlex_child = 1;
} else if (strcmp(arg, "--singlex") == 0) {
do_transactions = 1;
singlex = 1;
......
......@@ -14,9 +14,9 @@ struct simple_dbt {
struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
struct tokutxn *tokutxn;
struct __toku_lth *lth;
struct __toku_lth *lth; //Hash table holding list of dictionaries this txn has touched
u_int32_t flags;
DB_TXN *child, *next, *prev;
DB_TXN *child;
};
struct __toku_dbc_internal {
......
......@@ -42,10 +42,12 @@ BRT_SOURCES = \
brt-serialize \
brt-verify \
brt \
brt_msg \
brt-test-helpers \
cachetable \
checkpoint \
fifo \
fifo_msg \
fingerprint \
key \
leafentry \
......@@ -60,11 +62,13 @@ BRT_SOURCES = \
recover \
roll \
rollback \
ule \
threadpool \
toku_worker \
trace_mem \
txn \
x1764 \
xids \
ybt \
# keep this line so I can have a \ on the previous line
......
......@@ -26,10 +26,8 @@ typedef void *OMTVALUE;
enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { OMT_ITEM_OVERHEAD = 0 }; /* No overhead for the OMT item. The PMA needed to know the idx, but the OMT doesn't. */
enum { BRT_CMD_OVERHEAD = (1 // the type
+ 8) // the xid
enum { BRT_CMD_OVERHEAD = (1) // the type
};
enum { LE_OVERHEAD_BOUND = 9 }; // the type and xid
enum { BRT_DEFAULT_NODE_SIZE = 1 << 22 };
......@@ -246,7 +244,7 @@ static const BRTNODE null_brtnode=0;
//extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
//extern u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp);
extern u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, TXNID xid, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen);
extern u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen);
extern u_int32_t toku_calc_fingerprint_cmdstruct (BRT_CMD cmd);
// How long is the pivot key?
......@@ -322,6 +320,7 @@ enum brt_layout_version_e {
BRT_LAYOUT_VERSION_8 = 8, // Diff from 7 to 8: Use murmur instead of crc32. We are going to make a simplification and stop supporting version 7 and before. Current As of Beta 1.0.6
BRT_LAYOUT_VERSION_9 = 9, // Diff from 8 to 9: Variable-sized blocks and compression.
BRT_LAYOUT_VERSION_10 = 10, // Diff from 9 to 10: Variable number of compressed sub-blocks per block, disk byte order == intel byte order, Subtree estimates instead of just leafentry estimates, translation table, dictionary descriptors, checksum in header, subdb support removed from brt layer
BRT_LAYOUT_VERSION_11 = 11, // Diff from 10 to 11: Nested transaction leafentries (completely redesigned). BRT_CMDs on disk now support XIDS (multiple txnids) instead of exactly one.
BRT_ANTEULTIMATE_VERSION, // the version after the most recent version
BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line.
};
......
......@@ -188,10 +188,11 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
assert(0 <= n_buffers && n_buffers < TREE_FANOUT+1);
for (i=0; i< n_buffers; i++) {
FIFO_ITERATE(BNC_BUFFER(node,i),
key __attribute__((__unused__)), keylen,
key, keylen,
data __attribute__((__unused__)), datalen,
type __attribute__((__unused__)), xid __attribute__((__unused__)),
(hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen));
type __attribute__((__unused__)), xids,
(hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen+
xids_get_serialize_size(xids)));
}
assert(hsize==node->u.n.n_bytes_in_buffers);
assert(csize==node->u.n.totalchildkeylens);
......@@ -201,7 +202,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
toku_omt_iterate(node->u.l.buffer,
addupsize,
&hsize);
assert(hsize<=node->u.l.n_bytes_in_buffer);
assert(hsize==node->u.l.n_bytes_in_buffer);
hsize+=4; /* add n entries in buffer table. */
hsize+=3*8; /* add the three leaf stats, but no exact bit. */
return size+hsize;
......@@ -226,11 +227,11 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
result+=4; /* n_entries in buffer table. */
result+=3*8; /* the three leaf stats. */
result+=node->u.l.n_bytes_in_buffer;
if (toku_memory_check) {
unsigned int slowresult = toku_serialize_brtnode_size_slow(node);
if (result!=slowresult) printf("%s:%d result=%u slowresult=%u\n", __FILE__, __LINE__, result, slowresult);
assert(result==slowresult);
}
}
if (toku_memory_check) {
unsigned int slowresult = toku_serialize_brtnode_size_slow(node);
if (result!=slowresult) printf("%s:%d result=%u slowresult=%u\n", __FILE__, __LINE__, result, slowresult);
assert(result==slowresult);
}
return result;
}
......@@ -408,14 +409,14 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
for (i=0; i< n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid,
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
{
assert(type>=0 && type<256);
wbuf_char(&w, (unsigned char)type);
wbuf_TXNID(&w, xid);
wbuf_xids(&w, xids);
wbuf_bytes(&w, key, keylen);
wbuf_bytes(&w, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
});
}
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
......@@ -736,7 +737,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
result->layout_version = rbuf_int(&rc);
{
switch (result->layout_version) {
case BRT_LAYOUT_VERSION_10: goto ok_layout_version;
case BRT_LAYOUT_VERSION: goto ok_layout_version;
// Don't support older versions.
}
r=toku_db_badformat();
......@@ -826,19 +827,21 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
bytevec val; ITEMLEN vallen;
//toku_verify_counts(result);
int type = rbuf_char(&rc);
TXNID xid = rbuf_ulonglong(&rc);
XIDS xids;
xids_create_from_buffer(&rc, &xids);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, val, vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val);
{
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xid); /* Copies the data into the hash table. */
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; }
}
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
result->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(result,cnum) += diff;
//printf("Inserted\n");
xids_destroy(&xids);
}
}
if (check_local_fingerprint != result->local_fingerprint) {
......@@ -977,6 +980,7 @@ serialize_brt_header_min_size (u_int32_t version) {
u_int32_t size;
switch(version) {
case BRT_LAYOUT_VERSION_10:
case BRT_LAYOUT_VERSION_11:
size = (+8 // "tokudata"
+4 // version
+4 // size
......@@ -1231,7 +1235,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
list_init(&h->zombie_brts);
//version MUST be in network order on disk regardless of disk order
h->layout_version = rbuf_network_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_10);
assert(h->layout_version==BRT_LAYOUT_VERSION);
//Size MUST be in network order regardless of disk order.
u_int32_t size = rbuf_network_int(&rc);
......@@ -1311,8 +1315,9 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset, struct rbuf *
if (r==0) {
//Version MUST be in network order regardless of disk order.
version = rbuf_network_int(rb);
if (version < BRT_LAYOUT_VERSION_10) r = TOKUDB_DICTIONARY_TOO_OLD; //Cannot use
if (version > BRT_LAYOUT_VERSION_10) r = TOKUDB_DICTIONARY_TOO_NEW; //Cannot use
//TODO: #1125 Possibly support transparent upgrade. If so, it should be < ...10
if (version < BRT_LAYOUT_VERSION) r = TOKUDB_DICTIONARY_TOO_OLD; //Cannot use
if (version > BRT_LAYOUT_VERSION) r = TOKUDB_DICTIONARY_TOO_NEW; //Cannot use
}
u_int32_t size;
if (r==0) {
......
......@@ -3,6 +3,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "ule.h"
int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
BRTNODE node;
......@@ -74,15 +75,21 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
toku_verify_counts(node);
assert(node->height==0);
u_int32_t lesize, disksize;
size_t lesize, disksize;
LEAFENTRY leafentry;
r = le_committed(keylen, key, vallen, val, &lesize, &disksize, &leafentry, node->u.l.buffer, &node->u.l.buffer_mempool, 0);
OMTVALUE storeddatav;
u_int32_t idx;
DBT keydbt,valdbt;
BRT_CMD_S cmd = {BRT_INSERT, 0, .u.id={toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen)}};
BRT_CMD_S cmd = {BRT_INSERT, xids_get_root_xids(),
.u.id={toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen)}};
//Generate a leafentry (committed insert key,val)
r = apply_msg_to_leafentry(&cmd, NULL, //No old leafentry
&lesize, &disksize, &leafentry,
node->u.l.buffer, &node->u.l.buffer_mempool, 0);
assert(r==0);
struct cmd_leafval_heaviside_extra be = {brt, &cmd, node->flags & TOKU_DB_DUPSORT};
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);
......@@ -127,12 +134,13 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_cmd_t
toku_fill_dbt(&v, val, vallen),
brt);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, (TXNID)0);
XIDS xids_0 = xids_get_root_xids();
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, xids_0);
assert(r==0);
u_int32_t fdelta = node->rand4fingerprint * toku_calc_fingerprint_cmd(cmdtype, (TXNID)0, key, keylen, val, vallen);
u_int32_t fdelta = node->rand4fingerprint * toku_calc_fingerprint_cmd(cmdtype, xids_0, key, keylen, val, vallen);
node->local_fingerprint += fdelta;
*subtree_fingerprint += fdelta;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids_0);
node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff;
node->dirty = 1;
......
......@@ -50,12 +50,12 @@ static int compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) {
static int compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) {
DBT x,y;
int cmp = brt->compare_fun(brt->db,
toku_fill_dbt(&x, le_any_key(a), le_any_keylen(a)),
toku_fill_dbt(&y, le_any_key(b), le_any_keylen(b)));
toku_fill_dbt(&x, le_key(a), le_keylen(a)),
toku_fill_dbt(&y, le_key(b), le_keylen(b)));
if (cmp==0 && (brt->flags & TOKU_DB_DUPSORT)) {
cmp = brt->dup_compare(brt->db,
toku_fill_dbt(&x, le_any_val(a), le_any_vallen(a)),
toku_fill_dbt(&y, le_any_val(b), le_any_vallen(b)));
toku_fill_dbt(&x, le_innermost_inserted_val(a), le_innermost_inserted_vallen(a)),
toku_fill_dbt(&y, le_innermost_inserted_val(b), le_innermost_inserted_vallen(b)));
}
return cmp;
}
......@@ -75,7 +75,7 @@ static void verify_pair (bytevec key, unsigned int keylen,
bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)),
TXNID xid __attribute__((__unused__)),
XIDS xids __attribute__((__unused__)),
void *arg) {
struct verify_pair_arg *vparg = (struct verify_pair_arg *)arg;
BRT brt = vparg->brt;
......
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "fifo_msg.h"
#include "brt_msg.h"
//BRT_MSG internals are in host order
//XIDS are not 'internal' to BRT_MSG
// Populate an ephemeral brt_msg from its parts.
// The key/val DBT pointers and the xids handle are stored as passed in;
// nothing is copied by this function.
void
brt_msg_from_dbts(BRT_MSG brt_msg,
                  DBT *key, DBT *val,
                  XIDS xids, brt_msg_type type) {
    brt_msg->type     = type;
    brt_msg->xids     = xids;
    brt_msg->u.id.key = key;
    brt_msg->u.id.val = val;
}
//No conversion (from disk to host) is necessary
//Accessor functions for fifo return host order bytes.
// NOTE(review): this function is compiled out (#if 0).  It assigns flat
// keylen/vallen/key/val members, whereas the other functions in this file
// reach the key and value through u.id.key / u.id.val -- confirm the BRT_MSG
// layout before re-enabling it.
#if 0
void
brt_msg_from_fifo_msg(BRT_MSG brt_msg, FIFO_MSG fifo_msg) {
    brt_msg->keylen = fifo_msg_get_keylen(fifo_msg);
    brt_msg->vallen = fifo_msg_get_vallen(fifo_msg);
    brt_msg->key    = fifo_msg_get_key(fifo_msg);
    brt_msg->val    = fifo_msg_get_val(fifo_msg);
    brt_msg->xids   = fifo_msg_get_xids(fifo_msg);
    brt_msg->type   = fifo_msg_get_type(fifo_msg);
}
#endif
// Size in bytes of the message's key, taken from the key DBT.
u_int32_t
brt_msg_get_keylen(BRT_MSG brt_msg) {
    return brt_msg->u.id.key->size;
}
// Size in bytes of the message's value, taken from the val DBT.
u_int32_t
brt_msg_get_vallen(BRT_MSG brt_msg) {
    return brt_msg->u.id.val->size;
}
// The xids handle stored in the message (returned as-is, not copied).
XIDS
brt_msg_get_xids(BRT_MSG brt_msg) {
    return brt_msg->xids;
}
// Pointer to the key bytes, i.e. the data pointer of the key DBT.
void *
brt_msg_get_key(BRT_MSG brt_msg) {
    return brt_msg->u.id.key->data;
}
// Pointer to the value bytes, i.e. the data pointer of the val DBT.
void *
brt_msg_get_val(BRT_MSG brt_msg) {
    return brt_msg->u.id.val->data;
}
// The message type stored in the brt_msg.
brt_msg_type
brt_msg_get_type(BRT_MSG brt_msg) {
    return brt_msg->type;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the brt_msg,
* which is the ephemeral version of the fifo_msg.
*/
#ifndef BRT_MSG_H
#define BRT_MSG_H
// This header includes nothing itself; the types used below (BRT_MSG, XIDS,
// FIFO_MSG, brt_msg_type, DBT) must already be declared by the includer.
// NOTE(review): presumably that means including brttypes.h first -- confirm.

// Accessors: each returns the corresponding field of the message.
u_int32_t brt_msg_get_keylen(BRT_MSG brt_msg);
u_int32_t brt_msg_get_vallen(BRT_MSG brt_msg);
XIDS brt_msg_get_xids(BRT_MSG brt_msg);
void * brt_msg_get_key(BRT_MSG brt_msg);
void * brt_msg_get_val(BRT_MSG brt_msg);
brt_msg_type brt_msg_get_type(BRT_MSG brt_msg);
// NOTE(review): the definition of brt_msg_from_fifo_msg in brt_msg.c appears
// to be compiled out (#if 0); confirm it exists before calling it.
void brt_msg_from_fifo_msg(BRT_MSG brt_msg, FIFO_MSG fifo_msg);
// Fill in brt_msg from the given key/val DBT pointers, xids and type.
void brt_msg_from_dbts(BRT_MSG brt_msg, DBT *key, DBT *val, XIDS xids, brt_msg_type type);
#endif
......@@ -100,7 +100,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b);
printf(" buffer contains %u bytes (%d items)\n", BNC_NBYTESINBUF(n, i), toku_fifo_n_entries(BNC_BUFFER(n,i)));
if (dump_data) {
FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xid,
FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xids,
{
printf(" TYPE=");
switch ((enum brt_cmd_type)typ) {
......@@ -115,7 +115,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
}
printf("HUH?");
ok:
printf(" xid=%"PRIu64" ", xid);
printf(" xid=%"PRIu64" ", xids_get_innermost_xid(xids));
print_item(key, keylen);
if (datalen>0) {
printf(" ");
......
......@@ -90,10 +90,12 @@ enum brt_cmd_type {
BRT_COMMIT_BOTH = 7
};
typedef struct xids_t *XIDS;
typedef struct fifo_msg_t *FIFO_MSG;
/* tree commands */
struct brt_cmd {
enum brt_cmd_type type;
TXNID xid;
XIDS xids;
union {
/* insert or delete */
struct brt_cmd_insert_delete {
......@@ -104,17 +106,15 @@ struct brt_cmd {
};
typedef struct brt_cmd BRT_CMD_S, *BRT_CMD;
#if !defined(__cplusplus)
static inline
BRT_CMD_S
build_brt_cmd (enum brt_cmd_type type, TXNID xid, DBT *key, DBT *val) {
BRT_CMD_S result = {type, xid, .u.id={key,val}};
return result;
}
#endif
// TODO: replace brt_cmd_type when ready
typedef enum brt_cmd_type brt_msg_type;
// Message sent into brt to implement command (insert, delete, etc.)
// This structure supports nested transactions, and obsoletes brt_cmd.
typedef struct brt_cmd BRT_MSG_S, *BRT_MSG;
#define UU(x) x __attribute__((__unused__))
typedef struct leafentry *LEAFENTRY;
#define UU(x) x __attribute__((__unused__))
#endif
......@@ -3,6 +3,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h"
#include "xids.h"
struct fifo {
int n_items_in_fifo;
......@@ -22,7 +23,9 @@ static void fifo_init(struct fifo *fifo) {
}
static int fifo_entry_size(struct fifo_entry *entry) {
return sizeof (struct fifo_entry) + entry->keylen + entry->vallen;
return sizeof (struct fifo_entry) + entry->keylen + entry->vallen
+ xids_get_size(&entry->xids_s)
- sizeof(XIDS_S); //Prevent double counting from fifo_entry+xids_get_size
}
static struct fifo_entry *fifo_peek(struct fifo *fifo) {
......@@ -59,8 +62,11 @@ static int next_power_of_two (int n) {
return r;
}
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, TXNID xid) {
int need_space_here = sizeof(struct fifo_entry) + keylen + datalen;
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, XIDS xids) {
int need_space_here = sizeof(struct fifo_entry)
+ keylen + datalen
+ xids_get_size(xids)
- sizeof(XIDS_S); //Prevent double counting
int need_space_total = fifo->memory_used+need_space_here;
if (fifo->memory == NULL) {
fifo->memory_size = next_power_of_two(need_space_total);
......@@ -88,30 +94,32 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
}
struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_start + fifo->memory_used);
entry->type = (unsigned char)type;
entry->xid = xid;
xids_cpy(&entry->xids_s, xids);
entry->keylen = keylen;
memcpy(entry->key, key, keylen);
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
memcpy(e_key, key, keylen);
entry->vallen = datalen;
memcpy(entry->key + keylen, data, datalen);
memcpy(e_key + keylen, data, datalen);
fifo->n_items_in_fifo++;
fifo->memory_used += need_space_here;
return 0;
}
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xid);
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xids);
}
/* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, TXNID *xid) {
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, XIDS *xids) {
struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1;
*key = entry->key;
unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
*key = e_key;
*keylen = entry->keylen;
*data = entry->key + entry->keylen;
*data = e_key + entry->keylen;
*datalen = entry->vallen;
*type = entry->type;
*xid = entry->xid;
*xids = &entry->xids_s;
return 0;
}
......@@ -120,7 +128,7 @@ int toku_fifo_peek_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) {
u_int32_t type;
bytevec keyb,datab;
unsigned int keylen,datalen;
int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xid);
int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xids);
if (r!=0) return r;
cmd->type=(enum brt_cmd_type)type;
toku_fill_dbt(key, keyb, keylen);
......@@ -151,10 +159,10 @@ struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
return (struct fifo_entry *)(fifo->memory + off);
}
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) {
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void *arg) {
FIFO_ITERATE(fifo,
key, keylen, data, datalen, type, xid,
f(key,keylen,data,datalen,type,xid, arg));
key, keylen, data, datalen, type, xids,
f(key,keylen,data,datalen,type,xids, arg));
}
unsigned long toku_fifo_memory_size(FIFO fifo) {
......
......@@ -5,13 +5,14 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h"
#include "xids-internal.h"
#include "xids.h"
struct fifo_entry {
unsigned int keylen;
unsigned int vallen;
unsigned char type;
TXNID xid;
unsigned char key[];
XIDS_S xids_s;
};
typedef struct fifo *FIFO;
......@@ -20,8 +21,8 @@ int toku_fifo_create(FIFO *);
void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, XIDS xids);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, XIDS *xids);
int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO);
......@@ -30,20 +31,20 @@ unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo us
//These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory.
//int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
//int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*);
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) do { \
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidsvar,body) do { \
int fifo_iterate_off; \
for (fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \
bytevec keyvar = e->key; \
ITEMLEN keylenvar = e->keylen; \
bytevec datavar = e->key + e->keylen; \
ITEMLEN datalenvar = e->vallen; \
int typevar = e->type; \
TXNID xidvar = e->xid; \
XIDS xidsvar = &e->xids_s; \
bytevec keyvar = xids_get_end_of_array(xidsvar); \
bytevec datavar = keyvar + e->keylen; \
body; \
} } while (0)
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to define and handle the fifo_msg, which
* is the stored format of a brt_msg.
*
* Note, when translating from fifo_msg to brt_msg, the brt_msg
* will be created with a pointer into the xids in the fifo_msg.
* (The xids will not be embedded in the brt_msg.) This means
* that a valid xids struct must be embedded in the fifo_msg.
*
* NOTE: fifo_msg is stored in memory and on disk in the same format.
* fifo_msg is stored in the same byte order both in-memory
* and on-disk. Accessors are responsible for transposition
* to host order.
*/
#include <string.h>
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "xids-internal.h"
#include "brt_msg.h"
#include "fifo_msg.h"
#include <toku_htod.h>
// Stored form of a message.
// xids_and_key_and_val field is XIDS_S followed by key
// followed by value.
// keylen and vallen are kept in disk byte order; the accessors in this file
// convert them with toku_dtoh32 on read and toku_htod32 on write.
struct fifo_msg_t {
u_int32_t keylen; // length of the key in bytes (disk byte order)
u_int32_t vallen; // length of the value in bytes (disk byte order)
u_int8_t type; // message type, returned as brt_msg_type by fifo_msg_get_type
// u_int8_t pad[7]; // force 64-bit alignment if needed ???
u_int8_t xids_and_key_and_val[]; // undifferentiated bytes
};
// Key length of the stored message, converted from disk order to host order.
u_int32_t
fifo_msg_get_keylen(FIFO_MSG fifo_msg) {
    u_int32_t disk_keylen = fifo_msg->keylen;
    return toku_dtoh32(disk_keylen);
}
// Value length of the stored message, converted from disk order to host order.
u_int32_t
fifo_msg_get_vallen(FIFO_MSG fifo_msg) {
    u_int32_t disk_vallen = fifo_msg->vallen;
    return toku_dtoh32(disk_vallen);
}
// The xids live at the very start of the trailing byte array; return a
// pointer into the fifo_msg itself (no copy is made).
XIDS
fifo_msg_get_xids(FIFO_MSG fifo_msg) {
    return (XIDS) &fifo_msg->xids_and_key_and_val[0];
}
// Number of bytes occupied by the xids list embedded in the message.
static u_int32_t
fifo_msg_get_xids_size(FIFO_MSG fifo_msg) {
    return xids_get_size(fifo_msg_get_xids(fifo_msg));
}
// Return a pointer to the key bytes, which are stored immediately after the
// embedded xids list.  The pointer aims into the fifo_msg; nothing is copied.
void *
fifo_msg_get_key(FIFO_MSG fifo_msg) {
    u_int32_t xidslen = fifo_msg_get_xids_size(fifo_msg);
    // Do the offset arithmetic on the byte array itself: arithmetic on void*
    // is a GNU extension and is not valid ISO C.
    return (void *)(fifo_msg->xids_and_key_and_val + xidslen);
}
// Return a pointer to the value bytes, which are stored immediately after the
// key.  The pointer aims into the fifo_msg; nothing is copied.
void *
fifo_msg_get_val(FIFO_MSG fifo_msg) {
    char *key = fifo_msg_get_key(fifo_msg);
    u_int32_t keylen = fifo_msg_get_keylen(fifo_msg);
    // Step over the key with char* arithmetic: arithmetic on void* is a GNU
    // extension and is not valid ISO C.
    return (void *)(key + keylen);
}
// Message type; stored as a single byte, so no byte-order conversion is applied.
brt_msg_type
fifo_msg_get_type(FIFO_MSG fifo_msg) {
    return fifo_msg->type;
}
// Size in bytes of an existing fifo msg: the fixed header plus the
// embedded xids list, the key and the value.
u_int32_t
fifo_msg_get_size(FIFO_MSG fifo_msg) {
    u_int32_t total = sizeof(*fifo_msg);
    total += fifo_msg_get_xids_size(fifo_msg);
    total += fifo_msg_get_keylen(fifo_msg);
    total += fifo_msg_get_vallen(fifo_msg);
    return total;
}
// Number of bytes a fifo_msg would need in order to hold the given brt_msg:
// the fixed header plus the brt_msg's xids list, key and value.
u_int32_t
fifo_msg_get_size_required(BRT_MSG brt_msg) {
    u_int32_t needed = sizeof(struct fifo_msg_t);
    needed += xids_get_size(brt_msg_get_xids(brt_msg));
    needed += brt_msg_get_keylen(brt_msg);
    needed += brt_msg_get_vallen(brt_msg);
    return needed;
}
// Serialize brt_msg into the caller-provided fifo_msg.
// No bounds are checked here; fifo_msg_get_size_required(brt_msg) reports how
// many bytes this function writes into.
// The statement order below matters: the key/val target accessors compute their
// offsets from data already written into fifo_msg (xids list and keylen).
void
fifo_msg_from_brt_msg(FIFO_MSG fifo_msg, BRT_MSG brt_msg) {
u_int32_t keylen_host = brt_msg_get_keylen(brt_msg);
u_int32_t vallen_host = brt_msg_get_vallen(brt_msg);
fifo_msg->type = brt_msg_get_type(brt_msg);
// Lengths are stored in disk byte order.
fifo_msg->keylen = toku_htod32(keylen_host);
fifo_msg->vallen = toku_htod32(vallen_host);
// Copy the xids list first: fifo_msg_get_key() locates the key by reading
// the size of the xids already stored in the target.
XIDS xids = brt_msg_get_xids(brt_msg);
XIDS xids_target = fifo_msg_get_xids(fifo_msg);
u_int32_t xidslen = xids_get_size(xids);
memcpy(xids_target, xids, xidslen);
// Copy the key; it lands directly after the xids list.
void *key = brt_msg_get_key(brt_msg);
void *key_target = fifo_msg_get_key(fifo_msg);
memcpy(key_target, key, keylen_host);
// Copy the value; fifo_msg_get_val() relies on fifo_msg->keylen set above.
void *val = brt_msg_get_val(brt_msg);
void *val_target = fifo_msg_get_val(fifo_msg);
memcpy(val_target, val, vallen_host);
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the fifo_msg,
* which is the stored representation of the brt_msg.
*
* NOTE: Accessor functions return all values in host byte order.
*/
#ifndef FIFO_MSG_H
#define FIFO_MSG_H
// Field accessors.  Lengths are returned in host byte order; the xids, key
// and val accessors return pointers into the fifo_msg itself.
u_int32_t fifo_msg_get_keylen(FIFO_MSG fifo_msg);
u_int32_t fifo_msg_get_vallen(FIFO_MSG fifo_msg);
XIDS fifo_msg_get_xids(FIFO_MSG fifo_msg);
void * fifo_msg_get_key(FIFO_MSG fifo_msg);
void * fifo_msg_get_val(FIFO_MSG fifo_msg);
brt_msg_type fifo_msg_get_type(FIFO_MSG fifo_msg);
// Return number of bytes occupied by an existing fifo_msg
u_int32_t fifo_msg_get_size(FIFO_MSG fifo_msg);
// Return number of bytes required for a fifo_msg created from
// the given brt_msg
u_int32_t fifo_msg_get_size_required(BRT_MSG brt_msg);
// Fill in fifo_msg from brt_msg.  The caller supplies storage of at least
// fifo_msg_get_size_required(brt_msg) bytes.
void fifo_msg_from_brt_msg(FIFO_MSG fifo_msg, BRT_MSG brt_msg);
#endif
......@@ -28,15 +28,12 @@ u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp) {
}
#endif
u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, TXNID xid, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) {
u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) {
unsigned char type_c = (unsigned char)type;
unsigned int a = toku_htod32(xid>>32);
unsigned int b = toku_htod32(xid&0xffffffff);
struct x1764 mm;
x1764_init(&mm);
x1764_add(&mm, &type_c, 1);
x1764_add(&mm, &a, 4);
x1764_add(&mm, &b, 4);
toku_calc_more_murmur_xids(&mm, xids);
toku_calc_more_murmur_kvpair(&mm, key, keylen, val, vallen);
return x1764_finish(&mm);
}
This diff is collapsed.
This diff is collapsed.
......@@ -105,6 +105,7 @@ struct tokutxn {
toku_off_t rollentry_filesize; // How many bytes are in the rollentry file (this is the uncompressed bytes. If the file is compressed it may actually be smaller (or even larger with header information))
u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
OMT open_brts; // a collection of the brts that we touched. Indexed by filenum.
XIDS xids; //Represents the xid list
};
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
......
......@@ -32,7 +32,6 @@ struct roll_entry;
#include "recover.h"
#include "txn.h"
// needed by logformat.c
static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
target->len = val.len;
target->data = toku_memdup(val.data, (size_t)val.len);
......
......@@ -44,26 +44,26 @@ struct logtype {
int logformat_version_number = 0;
const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"TXNID", "xid", 0},
{"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"BYTESTRING", "fname", 0},
NULLFIELD}},
// cmdinsert is used to insert a key-value pair into a NODUP DB. For rollback we don't need the data.
{"cmdinsert", 'i', FA{{"TXNID", "xid", 0},
{"cmdinsert", 'i', FA{
{"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
{"cmdinsertboth", 'I', FA{{"TXNID", "xid", 0},
{"cmdinsertboth", 'I', FA{
{"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"cmddeleteboth", 'D', FA{{"TXNID", "xid", 0},
{"cmddeleteboth", 'D', FA{
{"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"cmddelete", 'd', FA{{"TXNID", "xid", 0},
{"cmddelete", 'd', FA{
{"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
......
This diff is collapsed.
......@@ -239,11 +239,17 @@ toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, T
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type=BRT_INSERT;
cmd.xid =xid;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
}
......@@ -265,11 +271,17 @@ toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filen
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_BOTH;
cmd.xid =xid;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
}
......@@ -291,11 +303,17 @@ toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenu
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_ANY;
cmd.xid = xid;
//TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data);
toku_free(val.data);
}
......
......@@ -7,12 +7,8 @@
#include "includes.h"
#include "checkpoint.h"
// these flags control whether or not we send commit messages for
// various operations
#define TOKU_DO_COMMIT_CMD_INSERT 0
#define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#include "xids.h"
#include "roll.h"
int
toku_commit_fcreate (TXNID UU(xid),
......@@ -63,14 +59,15 @@ static int find_brt_from_filenum (OMTVALUE v, void *filenumvp) {
return 0;
}
static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) {
static int do_insertion (enum brt_cmd_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) {
CACHEFILE cf;
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0);
DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { type, xid,
XIDS xids = toku_txn_get_xids(txn);
BRT_CMD_S brtcmd = { type, xids,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
data
? toku_fill_dbt(&data_dbt, data->data, data->len)
......@@ -93,18 +90,17 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
}
int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn);
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
#else
xid = xid; key = key;
key = key;
return do_nothing_with_filenum(txn, filenum);
#endif
}
int
toku_commit_cmdinsertboth (TXNID xid,
FILENUM filenum,
toku_commit_cmdinsertboth (FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
......@@ -112,39 +108,36 @@ toku_commit_cmdinsertboth (TXNID xid,
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn);
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
#else
xid = xid; key = key; data = data;
key = key; data = data;
return do_nothing_with_filenum(txn, filenum);
#endif
}
int
toku_rollback_cmdinsert (TXNID xid,
FILENUM filenum,
toku_rollback_cmdinsert (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn);
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
}
int
toku_rollback_cmdinsertboth (TXNID xid,
FILENUM filenum,
toku_rollback_cmdinsertboth (FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn);
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
}
int
toku_commit_cmddeleteboth (TXNID xid,
FILENUM filenum,
toku_commit_cmddeleteboth (FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
......@@ -152,7 +145,7 @@ toku_commit_cmddeleteboth (TXNID xid,
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_DELETE_BOTH
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn);
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
#else
xid = xid; key = key; data = data;
return do_nothing_with_filenum(txn, filenum);
......@@ -160,27 +153,25 @@ toku_commit_cmddeleteboth (TXNID xid,
}
int
toku_rollback_cmddeleteboth (TXNID xid,
FILENUM filenum,
toku_rollback_cmddeleteboth (FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn);
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
}
int
toku_commit_cmddelete (TXNID xid,
FILENUM filenum,
toku_commit_cmddelete (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_DELETE
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn);
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
#else
xid = xid; key = key;
return do_nothing_with_filenum(txn, filenum);
......@@ -188,14 +179,13 @@ toku_commit_cmddelete (TXNID xid,
}
int
toku_rollback_cmddelete (TXNID xid,
FILENUM filenum,
toku_rollback_cmddelete (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn);
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
}
int
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: roll.c 12588 2009-06-09 00:05:02Z yfogel $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef TOKUDB_ROLL_H
#define TOKUDB_ROLL_H
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_BOTH message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#endif
......@@ -32,6 +32,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
list_remove(&txn->live_txns_link);
note_txn_closing(txn);
xids_destroy(&txn->xids);
toku_free(txn);
return;
}
......
......@@ -94,7 +94,8 @@ REGRESSION_TESTS_RAW = \
test-brt-overflow \
test-del-inorder \
test-inc-split \
test-leafentry \
test-leafentry10 \
test-leafentry-nested \
test_oexcl \
test_toku_malloc_plain_free \
threadpool-test \
......
......@@ -46,12 +46,25 @@ static void test_serialize(void) {
BNC_SUBTREE_ESTIMATES(&sn, 1).exact = (BOOL)(random()%2 != 0);
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, (TXNID)0); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, (TXNID)123); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, (TXNID)234); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)234, "x", 2, "xval", 5);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
//Create XIDS
XIDS xids_0 = xids_get_root_xids();
XIDS xids_123;
XIDS xids_234;
r = xids_create_child(xids_0, &xids_123, (TXNID)123);
CKERR(r);
r = xids_create_child(xids_123, &xids_234, (TXNID)234);
CKERR(r);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, xids_0); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, xids_123); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, xids_234); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_234, "x", 2, "xval", 5);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123) + xids_get_serialize_size(xids_234);
//Cleanup:
xids_destroy(&xids_0);
xids_destroy(&xids_123);
xids_destroy(&xids_234);
struct brt *XMALLOC(brt);
struct brt_header *XCALLOC(brt_h);
......
......@@ -46,18 +46,26 @@ test_fifo_enq (int n) {
for (i=0; i<n; i++) {
buildkey(i);
buildval(i);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, (TXNID)i); assert(r == 0);
XIDS xids;
if (i==0)
xids = xids_get_root_xids();
else {
r = xids_create_child(xids_get_root_xids(), &xids, (TXNID)i);
assert(r==0);
}
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, xids); assert(r == 0);
xids_destroy(&xids);
}
i = 0;
FIFO_ITERATE(f, key, keylen, val, vallen, type, xid, {
FIFO_ITERATE(f, key, keylen, val, vallen, type, xids, {
if (verbose) printf("checkit %d %d\n", i, type);
buildkey(i);
buildval(i);
assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0);
assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0);
assert(i % 256 == type);
assert((TXNID)i==xid);
assert((TXNID)i==xids_get_innermost_xid(xids));
i += 1;
});
assert(i == n);
......
This diff is collapsed.
......@@ -19,7 +19,7 @@ static void test_leafentry_1 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0);
r = le10_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0);
assert(r==0);
char expect[] = {LE_COMMITTED,
UINT32TOCHAR(4),
......@@ -36,7 +36,7 @@ static void test_leafentry_2 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0);
char expect[] = {LE_BOTH,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -53,7 +53,7 @@ static void test_leafentry_3 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0);
char expect[] = {LE_PROVDEL,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -69,7 +69,7 @@ static void test_leafentry_4 (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
r = le10_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0);
char expect[] = {LE_PROVPAIR,
UINT64TOCHAR(0x0123456789abcdef0LL),
......@@ -101,7 +101,7 @@ static void test_leafentry_3long (void) {
LEAFENTRY l;
int r;
u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0);
r = le10_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0);
assert(r==0);
assert(sizeof(expect_3long)==msize);
assert(msize==dsize);
......
......@@ -20,11 +20,19 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
}
r = toku_omt_create(&result->open_brts);
if (r!=0) {
died1:
toku_logger_panic(logger, r);
toku_free(result);
return r;
}
result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids;
if (parent_tokutxn==NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died1;
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
......@@ -70,3 +78,9 @@ void toku_txn_close_txn(TOKUTXN txn) {
toku_rollback_txn_close(txn);
return;
}
// Return the xids list of txn, or the root xids when txn is NULL.
// The list is not copied: for a non-NULL txn the caller gets txn's own list.
XIDS toku_txn_get_xids (TOKUTXN txn) {
    return txn ? txn->xids : xids_get_root_xids();
}
......@@ -9,5 +9,6 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
void toku_txn_close_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN);
#endif //TOKUTXN_H
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the nested transaction logic and nothing else. No internal
* requirements of the nested transaction logic belongs here.
*/
#ifndef ULE_H
#define ULE_H
//1 does much slower debugging
#define ULE_DEBUG 0
/////////////////////////////////////////////////////////////////////////////////
// Following data structures are the unpacked format of a leafentry.
// * ule is the unpacked leaf entry, that contains an array of unpacked
// transaction records
// * uxr is the unpacked transaction record
//
//Types of transaction records.
enum {XR_INSERT = 1,
XR_DELETE = 2,
XR_PLACEHOLDER = 3};
typedef struct { // unpacked transaction record
u_int8_t type; // delete/insert/placeholder
u_int32_t vallen; // number of bytes in value
void * valp; // pointer to value (Where is value really stored?)
TXNID xid; // transaction id
// Note: when packing ule into a new leafentry, will need
// to copy actual data from valp to new leafentry
} UXR_S, *UXR;
// Unpacked Leaf Entry is of fixed size because it's just on the
// stack and we care about ease of access more than the memory footprint.
typedef struct { // unpacked leaf entry
u_int8_t num_uxrs; // how many of uxrs[] are valid
u_int32_t keylen;
void * keyp;
UXR_S uxrs[MAX_TRANSACTION_RECORDS]; // uxrs[0] is outermost, uxrs[num_uxrs-1] is innermost
} ULE_S, *ULE;
int apply_msg_to_leafentry(BRT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p,
OMT omt,
struct mempool *mp,
void **maybe_free);
//////////////////////////////////////////////////////////////////////////////////////
//Functions exported for test purposes only (used internally for non-test purposes).
void le_unpack(ULE ule, LEAFENTRY le);
int le_pack(ULE ule, // data to be packed into new leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
OMT omt,
struct mempool *mp,
void **maybe_free);
size_t le_memsize_from_ule (ULE ule);
#endif // ULE_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ifndef XIDS_INTERNAL_H
#define XIDS_INTERNAL_H
// Variable size list of transaction ids (known in design doc as xids<>).
// ids[0] is the outermost transaction.
// ids[num_xids - 1] is the innermost transaction.
// Should only be accessed by accessor functions xids_xxx, not directly.
// The implicit root transaction (id 0) is never stored in ids[]; the xids_xxx
// accessors make it appear at index 0.
typedef struct xids_t {
u_int8_t num_stored_xids; // maximum value of MAX_TRANSACTION_RECORDS - 1 ...
// ... because transaction 0 is implicit
TXNID ids[]; // xids.c writes entries with toku_htod64 and reads them with toku_dtoh64
} XIDS_S;
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to implement xids list of nested transactions
* ids.
*
* See design documentation for nested transactions at
* TokuWiki/Imp/TransactionsOverview.
*
* NOTE: xids are always stored in disk byte order.
* Accessors are responsible for transposing bytes to
* host order.
*/
#include <errno.h>
#include <string.h>
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "xids-internal.h"
#include "toku_assert.h"
#include "memory.h"
#include <toku_htod.h>
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (xids_xxx) understands xids<> and nothing else.
// It contains all the functions that understand xids<>
//
// xids<> do not store the implicit transaction id of 0 at index 0.
// The accessor functions make the id of 0 explicit at index 0.
// The number of xids physically stored in the xids array is in
// the variable num_stored_xids.
//
// The xids struct is immutable. The caller gets an initial version of XIDS
// by calling xids_get_root_xids(), which returns the constant struct
// representing the root transaction (id 0). When a transaction begins,
// a new XIDS is created with the id of the current transaction appended to
// the list.
//
//
// Returns the xids list used when there is no transaction.
// It is also the starting point from which every nested-transaction
// xids list is built (via xids_create_child).
// The same statically allocated object is returned on every call.
XIDS
xids_get_root_xids(void) {
    static const struct xids_t root_xids = {
        .num_stored_xids = 0
    };
    return (XIDS)&root_xids;
}
// Build the xids list for a new innermost transaction.  The parent list is
// left untouched; the child is a freshly allocated copy of it with this_xid
// appended (stored in disk byte order).
//   parent_xids: xids list of the parent transaction (must not be NULL)
//   xids_p:      receives the newly created list on success only
//   this_xid:    id of the new transaction; must exceed the parent's innermost id
// Returns 0 on success, EINVAL when the child would reach
// MAX_TRANSACTION_RECORDS entries, ENOMEM when allocation fails.
int
xids_create_child(XIDS parent_xids, XIDS * xids_p, TXNID this_xid) {
    assert(parent_xids);
    assert(this_xid > xids_get_innermost_xid(parent_xids));
    u_int8_t child_stored = parent_xids->num_stored_xids + 1;
    u_int8_t child_total = child_stored + 1; // +1 for the implicit root id
    assert(child_total > 0);
    assert(child_total <= MAX_TRANSACTION_RECORDS);
    if (child_total == MAX_TRANSACTION_RECORDS) return EINVAL;

    XIDS child = toku_malloc(sizeof(*child) + child_stored*sizeof(child->ids[0]));
    if (!child) return ENOMEM;

    child->num_stored_xids = child_stored;
    // Inherit the parent's ids verbatim; they are already in disk order.
    memcpy(child->ids,
           parent_xids->ids,
           parent_xids->num_stored_xids*sizeof(parent_xids->ids[0]));
    child->ids[child_stored-1] = toku_htod64(this_xid);
    *xids_p = child;
    return 0;
}
// Deserialize an xids list from rb: one count byte followed by that many
// TXNIDs.  The new list is allocated with toku_xmalloc and returned in *xids_p.
//   rb:     read buffer positioned at a serialized xids list
//   xids_p: receives the list created
// Asserts that the count is below the nesting limit and that the ids read
// are strictly increasing.
// NOTE(review): entries are stored exactly as rbuf_TXNID delivers them, while
// the rest of this file treats ids[] as disk byte order -- confirm the two
// agree on hosts where disk and host order differ.
void
xids_create_from_buffer(struct rbuf *rb, XIDS * xids_p) {
    u_int8_t stored_count = rbuf_char(rb);
    u_int8_t total_count = stored_count + 1;
    assert(total_count > 0);
    assert(total_count < MAX_TRANSACTION_RECORDS);
    XIDS result = toku_xmalloc(sizeof(*result) + stored_count*sizeof(result->ids[0]));
    result->num_stored_xids = stored_count;
    u_int8_t pos = 0;
    while (pos < result->num_stored_xids) {
        rbuf_TXNID(rb, &result->ids[pos]);
        if (pos > 0) {
            assert(result->ids[pos] > result->ids[pos-1]);
        }
        pos++;
    }
    *xids_p = result;
}
// Release an xids list and NULL out the caller's handle.
// The shared root list is statically allocated and is never freed.
void
xids_destroy(XIDS *xids_p) {
    XIDS doomed = *xids_p;
    *xids_p = NULL;
    if (doomed != xids_get_root_xids()) toku_free(doomed);
}
// Return the xid at the requested position, in host byte order.
// Index 0 is the implicit root transaction and always yields 0 (this is
// the only valid index when the stored array is empty).
// Any other index must be less than xids_get_num_xids(xids); this is asserted.
TXNID
xids_get_xid(XIDS xids, u_int8_t index) {
    if (index == 0) return 0;
    assert(index < xids_get_num_xids(xids));
    // Stored entries are shifted down by one because the root id is implicit.
    TXNID stored = xids->ids[index-1];
    return toku_dtoh64(stored);
}
// Locate target_xid in xids, scanning from the outermost transaction inward,
// and return its index (0 is the implicit root).
// The caller must guarantee that target_xid IS in the list; the scan asserts
// that every id passed over is smaller than the target.
u_int8_t
xids_find_index_of_xid(XIDS xids, TXNID target_xid) {
    u_int8_t index = 0;
    for (;;) {
        TXNID current_xid = xids_get_xid(xids, index);
        if (current_xid == target_xid) break;
        assert(current_xid < target_xid);
        index++; // move to the next inner txnid
    }
    return index;
}
// Number of xids visible through the accessors: the stored ids plus one
// for the implicit root transaction (id 0).
u_int8_t
xids_get_num_xids(XIDS xids) {
    return xids->num_stored_xids + 1;
}
// Return the xid of the innermost transaction, i.e. the last entry of the
// list (0 when only the implicit root is present).
TXNID
xids_get_innermost_xid(XIDS xids) {
    u_int8_t last = xids_get_num_xids(xids) - 1;
    return xids_get_xid(xids, last);
}
// Copy the complete xids list (count and ids) from source into target.
// target must already have room for xids_get_size(source) bytes.
void
xids_cpy(XIDS target, XIDS source) {
    memcpy(target, source, xids_get_size(source));
}
// Return the in-memory size in bytes of the xids struct: the fixed part plus
// one TXNID per stored xid.
u_int32_t
xids_get_size(XIDS xids){
    u_int32_t rval;
    u_int8_t num_stored_xids = xids->num_stored_xids;
    rval = sizeof(*xids) + num_stored_xids * sizeof(xids->ids[0]);
    return rval;
}
// Return the number of bytes wbuf_xids() emits for this list: one count byte
// followed by eight bytes per stored xid.
u_int32_t
xids_get_serialize_size(XIDS xids){
    u_int32_t rval;
    u_int8_t num_stored_xids = xids->num_stored_xids;
    rval = 1 + //num stored xids
           8 * num_stored_xids;
    return rval;
}
// Fold an xids list into the running x1764 checksum: first the stored-count
// byte, then every xid as returned by xids_get_xid (host byte order),
// starting with the implicit root id at index 0.
void
toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids) {
    x1764_add(mm, &xids->num_stored_xids, 1);
    u_int8_t total = xids_get_num_xids(xids);
    u_int8_t pos = 0;
    while (pos < total) {
        TXNID xid_host = xids_get_xid(xids, pos);
        x1764_add(mm, &xid_host, 8);
        pos++;
    }
}
// Return the address of the first byte past the last stored xid.
unsigned char *
xids_get_end_of_array(XIDS xids) {
    return (unsigned char*)&xids->ids[xids->num_stored_xids];
}
// Serialize an xids list into wb: the stored-count byte followed by each
// stored id, outermost first.  The implicit root id is not written.
void wbuf_xids(struct wbuf *wb, XIDS xids) {
    wbuf_char(wb, (unsigned char)xids->num_stored_xids);
    u_int8_t pos = 0;
    while (pos < xids->num_stored_xids) {
        wbuf_TXNID(wb, xids->ids[pos]);
        pos++;
    }
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the xids and nothing else.
* Internal requirements of the xids logic do not belong here.
*
* xids is (abstractly) an immutable list of nested transaction ids, accessed only
* via the functions in this file.
*
* See design documentation for nested transactions at
* TokuWiki/Imp/TransactionsOverview.
*/
#ifndef XIDS_H
#define XIDS_H
#include "x1764.h"
#include "rbuf.h"
#include "wbuf.h"
/* The number of transaction ids stored in the xids structure is
* represented by an 8-bit value. The value 255 is reserved.
* The constant MAX_NESTED_TRANSACTIONS is one less because
* one slot in the packed leaf entry is used for the implicit
* root transaction (id 0).
*/
enum {MAX_NESTED_TRANSACTIONS = 253};
enum {MAX_TRANSACTION_RECORDS = MAX_NESTED_TRANSACTIONS + 1};
//Retrieve an XIDS representing the root transaction.
XIDS xids_get_root_xids(void);
void xids_cpy(XIDS target, XIDS source);
//Creates an XIDS representing this transaction.
//You must pass in an XIDS representing the parent of this transaction.
int xids_create_child(XIDS parent_xids, XIDS *xids_p, TXNID this_xid);
void xids_create_from_buffer(struct rbuf *rb, XIDS * xids_p);
void xids_destroy(XIDS *xids_p);
TXNID xids_get_xid(XIDS xids, u_int8_t index);
u_int8_t xids_find_index_of_xid(XIDS xids, TXNID target_xid);
u_int8_t xids_get_num_xids(XIDS xids);
TXNID xids_get_innermost_xid(XIDS xids);
// return size in bytes
u_int32_t xids_get_size(XIDS xids);
u_int32_t xids_get_serialize_size(XIDS xids);
void toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids);
unsigned char *xids_get_end_of_array(XIDS xids);
void wbuf_xids(struct wbuf *wb, XIDS xids);
#endif
......@@ -137,8 +137,6 @@ TDB_TESTS_THAT_SHOULD_FAIL= \
test_groupcommit_count \
test944 \
test_truncate_txn_abort \
test_txn_nested_abort3 \
test_txn_nested_abort4 \
#\ ends prev line
ifneq ($(OS_CHOICE),windows)
TDB_TESTS_THAT_SHOULD_FAIL+= \
......
......@@ -111,7 +111,12 @@ test_main(int argc, char *argv[]) {
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r = env->set_cachesize(env, 0, 8000000, 1); assert(r == 0);
//r = env->set_cachesize(env, 0, 8000000, 1); assert(r == 0); //Prior to nested transactions
//This ran incredibly slow with nested transactions. I believe it makes sense to do the following:
//a node is 4MiB. Nodes can become overfull. If you can't have two nodes in memory, you thrash,
//So support 2 nodes plus a bit of wiggle room.
//r = env->set_cachesize(env, 0, (8<<20) + (1<<8), 1); assert(r == 0); //As of [13075] this is enough to hold the 2 nodes/run fast
r = env->set_cachesize(env, 0, (9<<20), 1); assert(r == 0);
r = env->open(env, ENVDIR, DB_CREATE + DB_THREAD + DB_PRIVATE + DB_INIT_MPOOL + DB_INIT_LOCK, S_IRWXU+S_IRWXG+S_IRWXO); assert(r == 0);
DB *db;
......
......@@ -92,6 +92,21 @@ void toku_ydb_unlock(void);
/** Handle a panicked database: return EINVAL if the database env is panicked */
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv)
/** Handle a transaction that has a child: return EINVAL if the transaction tries to do any work.
Only commit/abort/prelock (which are used by handlerton) are allowed when a child exists.
A NULL txn never triggers the check. The error is reported through toku_ydb_do_error
with the name of the calling function. */
#define HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn) \
RAISE_COND_EXCEPTION(((txn) && db_txn_struct_i(txn)->child), \
toku_ydb_do_error((env), \
EINVAL, \
"%s: Transaction cannot do work when child exists", __FUNCTION__))
/** Same check as HANDLE_ILLEGAL_WORKING_PARENT_TXN, taking the environment from db->dbenv. */
#define HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn) \
HANDLE_ILLEGAL_WORKING_PARENT_TXN((db)->dbenv, txn)
/** Same check for a cursor: uses the cursor's database (c->dbp) and the txn recorded in dbc_struct_i(c). */
#define HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c) \
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN((c)->dbp, dbc_struct_i(c)->txn)
/* */
void toku_ydb_error_all_cases(const DB_ENV * env,
int error,
......
This diff is collapsed.
* One
* two
1. FOO
a. sock
a. pizza
2. elephant
[[Include(source:toku/tokudb.1125/test2.wiki,wiki)]]
1. these
1. lines
1. from
1. test2.wiki
......@@ -184,6 +184,9 @@ VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=ye
ifeq ($(DB_ATTACH),1)
VGRIND+=--db-attach=yes
endif
ifeq ($(TRACK_ORIGINS),1)
VGRIND+=--track-origins=yes
endif
HGRIND=valgrind --quiet --tool=helgrind --error-exitcode=1
......
......@@ -47,6 +47,16 @@ static const int64_t toku_byte_order_host = 0x0102030405060708LL;
#endif
#if DISK_BYTE_ORDER == HOST_BYTE_ORDER
/* Disk-to-host conversion of a 64-bit value.  This variant is compiled when
 * DISK_BYTE_ORDER == HOST_BYTE_ORDER, so the value passes through unchanged. */
static inline uint64_t
toku_dtoh64(uint64_t i) {
    const uint64_t host_value = i;
    return host_value;
}
/* Host-to-disk conversion of a 64-bit value.  This variant is compiled when
 * DISK_BYTE_ORDER == HOST_BYTE_ORDER, so the value passes through unchanged. */
static inline uint64_t
toku_htod64(uint64_t i) {
    const uint64_t disk_value = i;
    return disk_value;
}
static inline uint32_t
toku_dtoh32(uint32_t i) {
return i;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment