Commit b6c03d45 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1125 Merged nested transactions from temporary merge branch into main.

Current tests fail (not regressions, they fail as of 13461)
 * {{{x1.tdbrun}}}
 * {{{test_log(2,3,4,5,6,7,8,9,10).recover}}}
 * {{{test-recover(1,2,3).tdbrun}}}
 * {{{test1324.tdbrun}}}
ULE_DEBUG disabled (defined to 0)  Can be re-enabled for test purposes (set to 1).
refs [t:1125]
Merging into the temp branch (tokudb.main_13461+1125)
{{{svn merge --accept=postpone -r 12527:13461 ../tokudb.1125 ./}}}

Merging into main
{{{svn merge --accept=postpone -r13462:13463 ../tokudb.main_13461+1125/ ./}}}


git-svn-id: file:///svn/toku/tokudb@13464 c7de825b-a66e-492c-adef-691d508d4ae1
parent 19e7b929
...@@ -182,9 +182,20 @@ QUIET_BENCH_ARG= ...@@ -182,9 +182,20 @@ QUIET_BENCH_ARG=
rm -rf $@ rm -rf $@
$(BENCH_TIME) ./$< --env $@ $(QUIET_BENCH_ARG) $(EXTRA_BENCH_ARGS) $(BENCH_TIME) ./$< --env $@ $(QUIET_BENCH_ARG) $(EXTRA_BENCH_ARGS)
no-txn.benchmark.dir: EXTRA_BENCH_ARGS= DB_TYPES = no-txn txn abort child child_abort child_abortfirst txn1 abort1 child1 child-abort1 child_abortfirst1
txn.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock
abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort no-txn.benchmark.dir: EXTRA_BENCH_ARGS=
txn.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock
abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort
child.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --singlex-child
child-abort.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child
child-abortfirst.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --finish-child-first
txn1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --insert1first
abort1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --insert1first
child1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --singlex-child --insert1first
child-abort1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --insert1first
child-abortfirst1.benchmark.dir: EXTRA_BENCH_ARGS= -x --singlex --prelock --abort --singlex-child --finish-child-first --insert1first
QUIET_SCAN_ARG= QUIET_SCAN_ARG=
SCANSCAN_ARGS=--lwc --prelock --prelockflag --cachesize 268435456 # scanscan default, cache of windows (256MB) SCANSCAN_ARGS=--lwc --prelock --prelockflag --cachesize 268435456 # scanscan default, cache of windows (256MB)
...@@ -210,6 +221,7 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF) ...@@ -210,6 +221,7 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF)
%.flattenedscan.dir: QUIET_SCAN_ARGS=-q %.flattenedscan.dir: QUIET_SCAN_ARGS=-q
%.flattenedscan.dir: BENCH_TIME= %.flattenedscan.dir: BENCH_TIME=
%.flattenedscan.dir: $(SCANSCAN_TDB) %.benchmark.dir %.flattenedscan.dir: $(SCANSCAN_TDB) %.benchmark.dir
rm -rf $@
cp -R $*.benchmark.dir $@ cp -R $*.benchmark.dir $@
./$< --env $@ $(SCANSCAN_ARGS) $(QUIET_SCAN_ARG) ./$< --env $@ $(SCANSCAN_ARGS) $(QUIET_SCAN_ARG)
...@@ -217,3 +229,5 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF) ...@@ -217,3 +229,5 @@ SCANSCAN_TDB = scanscan-tokudb$(BINSUF)
%.flattenedscan: $(SCANSCAN_TDB) %.flattenedscan.dir %.flattenedscan: $(SCANSCAN_TDB) %.flattenedscan.dir
$(SCAN_TIME) ./$< --env $@.dir $(SCANSCAN_ARGS)$(QUIET_SCAN_ARG) $(SCAN_TIME) ./$< --env $@.dir $(SCANSCAN_ARGS)$(QUIET_SCAN_ARG)
.SECONDARY: $(patsubst %,%.flattenedscan.dir, $(DB_TYPES))
...@@ -45,6 +45,8 @@ int prelock = 0; ...@@ -45,6 +45,8 @@ int prelock = 0;
int prelockflag = 0; int prelockflag = 0;
int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION; int items_per_transaction = DEFAULT_ITEMS_PER_TRANSACTION;
int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION; int items_per_iteration = DEFAULT_ITEMS_TO_INSERT_PER_ITERATION;
int finish_child_first = 0; // Commit or abort child first (before doing so to the parent). No effect if child does not exist.
int singlex_child = 0; // Do a single transaction, but do all work with a child
int singlex = 0; // Do a single transaction int singlex = 0; // Do a single transaction
int singlex_create = 0; // Create the db using the single transaction (only valid if singlex) int singlex_create = 0; // Create the db using the single transaction (only valid if singlex)
int insert1first = 0; // insert 1 before doing the rest int insert1first = 0; // insert 1 before doing the rest
...@@ -79,6 +81,7 @@ char *dbname; ...@@ -79,6 +81,7 @@ char *dbname;
DB_ENV *dbenv; DB_ENV *dbenv;
DB *db; DB *db;
DB_TXN *parenttid=0;
DB_TXN *tid=0; DB_TXN *tid=0;
...@@ -152,27 +155,37 @@ static void benchmark_setup (void) { ...@@ -152,27 +155,37 @@ static void benchmark_setup (void) {
if (do_transactions) { if (do_transactions) {
r=tid->commit(tid, 0); r=tid->commit(tid, 0);
assert(r==0); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r); r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
} }
insert(-1); insert(-1);
if (singlex) { if (singlex) {
r=tid->commit(tid, 0); r=tid->commit(tid, 0);
assert(r==0); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r); r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
} }
} }
else if (singlex && !singlex_create) { else if (singlex && !singlex_create) {
r=tid->commit(tid, 0); r=tid->commit(tid, 0);
assert(r==0); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r); r=dbenv->txn_begin(dbenv, 0, &tid, 0); CKERR(r);
} }
if (do_transactions) { if (do_transactions) {
if (singlex) do_prelock(db, tid); if (singlex)
do_prelock(db, tid);
else { else {
r=tid->commit(tid, 0); r=tid->commit(tid, 0);
assert(r==0); assert(r==0);
tid = NULL;
} }
} }
if (singlex_child) {
parenttid = tid;
tid = NULL;
r=dbenv->txn_begin(dbenv, parenttid, &tid, 0); CKERR(r);
}
} }
...@@ -187,15 +200,34 @@ static void benchmark_shutdown (void) { ...@@ -187,15 +200,34 @@ static void benchmark_shutdown (void) {
#endif #endif
if (do_transactions && singlex && !insert1first && (singlex_create || prelock)) { if (do_transactions && singlex && !insert1first && (singlex_create || prelock)) {
#if defined(TOKUDB) #if defined(TOKUDB)
//There should be a single 'truncate' in the rolltmp instead of many 'insert' entries.
struct txn_stat *s; struct txn_stat *s;
r = tid->txn_stat(tid, &s); r = tid->txn_stat(tid, &s);
assert(r==0); assert(r==0);
assert(s->rolltmp_raw_count < 100); //TODO: #1125 Always do the test after performance testing is done.
if (singlex_child) fprintf(stderr, "SKIPPED 'small rolltmp' test for child txn\n");
else
assert(s->rolltmp_raw_count < 100); // gross test, not worth investigating details
os_free(s); os_free(s);
//system("ls -l bench.tokudb"); //system("ls -l bench.tokudb");
#endif #endif
r = (do_abort ? tid->abort(tid) : tid->commit(tid, 0)); assert(r==0);
} }
if (do_transactions && singlex) {
if (!singlex_child || finish_child_first) {
assert(tid);
r = (do_abort ? tid->abort(tid) : tid->commit(tid, 0)); assert(r==0);
tid = NULL;
}
if (singlex_child) {
assert(parenttid);
r = (do_abort ? parenttid->abort(parenttid) : parenttid->commit(parenttid, 0)); assert(r==0);
parenttid = NULL;
}
else
assert(!parenttid);
}
assert(!tid);
assert(!parenttid);
r = db->close(db, 0); r = db->close(db, 0);
assert(r == 0); assert(r == 0);
...@@ -240,6 +272,7 @@ static void insert (long long v) { ...@@ -240,6 +272,7 @@ static void insert (long long v) {
if (n_insertions_since_txn_began>=items_per_transaction && !singlex) { if (n_insertions_since_txn_began>=items_per_transaction && !singlex) {
n_insertions_since_txn_began=0; n_insertions_since_txn_began=0;
r = tid->commit(tid, 0); assert(r==0); r = tid->commit(tid, 0); assert(r==0);
tid = NULL;
r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0); r=dbenv->txn_begin(dbenv, 0, &tid, 0); assert(r==0);
do_prelock(db, tid); do_prelock(db, tid);
n_insertions_since_txn_began=0; n_insertions_since_txn_began=0;
...@@ -265,7 +298,7 @@ static void serial_insert_from (long long from) { ...@@ -265,7 +298,7 @@ static void serial_insert_from (long long from) {
} }
if (do_transactions && !singlex) { if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0); int r= tid->commit(tid, 0); assert(r==0);
tid=0; tid=NULL;
} }
} }
...@@ -284,7 +317,7 @@ static void random_insert_below (long long below) { ...@@ -284,7 +317,7 @@ static void random_insert_below (long long below) {
} }
if (do_transactions && !singlex) { if (do_transactions && !singlex) {
int r= tid->commit(tid, 0); assert(r==0); int r= tid->commit(tid, 0); assert(r==0);
tid=0; tid=NULL;
} }
} }
...@@ -328,6 +361,8 @@ static int print_usage (const char *argv0) { ...@@ -328,6 +361,8 @@ static int print_usage (const char *argv0) {
fprintf(stderr, " --compressibility C creates data that should compress by about a factor C. Default C is large. C is an float.\n"); fprintf(stderr, " --compressibility C creates data that should compress by about a factor C. Default C is large. C is an float.\n");
fprintf(stderr, " --xcount N how many insertions per transaction (default=%d)\n", DEFAULT_ITEMS_PER_TRANSACTION); fprintf(stderr, " --xcount N how many insertions per transaction (default=%d)\n", DEFAULT_ITEMS_PER_TRANSACTION);
fprintf(stderr, " --singlex (implies -x) Run the whole job as a single transaction. (Default don't run as a single transaction.)\n"); fprintf(stderr, " --singlex (implies -x) Run the whole job as a single transaction. (Default don't run as a single transaction.)\n");
fprintf(stderr, " --singlex-child (implies -x) Run the whole job as a single transaction, do all work a child of that transaction.\n");
fprintf(stderr, " --finish-child-first Commit/abort child before doing so to parent (no effect if no child).\n");
fprintf(stderr, " --singlex-create (implies --singlex) Create the file using the single transaction (Default is to use a different transaction to create.)\n"); fprintf(stderr, " --singlex-create (implies --singlex) Create the file using the single transaction (Default is to use a different transaction to create.)\n");
fprintf(stderr, " --check_small_rolltmp (Only valid in --singlex mode) Verify that very little data was saved in the rollback logs.\n"); fprintf(stderr, " --check_small_rolltmp (Only valid in --singlex mode) Verify that very little data was saved in the rollback logs.\n");
fprintf(stderr, " --prelock Prelock the database.\n"); fprintf(stderr, " --prelock Prelock the database.\n");
...@@ -405,6 +440,12 @@ int main (int argc, const char *argv[]) { ...@@ -405,6 +440,12 @@ int main (int argc, const char *argv[]) {
do_transactions = 1; do_transactions = 1;
singlex = 1; singlex = 1;
singlex_create = 1; singlex_create = 1;
} else if (strcmp(arg, "--finish-child-first") == 0) {
finish_child_first = 1;
} else if (strcmp(arg, "--singlex-child") == 0) {
do_transactions = 1;
singlex = 1;
singlex_child = 1;
} else if (strcmp(arg, "--singlex") == 0) { } else if (strcmp(arg, "--singlex") == 0) {
do_transactions = 1; do_transactions = 1;
singlex = 1; singlex = 1;
......
...@@ -14,9 +14,9 @@ struct simple_dbt { ...@@ -14,9 +14,9 @@ struct simple_dbt {
struct __toku_db_txn_internal { struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */ //TXNID txnid64; /* A sixty-four bit txn id. */
struct tokutxn *tokutxn; struct tokutxn *tokutxn;
struct __toku_lth *lth; struct __toku_lth *lth; //Hash table holding list of dictionaries this txn has touched
u_int32_t flags; u_int32_t flags;
DB_TXN *child, *next, *prev; DB_TXN *child;
}; };
struct __toku_dbc_internal { struct __toku_dbc_internal {
......
...@@ -42,10 +42,12 @@ BRT_SOURCES = \ ...@@ -42,10 +42,12 @@ BRT_SOURCES = \
brt-serialize \ brt-serialize \
brt-verify \ brt-verify \
brt \ brt \
brt_msg \
brt-test-helpers \ brt-test-helpers \
cachetable \ cachetable \
checkpoint \ checkpoint \
fifo \ fifo \
fifo_msg \
fingerprint \ fingerprint \
key \ key \
leafentry \ leafentry \
...@@ -60,11 +62,13 @@ BRT_SOURCES = \ ...@@ -60,11 +62,13 @@ BRT_SOURCES = \
recover \ recover \
roll \ roll \
rollback \ rollback \
ule \
threadpool \ threadpool \
toku_worker \ toku_worker \
trace_mem \ trace_mem \
txn \ txn \
x1764 \ x1764 \
xids \
ybt \ ybt \
# keep this line so I can have a \ on the previous line # keep this line so I can have a \ on the previous line
......
...@@ -26,10 +26,8 @@ typedef void *OMTVALUE; ...@@ -26,10 +26,8 @@ typedef void *OMTVALUE;
enum { TREE_FANOUT = BRT_FANOUT }; enum { TREE_FANOUT = BRT_FANOUT };
enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */ enum { KEY_VALUE_OVERHEAD = 8 }; /* Must store the two lengths. */
enum { OMT_ITEM_OVERHEAD = 0 }; /* No overhead for the OMT item. The PMA needed to know the idx, but the OMT doesn't. */ enum { OMT_ITEM_OVERHEAD = 0 }; /* No overhead for the OMT item. The PMA needed to know the idx, but the OMT doesn't. */
enum { BRT_CMD_OVERHEAD = (1 // the type enum { BRT_CMD_OVERHEAD = (1) // the type
+ 8) // the xid
}; };
enum { LE_OVERHEAD_BOUND = 9 }; // the type and xid
enum { BRT_DEFAULT_NODE_SIZE = 1 << 22 }; enum { BRT_DEFAULT_NODE_SIZE = 1 << 22 };
...@@ -246,7 +244,7 @@ static const BRTNODE null_brtnode=0; ...@@ -246,7 +244,7 @@ static const BRTNODE null_brtnode=0;
//extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen); //extern u_int32_t toku_calccrc32_kvpair (const void *key, int keylen, const void *val, int vallen);
//extern u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp); //extern u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp);
extern u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, TXNID xid, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen); extern u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen);
extern u_int32_t toku_calc_fingerprint_cmdstruct (BRT_CMD cmd); extern u_int32_t toku_calc_fingerprint_cmdstruct (BRT_CMD cmd);
// How long is the pivot key? // How long is the pivot key?
...@@ -322,6 +320,7 @@ enum brt_layout_version_e { ...@@ -322,6 +320,7 @@ enum brt_layout_version_e {
BRT_LAYOUT_VERSION_8 = 8, // Diff from 7 to 8: Use murmur instead of crc32. We are going to make a simplification and stop supporting version 7 and before. Current As of Beta 1.0.6 BRT_LAYOUT_VERSION_8 = 8, // Diff from 7 to 8: Use murmur instead of crc32. We are going to make a simplification and stop supporting version 7 and before. Current As of Beta 1.0.6
BRT_LAYOUT_VERSION_9 = 9, // Diff from 8 to 9: Variable-sized blocks and compression. BRT_LAYOUT_VERSION_9 = 9, // Diff from 8 to 9: Variable-sized blocks and compression.
BRT_LAYOUT_VERSION_10 = 10, // Diff from 9 to 10: Variable number of compressed sub-blocks per block, disk byte order == intel byte order, Subtree estimates instead of just leafentry estimates, translation table, dictionary descriptors, checksum in header, subdb support removed from brt layer BRT_LAYOUT_VERSION_10 = 10, // Diff from 9 to 10: Variable number of compressed sub-blocks per block, disk byte order == intel byte order, Subtree estimates instead of just leafentry estimates, translation table, dictionary descriptors, checksum in header, subdb support removed from brt layer
BRT_LAYOUT_VERSION_11 = 11, // Diff from 10 to 11: Nested transaction leafentries (completely redesigned). BRT_CMDs on disk now support XIDS (multiple txnids) instead of exactly one.
BRT_ANTEULTIMATE_VERSION, // the version after the most recent version BRT_ANTEULTIMATE_VERSION, // the version after the most recent version
BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line. BRT_LAYOUT_VERSION = BRT_ANTEULTIMATE_VERSION-1 // A hack so I don't have to change this line.
}; };
......
...@@ -188,10 +188,11 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -188,10 +188,11 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
assert(0 <= n_buffers && n_buffers < TREE_FANOUT+1); assert(0 <= n_buffers && n_buffers < TREE_FANOUT+1);
for (i=0; i< n_buffers; i++) { for (i=0; i< n_buffers; i++) {
FIFO_ITERATE(BNC_BUFFER(node,i), FIFO_ITERATE(BNC_BUFFER(node,i),
key __attribute__((__unused__)), keylen, key, keylen,
data __attribute__((__unused__)), datalen, data __attribute__((__unused__)), datalen,
type __attribute__((__unused__)), xid __attribute__((__unused__)), type __attribute__((__unused__)), xids,
(hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen)); (hsize+=BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+keylen+datalen+
xids_get_serialize_size(xids)));
} }
assert(hsize==node->u.n.n_bytes_in_buffers); assert(hsize==node->u.n.n_bytes_in_buffers);
assert(csize==node->u.n.totalchildkeylens); assert(csize==node->u.n.totalchildkeylens);
...@@ -201,7 +202,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) { ...@@ -201,7 +202,7 @@ static unsigned int toku_serialize_brtnode_size_slow (BRTNODE node) {
toku_omt_iterate(node->u.l.buffer, toku_omt_iterate(node->u.l.buffer,
addupsize, addupsize,
&hsize); &hsize);
assert(hsize<=node->u.l.n_bytes_in_buffer); assert(hsize==node->u.l.n_bytes_in_buffer);
hsize+=4; /* add n entries in buffer table. */ hsize+=4; /* add n entries in buffer table. */
hsize+=3*8; /* add the three leaf stats, but no exact bit. */ hsize+=3*8; /* add the three leaf stats, but no exact bit. */
return size+hsize; return size+hsize;
...@@ -226,11 +227,11 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) { ...@@ -226,11 +227,11 @@ unsigned int toku_serialize_brtnode_size (BRTNODE node) {
result+=4; /* n_entries in buffer table. */ result+=4; /* n_entries in buffer table. */
result+=3*8; /* the three leaf stats. */ result+=3*8; /* the three leaf stats. */
result+=node->u.l.n_bytes_in_buffer; result+=node->u.l.n_bytes_in_buffer;
if (toku_memory_check) { }
unsigned int slowresult = toku_serialize_brtnode_size_slow(node); if (toku_memory_check) {
if (result!=slowresult) printf("%s:%d result=%u slowresult=%u\n", __FILE__, __LINE__, result, slowresult); unsigned int slowresult = toku_serialize_brtnode_size_slow(node);
assert(result==slowresult); if (result!=slowresult) printf("%s:%d result=%u slowresult=%u\n", __FILE__, __LINE__, result, slowresult);
} assert(result==slowresult);
} }
return result; return result;
} }
...@@ -408,14 +409,14 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -408,14 +409,14 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
for (i=0; i< n_buffers; i++) { for (i=0; i< n_buffers; i++) {
//printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i])); //printf("%s:%d p%d=%p n_entries=%d\n", __FILE__, __LINE__, i, node->mdicts[i], mdict_n_entries(node->mdicts[i]));
wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i))); wbuf_int(&w, toku_fifo_n_entries(BNC_BUFFER(node,i)));
FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xid, FIFO_ITERATE(BNC_BUFFER(node,i), key, keylen, data, datalen, type, xids,
{ {
assert(type>=0 && type<256); assert(type>=0 && type<256);
wbuf_char(&w, (unsigned char)type); wbuf_char(&w, (unsigned char)type);
wbuf_TXNID(&w, xid); wbuf_xids(&w, xids);
wbuf_bytes(&w, key, keylen); wbuf_bytes(&w, key, keylen);
wbuf_bytes(&w, data, datalen); wbuf_bytes(&w, data, datalen);
check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xid, key, keylen, data, datalen); check_local_fingerprint+=node->rand4fingerprint*toku_calc_fingerprint_cmd(type, xids, key, keylen, data, datalen);
}); });
} }
//printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint); //printf("%s:%d check_local_fingerprint=%8x\n", __FILE__, __LINE__, check_local_fingerprint);
...@@ -736,7 +737,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -736,7 +737,7 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
result->layout_version = rbuf_int(&rc); result->layout_version = rbuf_int(&rc);
{ {
switch (result->layout_version) { switch (result->layout_version) {
case BRT_LAYOUT_VERSION_10: goto ok_layout_version; case BRT_LAYOUT_VERSION: goto ok_layout_version;
// Don't support older versions. // Don't support older versions.
} }
r=toku_db_badformat(); r=toku_db_badformat();
...@@ -826,19 +827,21 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash ...@@ -826,19 +827,21 @@ int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash
bytevec val; ITEMLEN vallen; bytevec val; ITEMLEN vallen;
//toku_verify_counts(result); //toku_verify_counts(result);
int type = rbuf_char(&rc); int type = rbuf_char(&rc);
TXNID xid = rbuf_ulonglong(&rc); XIDS xids;
xids_create_from_buffer(&rc, &xids);
rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */ rbuf_bytes(&rc, &key, &keylen); /* Returns a pointer into the rbuf. */
rbuf_bytes(&rc, &val, &vallen); rbuf_bytes(&rc, &val, &vallen);
check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xid, key, keylen, val, vallen); check_local_fingerprint += result->rand4fingerprint * toku_calc_fingerprint_cmd(type, xids, key, keylen, val, vallen);
//printf("Found %s,%s\n", (char*)key, (char*)val); //printf("Found %s,%s\n", (char*)key, (char*)val);
{ {
r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xid); /* Copies the data into the hash table. */ r=toku_fifo_enq(BNC_BUFFER(result, cnum), key, keylen, val, vallen, type, xids); /* Copies the data into the hash table. */
if (r!=0) { goto died_12; } if (r!=0) { goto died_12; }
} }
diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; diff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids);
result->u.n.n_bytes_in_buffers += diff; result->u.n.n_bytes_in_buffers += diff;
BNC_NBYTESINBUF(result,cnum) += diff; BNC_NBYTESINBUF(result,cnum) += diff;
//printf("Inserted\n"); //printf("Inserted\n");
xids_destroy(&xids);
} }
} }
if (check_local_fingerprint != result->local_fingerprint) { if (check_local_fingerprint != result->local_fingerprint) {
...@@ -977,6 +980,7 @@ serialize_brt_header_min_size (u_int32_t version) { ...@@ -977,6 +980,7 @@ serialize_brt_header_min_size (u_int32_t version) {
u_int32_t size; u_int32_t size;
switch(version) { switch(version) {
case BRT_LAYOUT_VERSION_10: case BRT_LAYOUT_VERSION_10:
case BRT_LAYOUT_VERSION_11:
size = (+8 // "tokudata" size = (+8 // "tokudata"
+4 // version +4 // version
+4 // size +4 // size
...@@ -1231,7 +1235,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) { ...@@ -1231,7 +1235,7 @@ deserialize_brtheader (int fd, struct rbuf *rb, struct brt_header **brth) {
list_init(&h->zombie_brts); list_init(&h->zombie_brts);
//version MUST be in network order on disk regardless of disk order //version MUST be in network order on disk regardless of disk order
h->layout_version = rbuf_network_int(&rc); h->layout_version = rbuf_network_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_10); assert(h->layout_version==BRT_LAYOUT_VERSION);
//Size MUST be in network order regardless of disk order. //Size MUST be in network order regardless of disk order.
u_int32_t size = rbuf_network_int(&rc); u_int32_t size = rbuf_network_int(&rc);
...@@ -1311,8 +1315,9 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset, struct rbuf * ...@@ -1311,8 +1315,9 @@ deserialize_brtheader_from_fd_into_rbuf(int fd, toku_off_t offset, struct rbuf *
if (r==0) { if (r==0) {
//Version MUST be in network order regardless of disk order. //Version MUST be in network order regardless of disk order.
version = rbuf_network_int(rb); version = rbuf_network_int(rb);
if (version < BRT_LAYOUT_VERSION_10) r = TOKUDB_DICTIONARY_TOO_OLD; //Cannot use //TODO: #1125 Possibly support transparent upgrade. If so, it should be < ...10
if (version > BRT_LAYOUT_VERSION_10) r = TOKUDB_DICTIONARY_TOO_NEW; //Cannot use if (version < BRT_LAYOUT_VERSION) r = TOKUDB_DICTIONARY_TOO_OLD; //Cannot use
if (version > BRT_LAYOUT_VERSION) r = TOKUDB_DICTIONARY_TOO_NEW; //Cannot use
} }
u_int32_t size; u_int32_t size;
if (r==0) { if (r==0) {
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h" #include "includes.h"
#include "ule.h"
int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) { int toku_testsetup_leaf(BRT brt, BLOCKNUM *blocknum) {
BRTNODE node; BRTNODE node;
...@@ -74,15 +75,21 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke ...@@ -74,15 +75,21 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
toku_verify_counts(node); toku_verify_counts(node);
assert(node->height==0); assert(node->height==0);
u_int32_t lesize, disksize; size_t lesize, disksize;
LEAFENTRY leafentry; LEAFENTRY leafentry;
r = le_committed(keylen, key, vallen, val, &lesize, &disksize, &leafentry, node->u.l.buffer, &node->u.l.buffer_mempool, 0);
OMTVALUE storeddatav; OMTVALUE storeddatav;
u_int32_t idx; u_int32_t idx;
DBT keydbt,valdbt; DBT keydbt,valdbt;
BRT_CMD_S cmd = {BRT_INSERT, 0, .u.id={toku_fill_dbt(&keydbt, key, keylen), BRT_CMD_S cmd = {BRT_INSERT, xids_get_root_xids(),
toku_fill_dbt(&valdbt, val, vallen)}}; .u.id={toku_fill_dbt(&keydbt, key, keylen),
toku_fill_dbt(&valdbt, val, vallen)}};
//Generate a leafentry (committed insert key,val)
r = apply_msg_to_leafentry(&cmd, NULL, //No old leafentry
&lesize, &disksize, &leafentry,
node->u.l.buffer, &node->u.l.buffer_mempool, 0);
assert(r==0);
struct cmd_leafval_heaviside_extra be = {brt, &cmd, node->flags & TOKU_DB_DUPSORT}; struct cmd_leafval_heaviside_extra be = {brt, &cmd, node->flags & TOKU_DB_DUPSORT};
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL); r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_heaviside, &be, &storeddatav, &idx, NULL);
...@@ -127,12 +134,13 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_cmd_t ...@@ -127,12 +134,13 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_cmd_t
toku_fill_dbt(&v, val, vallen), toku_fill_dbt(&v, val, vallen),
brt); brt);
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, (TXNID)0); XIDS xids_0 = xids_get_root_xids();
r = toku_fifo_enq(BNC_BUFFER(node, childnum), key, keylen, val, vallen, cmdtype, xids_0);
assert(r==0); assert(r==0);
u_int32_t fdelta = node->rand4fingerprint * toku_calc_fingerprint_cmd(cmdtype, (TXNID)0, key, keylen, val, vallen); u_int32_t fdelta = node->rand4fingerprint * toku_calc_fingerprint_cmd(cmdtype, xids_0, key, keylen, val, vallen);
node->local_fingerprint += fdelta; node->local_fingerprint += fdelta;
*subtree_fingerprint += fdelta; *subtree_fingerprint += fdelta;
int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD; int sizediff = keylen + vallen + KEY_VALUE_OVERHEAD + BRT_CMD_OVERHEAD + xids_get_serialize_size(xids_0);
node->u.n.n_bytes_in_buffers += sizediff; node->u.n.n_bytes_in_buffers += sizediff;
BNC_NBYTESINBUF(node, childnum) += sizediff; BNC_NBYTESINBUF(node, childnum) += sizediff;
node->dirty = 1; node->dirty = 1;
......
...@@ -50,12 +50,12 @@ static int compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) { ...@@ -50,12 +50,12 @@ static int compare_pairs (BRT brt, struct kv_pair *a, struct kv_pair *b) {
static int compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) { static int compare_leafentries (BRT brt, LEAFENTRY a, LEAFENTRY b) {
DBT x,y; DBT x,y;
int cmp = brt->compare_fun(brt->db, int cmp = brt->compare_fun(brt->db,
toku_fill_dbt(&x, le_any_key(a), le_any_keylen(a)), toku_fill_dbt(&x, le_key(a), le_keylen(a)),
toku_fill_dbt(&y, le_any_key(b), le_any_keylen(b))); toku_fill_dbt(&y, le_key(b), le_keylen(b)));
if (cmp==0 && (brt->flags & TOKU_DB_DUPSORT)) { if (cmp==0 && (brt->flags & TOKU_DB_DUPSORT)) {
cmp = brt->dup_compare(brt->db, cmp = brt->dup_compare(brt->db,
toku_fill_dbt(&x, le_any_val(a), le_any_vallen(a)), toku_fill_dbt(&x, le_innermost_inserted_val(a), le_innermost_inserted_vallen(a)),
toku_fill_dbt(&y, le_any_val(b), le_any_vallen(b))); toku_fill_dbt(&y, le_innermost_inserted_val(b), le_innermost_inserted_vallen(b)));
} }
return cmp; return cmp;
} }
...@@ -75,7 +75,7 @@ static void verify_pair (bytevec key, unsigned int keylen, ...@@ -75,7 +75,7 @@ static void verify_pair (bytevec key, unsigned int keylen,
bytevec data __attribute__((__unused__)), bytevec data __attribute__((__unused__)),
unsigned int datalen __attribute__((__unused__)), unsigned int datalen __attribute__((__unused__)),
int type __attribute__((__unused__)), int type __attribute__((__unused__)),
TXNID xid __attribute__((__unused__)), XIDS xids __attribute__((__unused__)),
void *arg) { void *arg) {
struct verify_pair_arg *vparg = (struct verify_pair_arg *)arg; struct verify_pair_arg *vparg = (struct verify_pair_arg *)arg;
BRT brt = vparg->brt; BRT brt = vparg->brt;
......
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "fifo_msg.h"
#include "brt_msg.h"
//BRT_MSG internals are in host order
//XIDS are not 'internal' to BRT_MSG
void
brt_msg_from_dbts(BRT_MSG brt_msg,
DBT *key, DBT *val,
XIDS xids, brt_msg_type type) {
brt_msg->u.id.key = key;
brt_msg->u.id.val = val;
brt_msg->xids = xids;
brt_msg->type = type;
}
//No conversion (from disk to host) is necessary
//Accessor functions for fifo return host order bytes.
#if 0
void
brt_msg_from_fifo_msg(BRT_MSG brt_msg, FIFO_MSG fifo_msg) {
brt_msg->keylen = fifo_msg_get_keylen(fifo_msg);
brt_msg->vallen = fifo_msg_get_vallen(fifo_msg);
brt_msg->vallen = fifo_msg_get_vallen(fifo_msg);
brt_msg->key = fifo_msg_get_key(fifo_msg);
brt_msg->val = fifo_msg_get_val(fifo_msg);
brt_msg->xids = fifo_msg_get_xids(fifo_msg);
brt_msg->type = fifo_msg_get_type(fifo_msg);
}
#endif
u_int32_t
brt_msg_get_keylen(BRT_MSG brt_msg) {
u_int32_t rval = brt_msg->u.id.key->size;
return rval;
}
u_int32_t
brt_msg_get_vallen(BRT_MSG brt_msg) {
u_int32_t rval = brt_msg->u.id.val->size;
return rval;
}
XIDS
brt_msg_get_xids(BRT_MSG brt_msg) {
XIDS rval = brt_msg->xids;
return rval;
}
void *
brt_msg_get_key(BRT_MSG brt_msg) {
void * rval = brt_msg->u.id.key->data;
return rval;
}
void *
brt_msg_get_val(BRT_MSG brt_msg) {
void * rval = brt_msg->u.id.val->data;
return rval;
}
brt_msg_type
brt_msg_get_type(BRT_MSG brt_msg) {
brt_msg_type rval = brt_msg->type;
return rval;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the brt_msg,
* which is the ephemeral version of the fifo_msg.
*/
#ifndef BRT_MSG_H
#define BRT_MSG_H
u_int32_t brt_msg_get_keylen(BRT_MSG brt_msg);
u_int32_t brt_msg_get_vallen(BRT_MSG brt_msg);
XIDS brt_msg_get_xids(BRT_MSG brt_msg);
void * brt_msg_get_key(BRT_MSG brt_msg);
void * brt_msg_get_val(BRT_MSG brt_msg);
brt_msg_type brt_msg_get_type(BRT_MSG brt_msg);
void brt_msg_from_fifo_msg(BRT_MSG brt_msg, FIFO_MSG fifo_msg);
void brt_msg_from_dbts(BRT_MSG brt_msg, DBT *key, DBT *val, XIDS xids, brt_msg_type type);
#endif
...@@ -100,7 +100,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -100,7 +100,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b); printf(" child %d: %" PRId64 "\n", i, BNC_BLOCKNUM(n, i).b);
printf(" buffer contains %u bytes (%d items)\n", BNC_NBYTESINBUF(n, i), toku_fifo_n_entries(BNC_BUFFER(n,i))); printf(" buffer contains %u bytes (%d items)\n", BNC_NBYTESINBUF(n, i), toku_fifo_n_entries(BNC_BUFFER(n,i)));
if (dump_data) { if (dump_data) {
FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xid, FIFO_ITERATE(BNC_BUFFER(n,i), key, keylen, data, datalen, typ, xids,
{ {
printf(" TYPE="); printf(" TYPE=");
switch ((enum brt_cmd_type)typ) { switch ((enum brt_cmd_type)typ) {
...@@ -115,7 +115,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) { ...@@ -115,7 +115,7 @@ dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
} }
printf("HUH?"); printf("HUH?");
ok: ok:
printf(" xid=%"PRIu64" ", xid); printf(" xid=%"PRIu64" ", xids_get_innermost_xid(xids));
print_item(key, keylen); print_item(key, keylen);
if (datalen>0) { if (datalen>0) {
printf(" "); printf(" ");
......
...@@ -90,10 +90,12 @@ enum brt_cmd_type { ...@@ -90,10 +90,12 @@ enum brt_cmd_type {
BRT_COMMIT_BOTH = 7 BRT_COMMIT_BOTH = 7
}; };
typedef struct xids_t *XIDS;
typedef struct fifo_msg_t *FIFO_MSG;
/* tree commands */ /* tree commands */
struct brt_cmd { struct brt_cmd {
enum brt_cmd_type type; enum brt_cmd_type type;
TXNID xid; XIDS xids;
union { union {
/* insert or delete */ /* insert or delete */
struct brt_cmd_insert_delete { struct brt_cmd_insert_delete {
...@@ -104,17 +106,15 @@ struct brt_cmd { ...@@ -104,17 +106,15 @@ struct brt_cmd {
}; };
typedef struct brt_cmd BRT_CMD_S, *BRT_CMD; typedef struct brt_cmd BRT_CMD_S, *BRT_CMD;
#if !defined(__cplusplus) // TODO: replace brt_cmd_type when ready
static inline typedef enum brt_cmd_type brt_msg_type;
BRT_CMD_S // Message sent into brt to implement command (insert, delete, etc.)
build_brt_cmd (enum brt_cmd_type type, TXNID xid, DBT *key, DBT *val) { // This structure supports nested transactions, and obsoletes brt_cmd.
BRT_CMD_S result = {type, xid, .u.id={key,val}}; typedef struct brt_cmd BRT_MSG_S, *BRT_MSG;
return result;
}
#endif
#define UU(x) x __attribute__((__unused__))
typedef struct leafentry *LEAFENTRY;
#define UU(x) x __attribute__((__unused__))
#endif #endif
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "includes.h" #include "includes.h"
#include "xids.h"
struct fifo { struct fifo {
int n_items_in_fifo; int n_items_in_fifo;
...@@ -22,7 +23,9 @@ static void fifo_init(struct fifo *fifo) { ...@@ -22,7 +23,9 @@ static void fifo_init(struct fifo *fifo) {
} }
static int fifo_entry_size(struct fifo_entry *entry) { static int fifo_entry_size(struct fifo_entry *entry) {
return sizeof (struct fifo_entry) + entry->keylen + entry->vallen; return sizeof (struct fifo_entry) + entry->keylen + entry->vallen
+ xids_get_size(&entry->xids_s)
- sizeof(XIDS_S); //Prevent double counting from fifo_entry+xids_get_size
} }
static struct fifo_entry *fifo_peek(struct fifo *fifo) { static struct fifo_entry *fifo_peek(struct fifo *fifo) {
...@@ -59,8 +62,11 @@ static int next_power_of_two (int n) { ...@@ -59,8 +62,11 @@ static int next_power_of_two (int n) {
return r; return r;
} }
int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, TXNID xid) { int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *data, unsigned int datalen, int type, XIDS xids) {
int need_space_here = sizeof(struct fifo_entry) + keylen + datalen; int need_space_here = sizeof(struct fifo_entry)
+ keylen + datalen
+ xids_get_size(xids)
- sizeof(XIDS_S); //Prevent double counting
int need_space_total = fifo->memory_used+need_space_here; int need_space_total = fifo->memory_used+need_space_here;
if (fifo->memory == NULL) { if (fifo->memory == NULL) {
fifo->memory_size = next_power_of_two(need_space_total); fifo->memory_size = next_power_of_two(need_space_total);
...@@ -88,30 +94,32 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d ...@@ -88,30 +94,32 @@ int toku_fifo_enq(FIFO fifo, const void *key, unsigned int keylen, const void *d
} }
struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_start + fifo->memory_used); struct fifo_entry *entry = (struct fifo_entry *)(fifo->memory + fifo->memory_start + fifo->memory_used);
entry->type = (unsigned char)type; entry->type = (unsigned char)type;
entry->xid = xid; xids_cpy(&entry->xids_s, xids);
entry->keylen = keylen; entry->keylen = keylen;
memcpy(entry->key, key, keylen); unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
memcpy(e_key, key, keylen);
entry->vallen = datalen; entry->vallen = datalen;
memcpy(entry->key + keylen, data, datalen); memcpy(e_key + keylen, data, datalen);
fifo->n_items_in_fifo++; fifo->n_items_in_fifo++;
fifo->memory_used += need_space_here; fifo->memory_used += need_space_here;
return 0; return 0;
} }
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd) { int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd) {
return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xid); return toku_fifo_enq(fifo, cmd->u.id.key->data, cmd->u.id.key->size, cmd->u.id.val->data, cmd->u.id.val->size, cmd->type, cmd->xids);
} }
/* peek at the head (the oldest entry) of the fifo */ /* peek at the head (the oldest entry) of the fifo */
int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, TXNID *xid) { int toku_fifo_peek(FIFO fifo, bytevec *key, unsigned int *keylen, bytevec *data, unsigned int *datalen, u_int32_t *type, XIDS *xids) {
struct fifo_entry *entry = fifo_peek(fifo); struct fifo_entry *entry = fifo_peek(fifo);
if (entry == 0) return -1; if (entry == 0) return -1;
*key = entry->key; unsigned char *e_key = xids_get_end_of_array(&entry->xids_s);
*key = e_key;
*keylen = entry->keylen; *keylen = entry->keylen;
*data = entry->key + entry->keylen; *data = e_key + entry->keylen;
*datalen = entry->vallen; *datalen = entry->vallen;
*type = entry->type; *type = entry->type;
*xid = entry->xid; *xids = &entry->xids_s;
return 0; return 0;
} }
...@@ -120,7 +128,7 @@ int toku_fifo_peek_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) { ...@@ -120,7 +128,7 @@ int toku_fifo_peek_cmdstruct (FIFO fifo, BRT_CMD cmd, DBT*key, DBT*data) {
u_int32_t type; u_int32_t type;
bytevec keyb,datab; bytevec keyb,datab;
unsigned int keylen,datalen; unsigned int keylen,datalen;
int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xid); int r = toku_fifo_peek(fifo, &keyb, &keylen, &datab, &datalen, &type, &cmd->xids);
if (r!=0) return r; if (r!=0) return r;
cmd->type=(enum brt_cmd_type)type; cmd->type=(enum brt_cmd_type)type;
toku_fill_dbt(key, keyb, keylen); toku_fill_dbt(key, keyb, keylen);
...@@ -151,10 +159,10 @@ struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) { ...@@ -151,10 +159,10 @@ struct fifo_entry * toku_fifo_iterate_internal_get_entry(FIFO fifo, int off) {
return (struct fifo_entry *)(fifo->memory + off); return (struct fifo_entry *)(fifo->memory + off);
} }
void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void *arg) { void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void *arg) {
FIFO_ITERATE(fifo, FIFO_ITERATE(fifo,
key, keylen, data, datalen, type, xid, key, keylen, data, datalen, type, xids,
f(key,keylen,data,datalen,type,xid, arg)); f(key,keylen,data,datalen,type,xids, arg));
} }
unsigned long toku_fifo_memory_size(FIFO fifo) { unsigned long toku_fifo_memory_size(FIFO fifo) {
......
...@@ -5,13 +5,14 @@ ...@@ -5,13 +5,14 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h" #include "brttypes.h"
#include "xids-internal.h"
#include "xids.h"
struct fifo_entry { struct fifo_entry {
unsigned int keylen; unsigned int keylen;
unsigned int vallen; unsigned int vallen;
unsigned char type; unsigned char type;
TXNID xid; XIDS_S xids_s;
unsigned char key[];
}; };
typedef struct fifo *FIFO; typedef struct fifo *FIFO;
...@@ -20,8 +21,8 @@ int toku_fifo_create(FIFO *); ...@@ -20,8 +21,8 @@ int toku_fifo_create(FIFO *);
void toku_fifo_free(FIFO *); void toku_fifo_free(FIFO *);
int toku_fifo_n_entries(FIFO); int toku_fifo_n_entries(FIFO);
int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd); int toku_fifo_enq_cmdstruct (FIFO fifo, const BRT_CMD cmd);
int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, TXNID xid); int toku_fifo_enq (FIFO, const void *key, ITEMLEN keylen, const void *data, ITEMLEN datalen, int type, XIDS xids);
int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid); int toku_fifo_peek (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, XIDS *xids);
int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part. int toku_fifo_peek_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
int toku_fifo_deq(FIFO); int toku_fifo_deq(FIFO);
...@@ -30,20 +31,20 @@ unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo us ...@@ -30,20 +31,20 @@ unsigned long toku_fifo_memory_size(FIFO); // return how much memory the fifo us
//These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory. //These two are problematic, since I don't want to malloc() the bytevecs, but dequeueing the fifo frees the memory.
//int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid); //int toku_fifo_peek_deq (FIFO, bytevec *key, ITEMLEN *keylen, bytevec *data, ITEMLEN *datalen, u_int32_t *type, TXNID *xid);
//int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part. //int toku_fifo_peek_deq_cmdstruct (FIFO, BRT_CMD, DBT*, DBT*); // fill in the BRT_CMD, using the two DBTs for the DBT part.
void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, TXNID xid, void*), void*); void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,ITEMLEN datalen,int type, XIDS xids, void*), void*);
#define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidvar,body) do { \ #define FIFO_ITERATE(fifo,keyvar,keylenvar,datavar,datalenvar,typevar,xidsvar,body) do { \
int fifo_iterate_off; \ int fifo_iterate_off; \
for (fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \ for (fifo_iterate_off = toku_fifo_iterate_internal_start(fifo); \
toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \ toku_fifo_iterate_internal_has_more(fifo, fifo_iterate_off); \
fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \ fifo_iterate_off = toku_fifo_iterate_internal_next(fifo, fifo_iterate_off)) { \
struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \ struct fifo_entry *e = toku_fifo_iterate_internal_get_entry(fifo, fifo_iterate_off); \
bytevec keyvar = e->key; \
ITEMLEN keylenvar = e->keylen; \ ITEMLEN keylenvar = e->keylen; \
bytevec datavar = e->key + e->keylen; \
ITEMLEN datalenvar = e->vallen; \ ITEMLEN datalenvar = e->vallen; \
int typevar = e->type; \ int typevar = e->type; \
TXNID xidvar = e->xid; \ XIDS xidsvar = &e->xids_s; \
bytevec keyvar = xids_get_end_of_array(xidsvar); \
bytevec datavar = keyvar + e->keylen; \
body; \ body; \
} } while (0) } } while (0)
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to define and handle the fifo_msg, which
* is the stored format of a brt_msg.
*
* Note, when translating from fifo_msg to brt_msg, the brt_msg
* will be created with a pointer into the xids in the fifo_msg.
* (The xids will not be embedded in the brt_msg.) This means
* that a valid xids struct must be embedded in the fifo_msg.
*
* NOTE: fifo_msg is stored in memory and on disk in same format.
* fifo_msg is stored in same byte order both in-memory
* and on-disk. Accessors are responsible for tranposition
* to host order.
*/
#include <string.h>
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "xids-internal.h"
#include "brt_msg.h"
#include "fifo_msg.h"
#include <toku_htod.h>
// xids_and_key_and_val field is XIDS_S followed by key
// followed by value.
struct fifo_msg_t {
u_int32_t keylen;
u_int32_t vallen;
u_int8_t type;
// u_int8_t pad[7]; // force 64-bit alignment if needed ???
u_int8_t xids_and_key_and_val[]; // undifferentiated bytes
};
u_int32_t
fifo_msg_get_keylen(FIFO_MSG fifo_msg) {
u_int32_t rval = fifo_msg->keylen;
rval = toku_dtoh32(rval);
return rval;
}
u_int32_t
fifo_msg_get_vallen(FIFO_MSG fifo_msg) {
u_int32_t rval = fifo_msg->vallen;
rval = toku_dtoh32(rval);
return rval;
}
XIDS
fifo_msg_get_xids(FIFO_MSG fifo_msg) {
XIDS rval = (XIDS) &fifo_msg->xids_and_key_and_val;
return rval;
}
static u_int32_t
fifo_msg_get_xids_size(FIFO_MSG fifo_msg) {
u_int32_t rval;
XIDS xids = fifo_msg_get_xids(fifo_msg);
rval = xids_get_size(xids);
return rval;
}
void *
fifo_msg_get_key(FIFO_MSG fifo_msg) {
void * rval;
u_int32_t xidslen = fifo_msg_get_xids_size(fifo_msg);
rval = (void*)fifo_msg->xids_and_key_and_val + xidslen;
return rval;
}
void *
fifo_msg_get_val(FIFO_MSG fifo_msg) {
void * rval;
void * key = fifo_msg_get_key(fifo_msg);
u_int32_t keylen = fifo_msg_get_keylen(fifo_msg);
rval = key + keylen;
return rval;
}
brt_msg_type
fifo_msg_get_type(FIFO_MSG fifo_msg) {
brt_msg_type rval = fifo_msg->type;
return rval;
}
// Finds size of a fifo msg.
u_int32_t
fifo_msg_get_size(FIFO_MSG fifo_msg) {
u_int32_t rval;
u_int32_t keylen = fifo_msg_get_keylen(fifo_msg);
u_int32_t vallen = fifo_msg_get_vallen(fifo_msg);
u_int32_t xidslen = fifo_msg_get_xids_size(fifo_msg);
rval = keylen + vallen + xidslen + sizeof(*fifo_msg);
return rval;
}
// Return number of bytes required for a fifo_msg created from
// the given brt_msg
u_int32_t
fifo_msg_get_size_required(BRT_MSG brt_msg) {
u_int32_t rval;
u_int32_t keylen = brt_msg_get_keylen(brt_msg);
u_int32_t vallen = brt_msg_get_vallen(brt_msg);
XIDS xids = brt_msg_get_xids(brt_msg);
u_int32_t xidslen = xids_get_size(xids);
rval = keylen + vallen + xidslen + sizeof(struct fifo_msg_t);
return rval;
}
void
fifo_msg_from_brt_msg(FIFO_MSG fifo_msg, BRT_MSG brt_msg) {
u_int32_t keylen_host = brt_msg_get_keylen(brt_msg);
u_int32_t vallen_host = brt_msg_get_vallen(brt_msg);
fifo_msg->type = brt_msg_get_type(brt_msg);
fifo_msg->keylen = toku_htod32(keylen_host);
fifo_msg->vallen = toku_htod32(vallen_host);
//Copy XIDS
XIDS xids = brt_msg_get_xids(brt_msg);
XIDS xids_target = fifo_msg_get_xids(fifo_msg);
u_int32_t xidslen = xids_get_size(xids);
memcpy(xids_target, xids, xidslen);
//Copy Key
void *key = brt_msg_get_key(brt_msg);
void *key_target = fifo_msg_get_key(fifo_msg);
memcpy(key_target, key, keylen_host);
//Copy Val
void *val = brt_msg_get_val(brt_msg);
void *val_target = fifo_msg_get_val(fifo_msg);
memcpy(val_target, val, vallen_host);
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* The purpose of this file is to provide access to the fifo_msg,
* which is the stored representation of the brt_msg.
*
* NOTE: Accessor functions return all values in host byte order.
*/
#ifndef FIFO_MSG_H
#define FIFO_MSG_H
u_int32_t fifo_msg_get_keylen(FIFO_MSG fifo_msg);
u_int32_t fifo_msg_get_vallen(FIFO_MSG fifo_msg);
XIDS fifo_msg_get_xids(FIFO_MSG fifo_msg);
void * fifo_msg_get_key(FIFO_MSG fifo_msg);
void * fifo_msg_get_val(FIFO_MSG fifo_msg);
brt_msg_type fifo_msg_get_type(FIFO_MSG fifo_msg);
u_int32_t fifo_msg_get_size(FIFO_MSG fifo_msg);
// Return number of bytes required for a fifo_msg created from
// the given brt_msg
u_int32_t fifo_msg_get_size_required(BRT_MSG brt_msg);
#endif
...@@ -28,15 +28,12 @@ u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp) { ...@@ -28,15 +28,12 @@ u_int32_t toku_calccrc32_kvpair_struct (const struct kv_pair *kvp) {
} }
#endif #endif
u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, TXNID xid, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) { u_int32_t toku_calc_fingerprint_cmd (u_int32_t type, XIDS xids, const void *key, u_int32_t keylen, const void *val, u_int32_t vallen) {
unsigned char type_c = (unsigned char)type; unsigned char type_c = (unsigned char)type;
unsigned int a = toku_htod32(xid>>32);
unsigned int b = toku_htod32(xid&0xffffffff);
struct x1764 mm; struct x1764 mm;
x1764_init(&mm); x1764_init(&mm);
x1764_add(&mm, &type_c, 1); x1764_add(&mm, &type_c, 1);
x1764_add(&mm, &a, 4); toku_calc_more_murmur_xids(&mm, xids);
x1764_add(&mm, &b, 4);
toku_calc_more_murmur_kvpair(&mm, key, keylen, val, vallen); toku_calc_more_murmur_kvpair(&mm, key, keylen, val, vallen);
return x1764_finish(&mm); return x1764_finish(&mm);
} }
This diff is collapsed.
This diff is collapsed.
...@@ -105,6 +105,7 @@ struct tokutxn { ...@@ -105,6 +105,7 @@ struct tokutxn {
toku_off_t rollentry_filesize; // How many bytes are in the rollentry file (this is the uncompressed bytes. If the file is compressed it may actually be smaller (or even larger with header information)) toku_off_t rollentry_filesize; // How many bytes are in the rollentry file (this is the uncompressed bytes. If the file is compressed it may actually be smaller (or even larger with header information))
u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children. u_int64_t rollentry_raw_count; // the total count of every byte in the transaction and all its children.
OMT open_brts; // a collection of the brts that we touched. Indexed by filenum. OMT open_brts; // a collection of the brts that we touched. Indexed by filenum.
XIDS xids; //Represents the xid list
}; };
int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync); int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbuf *wbuf, int do_fsync);
......
...@@ -32,7 +32,6 @@ struct roll_entry; ...@@ -32,7 +32,6 @@ struct roll_entry;
#include "recover.h" #include "recover.h"
#include "txn.h" #include "txn.h"
// needed by logformat.c
static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) { static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
target->len = val.len; target->len = val.len;
target->data = toku_memdup(val.data, (size_t)val.len); target->data = toku_memdup(val.data, (size_t)val.len);
......
...@@ -44,26 +44,26 @@ struct logtype { ...@@ -44,26 +44,26 @@ struct logtype {
int logformat_version_number = 0; int logformat_version_number = 0;
const struct logtype rollbacks[] = { const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"TXNID", "xid", 0}, {"fcreate", 'F', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"BYTESTRING", "fname", 0}, {"BYTESTRING", "fname", 0},
NULLFIELD}}, NULLFIELD}},
// cmdinsert is used to insert a key-value pair into a NODUP DB. For rollback we don't need the data. // cmdinsert is used to insert a key-value pair into a NODUP DB. For rollback we don't need the data.
{"cmdinsert", 'i', FA{{"TXNID", "xid", 0}, {"cmdinsert", 'i', FA{
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
NULLFIELD}}, NULLFIELD}},
{"cmdinsertboth", 'I', FA{{"TXNID", "xid", 0}, {"cmdinsertboth", 'I', FA{
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"cmddeleteboth", 'D', FA{{"TXNID", "xid", 0}, {"cmddeleteboth", 'D', FA{
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{"cmddelete", 'd', FA{{"TXNID", "xid", 0}, {"cmddelete", 'd', FA{
{"FILENUM", "filenum", 0}, {"FILENUM", "filenum", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
NULLFIELD}}, NULLFIELD}},
......
This diff is collapsed.
...@@ -239,11 +239,17 @@ toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, T ...@@ -239,11 +239,17 @@ toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, T
struct brt_cmd cmd; struct brt_cmd cmd;
DBT keydbt, valdbt; DBT keydbt, valdbt;
cmd.type=BRT_INSERT; cmd.type=BRT_INSERT;
cmd.xid =xid; //TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len); cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len); cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger); r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data); toku_free(key.data);
toku_free(val.data); toku_free(val.data);
} }
...@@ -265,11 +271,17 @@ toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filen ...@@ -265,11 +271,17 @@ toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filen
struct brt_cmd cmd; struct brt_cmd cmd;
DBT keydbt, valdbt; DBT keydbt, valdbt;
cmd.type = BRT_DELETE_BOTH; cmd.type = BRT_DELETE_BOTH;
cmd.xid =xid; //TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len); cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len); cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger); r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data); toku_free(key.data);
toku_free(val.data); toku_free(val.data);
} }
...@@ -291,11 +303,17 @@ toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenu ...@@ -291,11 +303,17 @@ toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenu
struct brt_cmd cmd; struct brt_cmd cmd;
DBT keydbt, valdbt; DBT keydbt, valdbt;
cmd.type = BRT_DELETE_ANY; cmd.type = BRT_DELETE_ANY;
cmd.xid = xid; //TODO: #1125 and recovery: Remove this hack
// Assume this is a root txn (not yet enough info to construct full XIDS for message)
XIDS root = xids_get_root_xids();
r = xids_create_child(root, &cmd.xids, xid);
assert(r==0);
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len); cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len); cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger); r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0); assert(r==0);
xids_destroy(&cmd.xids);
xids_destroy(&root);
toku_free(key.data); toku_free(key.data);
toku_free(val.data); toku_free(val.data);
} }
......
...@@ -7,12 +7,8 @@ ...@@ -7,12 +7,8 @@
#include "includes.h" #include "includes.h"
#include "checkpoint.h" #include "checkpoint.h"
#include "xids.h"
// these flags control whether or not we send commit messages for #include "roll.h"
// various operations
#define TOKU_DO_COMMIT_CMD_INSERT 0
#define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
int int
toku_commit_fcreate (TXNID UU(xid), toku_commit_fcreate (TXNID UU(xid),
...@@ -63,14 +59,15 @@ static int find_brt_from_filenum (OMTVALUE v, void *filenumvp) { ...@@ -63,14 +59,15 @@ static int find_brt_from_filenum (OMTVALUE v, void *filenumvp) {
return 0; return 0;
} }
static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) { static int do_insertion (enum brt_cmd_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf); int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0); assert(r==0);
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { type, xid, XIDS xids = toku_txn_get_xids(txn);
BRT_CMD_S brtcmd = { type, xids,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len), .u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
data data
? toku_fill_dbt(&data_dbt, data->data, data->len) ? toku_fill_dbt(&data_dbt, data->data, data->len)
...@@ -93,18 +90,17 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) { ...@@ -93,18 +90,17 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
} }
int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) { int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
#if TOKU_DO_COMMIT_CMD_INSERT #if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn); return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
#else #else
xid = xid; key = key; key = key;
return do_nothing_with_filenum(txn, filenum); return do_nothing_with_filenum(txn, filenum);
#endif #endif
} }
int int
toku_commit_cmdinsertboth (TXNID xid, toku_commit_cmdinsertboth (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
BYTESTRING data, BYTESTRING data,
TOKUTXN txn, TOKUTXN txn,
...@@ -112,39 +108,36 @@ toku_commit_cmdinsertboth (TXNID xid, ...@@ -112,39 +108,36 @@ toku_commit_cmdinsertboth (TXNID xid,
void * UU(yieldv)) void * UU(yieldv))
{ {
#if TOKU_DO_COMMIT_CMD_INSERT #if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn); return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
#else #else
xid = xid; key = key; data = data; key = key; data = data;
return do_nothing_with_filenum(txn, filenum); return do_nothing_with_filenum(txn, filenum);
#endif #endif
} }
int int
toku_rollback_cmdinsert (TXNID xid, toku_rollback_cmdinsert (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void * UU(yieldv)) void * UU(yieldv))
{ {
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn); return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
} }
int int
toku_rollback_cmdinsertboth (TXNID xid, toku_rollback_cmdinsertboth (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
BYTESTRING data, BYTESTRING data,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void * UU(yieldv)) void * UU(yieldv))
{ {
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn); return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
} }
int int
toku_commit_cmddeleteboth (TXNID xid, toku_commit_cmddeleteboth (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
BYTESTRING data, BYTESTRING data,
TOKUTXN txn, TOKUTXN txn,
...@@ -152,7 +145,7 @@ toku_commit_cmddeleteboth (TXNID xid, ...@@ -152,7 +145,7 @@ toku_commit_cmddeleteboth (TXNID xid,
void * UU(yieldv)) void * UU(yieldv))
{ {
#if TOKU_DO_COMMIT_CMD_DELETE_BOTH #if TOKU_DO_COMMIT_CMD_DELETE_BOTH
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn); return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
#else #else
xid = xid; key = key; data = data; xid = xid; key = key; data = data;
return do_nothing_with_filenum(txn, filenum); return do_nothing_with_filenum(txn, filenum);
...@@ -160,27 +153,25 @@ toku_commit_cmddeleteboth (TXNID xid, ...@@ -160,27 +153,25 @@ toku_commit_cmddeleteboth (TXNID xid,
} }
int int
toku_rollback_cmddeleteboth (TXNID xid, toku_rollback_cmddeleteboth (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
BYTESTRING data, BYTESTRING data,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void * UU(yieldv)) void * UU(yieldv))
{ {
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn); return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
} }
int int
toku_commit_cmddelete (TXNID xid, toku_commit_cmddelete (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void * UU(yieldv)) void * UU(yieldv))
{ {
#if TOKU_DO_COMMIT_CMD_DELETE #if TOKU_DO_COMMIT_CMD_DELETE
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn); return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
#else #else
xid = xid; key = key; xid = xid; key = key;
return do_nothing_with_filenum(txn, filenum); return do_nothing_with_filenum(txn, filenum);
...@@ -188,14 +179,13 @@ toku_commit_cmddelete (TXNID xid, ...@@ -188,14 +179,13 @@ toku_commit_cmddelete (TXNID xid,
} }
int int
toku_rollback_cmddelete (TXNID xid, toku_rollback_cmddelete (FILENUM filenum,
FILENUM filenum,
BYTESTRING key, BYTESTRING key,
TOKUTXN txn, TOKUTXN txn,
YIELDF UU(yield), YIELDF UU(yield),
void * UU(yieldv)) void * UU(yieldv))
{ {
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn); return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
} }
int int
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id: roll.c 12588 2009-06-09 00:05:02Z yfogel $"
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef TOKUDB_ROLL_H
#define TOKUDB_ROLL_H
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_BOTH message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#endif
...@@ -32,6 +32,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -32,6 +32,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
list_remove(&txn->live_txns_link); list_remove(&txn->live_txns_link);
note_txn_closing(txn); note_txn_closing(txn);
xids_destroy(&txn->xids);
toku_free(txn); toku_free(txn);
return; return;
} }
......
...@@ -94,7 +94,8 @@ REGRESSION_TESTS_RAW = \ ...@@ -94,7 +94,8 @@ REGRESSION_TESTS_RAW = \
test-brt-overflow \ test-brt-overflow \
test-del-inorder \ test-del-inorder \
test-inc-split \ test-inc-split \
test-leafentry \ test-leafentry10 \
test-leafentry-nested \
test_oexcl \ test_oexcl \
test_toku_malloc_plain_free \ test_toku_malloc_plain_free \
threadpool-test \ threadpool-test \
......
...@@ -46,12 +46,25 @@ static void test_serialize(void) { ...@@ -46,12 +46,25 @@ static void test_serialize(void) {
BNC_SUBTREE_ESTIMATES(&sn, 1).exact = (BOOL)(random()%2 != 0); BNC_SUBTREE_ESTIMATES(&sn, 1).exact = (BOOL)(random()%2 != 0);
r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0); r = toku_fifo_create(&BNC_BUFFER(&sn,0)); assert(r==0);
r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0); r = toku_fifo_create(&BNC_BUFFER(&sn,1)); assert(r==0);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, (TXNID)0); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)0, "a", 2, "aval", 5); //Create XIDS
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, (TXNID)123); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)123, "b", 2, "bval", 5); XIDS xids_0 = xids_get_root_xids();
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, (TXNID)234); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, (TXNID)234, "x", 2, "xval", 5); XIDS xids_123;
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); XIDS xids_234;
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); r = xids_create_child(xids_0, &xids_123, (TXNID)123);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5); CKERR(r);
r = xids_create_child(xids_123, &xids_234, (TXNID)234);
CKERR(r);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "a", 2, "aval", 5, BRT_NONE, xids_0); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_0, "a", 2, "aval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,0), "b", 2, "bval", 5, BRT_NONE, xids_123); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_123, "b", 2, "bval", 5);
r = toku_fifo_enq(BNC_BUFFER(&sn,1), "x", 2, "xval", 5, BRT_NONE, xids_234); assert(r==0); sn.local_fingerprint += randval*toku_calc_fingerprint_cmd(BRT_NONE, xids_234, "x", 2, "xval", 5);
BNC_NBYTESINBUF(&sn, 0) = 2*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123);
BNC_NBYTESINBUF(&sn, 1) = 1*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_234);
sn.u.n.n_bytes_in_buffers = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5) + xids_get_serialize_size(xids_0) + xids_get_serialize_size(xids_123) + xids_get_serialize_size(xids_234);
//Cleanup:
xids_destroy(&xids_0);
xids_destroy(&xids_123);
xids_destroy(&xids_234);
struct brt *XMALLOC(brt); struct brt *XMALLOC(brt);
struct brt_header *XCALLOC(brt_h); struct brt_header *XCALLOC(brt_h);
......
...@@ -46,18 +46,26 @@ test_fifo_enq (int n) { ...@@ -46,18 +46,26 @@ test_fifo_enq (int n) {
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
buildkey(i); buildkey(i);
buildval(i); buildval(i);
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, (TXNID)i); assert(r == 0); XIDS xids;
if (i==0)
xids = xids_get_root_xids();
else {
r = xids_create_child(xids_get_root_xids(), &xids, (TXNID)i);
assert(r==0);
}
r = toku_fifo_enq(f, thekey, thekeylen, theval, thevallen, i, xids); assert(r == 0);
xids_destroy(&xids);
} }
i = 0; i = 0;
FIFO_ITERATE(f, key, keylen, val, vallen, type, xid, { FIFO_ITERATE(f, key, keylen, val, vallen, type, xids, {
if (verbose) printf("checkit %d %d\n", i, type); if (verbose) printf("checkit %d %d\n", i, type);
buildkey(i); buildkey(i);
buildval(i); buildval(i);
assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0); assert((int) keylen == thekeylen); assert(memcmp(key, thekey, keylen) == 0);
assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0); assert((int) vallen == thevallen); assert(memcmp(val, theval, vallen) == 0);
assert(i % 256 == type); assert(i % 256 == type);
assert((TXNID)i==xid); assert((TXNID)i==xids_get_innermost_xid(xids));
i += 1; i += 1;
}); });
assert(i == n); assert(i == n);
......
This diff is collapsed.
...@@ -19,7 +19,7 @@ static void test_leafentry_1 (void) { ...@@ -19,7 +19,7 @@ static void test_leafentry_1 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0); r = le10_committed(4, "abc", 3, "xy", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_COMMITTED, char expect[] = {LE_COMMITTED,
UINT32TOCHAR(4), UINT32TOCHAR(4),
...@@ -36,7 +36,7 @@ static void test_leafentry_2 (void) { ...@@ -36,7 +36,7 @@ static void test_leafentry_2 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0); r = le10_both(0x0123456789abcdef0LL, 3, "ab", 4, "xyz", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_BOTH, char expect[] = {LE_BOTH,
UINT64TOCHAR(0x0123456789abcdef0LL), UINT64TOCHAR(0x0123456789abcdef0LL),
...@@ -53,7 +53,7 @@ static void test_leafentry_3 (void) { ...@@ -53,7 +53,7 @@ static void test_leafentry_3 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0); r = le10_provdel(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_PROVDEL, char expect[] = {LE_PROVDEL,
UINT64TOCHAR(0x0123456789abcdef0LL), UINT64TOCHAR(0x0123456789abcdef0LL),
...@@ -69,7 +69,7 @@ static void test_leafentry_4 (void) { ...@@ -69,7 +69,7 @@ static void test_leafentry_4 (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0); r = le10_provpair(0x0123456789abcdef0LL, 3, "ab", 5, "lmno", &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
char expect[] = {LE_PROVPAIR, char expect[] = {LE_PROVPAIR,
UINT64TOCHAR(0x0123456789abcdef0LL), UINT64TOCHAR(0x0123456789abcdef0LL),
...@@ -101,7 +101,7 @@ static void test_leafentry_3long (void) { ...@@ -101,7 +101,7 @@ static void test_leafentry_3long (void) {
LEAFENTRY l; LEAFENTRY l;
int r; int r;
u_int32_t msize, dsize; u_int32_t msize, dsize;
r = le_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0); r = le10_provdel(0x0123456789abcdef0LL, 301, zeros, 1025, zeros, &msize, &dsize, &l, 0, 0, 0);
assert(r==0); assert(r==0);
assert(sizeof(expect_3long)==msize); assert(sizeof(expect_3long)==msize);
assert(msize==dsize); assert(msize==dsize);
......
...@@ -20,11 +20,19 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -20,11 +20,19 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
} }
r = toku_omt_create(&result->open_brts); r = toku_omt_create(&result->open_brts);
if (r!=0) { if (r!=0) {
died1:
toku_logger_panic(logger, r); toku_logger_panic(logger, r);
toku_free(result); toku_free(result);
return r; return r;
} }
result->txnid64 = result->first_lsn.lsn; result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids;
if (parent_tokutxn==NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died1;
result->logger = logger; result->logger = logger;
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
...@@ -70,3 +78,9 @@ void toku_txn_close_txn(TOKUTXN txn) { ...@@ -70,3 +78,9 @@ void toku_txn_close_txn(TOKUTXN txn) {
toku_rollback_txn_close(txn); toku_rollback_txn_close(txn);
return; return;
} }
XIDS toku_txn_get_xids (TOKUTXN txn) {
if (txn==0) return xids_get_root_xids();
else return txn->xids;
}
...@@ -9,5 +9,6 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log ...@@ -9,5 +9,6 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv); int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv); int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
void toku_txn_close_txn(TOKUTXN txn); void toku_txn_close_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN);
#endif //TOKUTXN_H #endif //TOKUTXN_H
This diff is collapsed.
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the nested transaction logic and nothing else. No internal
* requirements of the nested transaction logic belongs here.
*/
#ifndef ULE_H
#define ULE_H
//1 does much slower debugging
#define ULE_DEBUG 0
/////////////////////////////////////////////////////////////////////////////////
// Following data structures are the unpacked format of a leafentry.
// * ule is the unpacked leaf entry, that contains an array of unpacked
// transaction records
// * uxr is the unpacked transaction record
//
//Types of transaction records.
enum {XR_INSERT = 1,
XR_DELETE = 2,
XR_PLACEHOLDER = 3};
typedef struct { // unpacked transaction record
u_int8_t type; // delete/insert/placeholder
u_int32_t vallen; // number of bytes in value
void * valp; // pointer to value (Where is value really stored?)
TXNID xid; // transaction id
// Note: when packing ule into a new leafentry, will need
// to copy actual data from valp to new leafentry
} UXR_S, *UXR;
// Unpacked Leaf Entry is of fixed size because it's just on the
// stack and we care about ease of access more than the memory footprint.
typedef struct { // unpacked leaf entry
u_int8_t num_uxrs; // how many of uxrs[] are valid
u_int32_t keylen;
void * keyp;
UXR_S uxrs[MAX_TRANSACTION_RECORDS]; // uxrs[0] is outermost, uxrs[num_uxrs-1] is innermost
} ULE_S, *ULE;
int apply_msg_to_leafentry(BRT_MSG msg,
LEAFENTRY old_leafentry, // NULL if there was no stored data.
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY *new_leafentry_p,
OMT omt,
struct mempool *mp,
void **maybe_free);
//////////////////////////////////////////////////////////////////////////////////////
//Functions exported for test purposes only (used internally for non-test purposes).
void le_unpack(ULE ule, LEAFENTRY le);
int le_pack(ULE ule, // data to be packed into new leafentry
size_t *new_leafentry_memorysize,
size_t *new_leafentry_disksize,
LEAFENTRY * const new_leafentry_p, // this is what this function creates
OMT omt,
struct mempool *mp,
void **maybe_free);
size_t le_memsize_from_ule (ULE ule);
#endif // ULE_H
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ifndef XIDS_INTERNAL_H
#define XIDS_INTERNAL_H
// Variable size list of transaction ids (known in design doc as xids<>).
// ids[0] is the outermost transaction.
// ids[num_xids - 1] is the innermost transaction.
// Should only be accessed by accessor functions xids_xxx, not directly.
typedef struct xids_t {
u_int8_t num_stored_xids; // maximum value of MAX_TRANSACTION_RECORDS - 1 ...
// ... because transaction 0 is implicit
TXNID ids[];
} XIDS_S;
#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to implement xids list of nested transactions
* ids.
*
* See design documentation for nested transactions at
* TokuWiki/Imp/TransactionsOverview.
*
* NOTE: xids are always stored in disk byte order.
* Accessors are responsible for transposing bytes to
* host order.
*/
#include <errno.h>
#include <string.h>
#include <toku_portability.h>
#include "brttypes.h"
#include "xids.h"
#include "xids-internal.h"
#include "toku_assert.h"
#include "memory.h"
#include <toku_htod.h>
/////////////////////////////////////////////////////////////////////////////////
// This layer of abstraction (xids_xxx) understands xids<> and nothing else.
// It contains all the functions that understand xids<>
//
// xids<> do not store the implicit transaction id of 0 at index 0.
// The accessor functions make the id of 0 explicit at index 0.
// The number of xids physically stored in the xids array is in
// the variable num_stored_xids.
//
// The xids struct is immutable. The caller gets an initial version of XIDS
// by calling xids_get_root_xids(), which returns the constant struct
// representing the root transaction (id 0). When a transaction begins,
// a new XIDS is created with the id of the current transaction appended to
// the list.
//
//
// This is the xids list for a transactionless environment.
// It is also the initial state of any xids list created for
// nested transactions.
XIDS
xids_get_root_xids(void) {
static const struct xids_t root_xids = {
.num_stored_xids = 0
};
XIDS rval = (XIDS)&root_xids;
return rval;
}
// xids is immutable. This function creates a new xids by copying the
// parent's list and then appending the xid of the new transaction.
int
xids_create_child(XIDS parent_xids, // xids list for parent transaction
XIDS * xids_p, // xids list created
TXNID this_xid) { // xid of this transaction (new innermost)
int rval;
assert(parent_xids);
assert(this_xid > xids_get_innermost_xid(parent_xids));
u_int8_t num_stored_xids = parent_xids->num_stored_xids + 1;
u_int8_t num_xids = num_stored_xids + 1;
assert(num_xids > 0);
assert(num_xids <= MAX_TRANSACTION_RECORDS);
if (num_xids == MAX_TRANSACTION_RECORDS) rval = EINVAL;
else {
XIDS xids = toku_malloc(sizeof(*xids) + num_stored_xids*sizeof(xids->ids[0]));
if (!xids) rval = ENOMEM;
else {
xids->num_stored_xids = num_stored_xids;
memcpy(xids->ids,
parent_xids->ids,
parent_xids->num_stored_xids*sizeof(parent_xids->ids[0]));
TXNID this_xid_disk = toku_htod64(this_xid);
xids->ids[num_stored_xids-1] = this_xid_disk;
*xids_p = xids;
rval = 0;
}
}
return rval;
}
void
xids_create_from_buffer(struct rbuf *rb, // xids list for parent transaction
XIDS * xids_p) { // xids list created
u_int8_t num_stored_xids = rbuf_char(rb);
u_int8_t num_xids = num_stored_xids + 1;
assert(num_xids > 0);
assert(num_xids < MAX_TRANSACTION_RECORDS);
XIDS xids = toku_xmalloc(sizeof(*xids) + num_stored_xids*sizeof(xids->ids[0]));
xids->num_stored_xids = num_stored_xids;
u_int8_t index;
for (index = 0; index < xids->num_stored_xids; index++) {
rbuf_TXNID(rb, &xids->ids[index]);
if (index > 0)
assert(xids->ids[index] > xids->ids[index-1]);
}
*xids_p = xids;
}
void
xids_destroy(XIDS *xids_p) {
if (*xids_p != xids_get_root_xids()) toku_free(*xids_p);
*xids_p = NULL;
}
// Return xid at requested position.
// If requesting an xid out of range (which will be the case if xids array is empty)
// then return 0, the xid of the root transaction.
TXNID
xids_get_xid(XIDS xids, u_int8_t index) {
TXNID rval = 0;
if (index > 0) {
assert(index < xids_get_num_xids(xids));
rval = xids->ids[index-1];
rval = toku_dtoh64(rval);
}
return rval;
}
// This function assumes that target_xid IS in the list
// of xids.
u_int8_t
xids_find_index_of_xid(XIDS xids, TXNID target_xid) {
u_int8_t index = 0; // search outer to inner
TXNID current_xid = xids_get_xid(xids, index);
while (current_xid != target_xid) {
assert(current_xid < target_xid);
index++;
current_xid = xids_get_xid(xids, index); // Next inner txnid in xids.
}
return index;
}
u_int8_t
xids_get_num_xids(XIDS xids) {
u_int8_t rval = xids->num_stored_xids+1; //+1 for the id of 0 made explicit by xids<> accessors
return rval;
}
// Return innermost xid
TXNID
xids_get_innermost_xid(XIDS xids) {
TXNID rval = xids_get_xid(xids, xids_get_num_xids(xids)-1);
return rval;
}
void
xids_cpy(XIDS target, XIDS source) {
size_t size = xids_get_size(source);
memcpy(target, source, size);
}
// return size in bytes
u_int32_t
xids_get_size(XIDS xids){
u_int32_t rval;
u_int8_t num_stored_xids = xids->num_stored_xids;
rval = sizeof(*xids) + num_stored_xids * sizeof(xids->ids[0]);
return rval;
};
u_int32_t
xids_get_serialize_size(XIDS xids){
u_int32_t rval;
u_int8_t num_stored_xids = xids->num_stored_xids;
rval = 1 + //num stored xids
8 * num_stored_xids;
return rval;
};
void
toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids) {
x1764_add(mm, &xids->num_stored_xids, 1);
u_int8_t index;
u_int8_t num_xids = xids_get_num_xids(xids);
for (index = 0; index < num_xids; index++) {
TXNID current_xid = xids_get_xid(xids, index);
x1764_add(mm, &current_xid, 8);
}
}
unsigned char *
xids_get_end_of_array(XIDS xids) {
TXNID *r = xids->ids + xids->num_stored_xids;
return (unsigned char*)r;
}
void wbuf_xids(struct wbuf *wb, XIDS xids) {
wbuf_char(wb, (unsigned char)xids->num_stored_xids);
u_int8_t index;
for (index = 0; index < xids->num_stored_xids; index++) {
wbuf_TXNID(wb, xids->ids[index]);
}
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
/* Purpose of this file is to provide the world with everything necessary
* to use the xids and nothing else.
* Internal requirements of the xids logic do not belong here.
*
* xids is (abstractly) an immutable list of nested transaction ids, accessed only
* via the functions in this file.
*
* See design documentation for nested transactions at
* TokuWiki/Imp/TransactionsOverview.
*/
#ifndef XIDS_H
#define XIDS_H
#include "x1764.h"
#include "rbuf.h"
#include "wbuf.h"
/* The number of transaction ids stored in the xids structure is
* represented by an 8-bit value. The value 255 is reserved.
* The constant MAX_NESTED_TRANSACTIONS is one less because
* one slot in the packed leaf entry is used for the implicit
* root transaction (id 0).
*/
enum {MAX_NESTED_TRANSACTIONS = 253};
enum {MAX_TRANSACTION_RECORDS = MAX_NESTED_TRANSACTIONS + 1};
//Retrieve an XIDS representing the root transaction.
XIDS xids_get_root_xids(void);
void xids_cpy(XIDS target, XIDS source);
//Creates an XIDS representing this transaction.
//You must pass in an XIDS representing the parent of this transaction.
int xids_create_child(XIDS parent_xids, XIDS *xids_p, TXNID this_xid);
void xids_create_from_buffer(struct rbuf *rb, XIDS * xids_p);
void xids_destroy(XIDS *xids_p);
TXNID xids_get_xid(XIDS xids, u_int8_t index);
u_int8_t xids_find_index_of_xid(XIDS xids, TXNID target_xid);
u_int8_t xids_get_num_xids(XIDS xids);
TXNID xids_get_innermost_xid(XIDS xids);
// return size in bytes
u_int32_t xids_get_size(XIDS xids);
u_int32_t xids_get_serialize_size(XIDS xids);
void toku_calc_more_murmur_xids (struct x1764 *mm, XIDS xids);
unsigned char *xids_get_end_of_array(XIDS xids);
void wbuf_xids(struct wbuf *wb, XIDS xids);
#endif
...@@ -137,8 +137,6 @@ TDB_TESTS_THAT_SHOULD_FAIL= \ ...@@ -137,8 +137,6 @@ TDB_TESTS_THAT_SHOULD_FAIL= \
test_groupcommit_count \ test_groupcommit_count \
test944 \ test944 \
test_truncate_txn_abort \ test_truncate_txn_abort \
test_txn_nested_abort3 \
test_txn_nested_abort4 \
#\ ends prev line #\ ends prev line
ifneq ($(OS_CHOICE),windows) ifneq ($(OS_CHOICE),windows)
TDB_TESTS_THAT_SHOULD_FAIL+= \ TDB_TESTS_THAT_SHOULD_FAIL+= \
......
...@@ -111,7 +111,12 @@ test_main(int argc, char *argv[]) { ...@@ -111,7 +111,12 @@ test_main(int argc, char *argv[]) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0); r = db_env_create(&env, 0); assert(r == 0);
r = env->set_cachesize(env, 0, 8000000, 1); assert(r == 0); //r = env->set_cachesize(env, 0, 8000000, 1); assert(r == 0); //Prior to nested transactions
//This ran incredibly slow with nested transactions. I believe it makes sense to do the following:
//a node is 4MiB. Nodes can become overfull. If you can't have two nodes in memory, you thrash,
//So support 2 nodes plus a bit of wiggle room.
//r = env->set_cachesize(env, 0, (8<<20) + (1<<8), 1); assert(r == 0); //As of [13075] this is enough to hold the 2 nodes/run fast
r = env->set_cachesize(env, 0, (9<<20), 1); assert(r == 0);
r = env->open(env, ENVDIR, DB_CREATE + DB_THREAD + DB_PRIVATE + DB_INIT_MPOOL + DB_INIT_LOCK, S_IRWXU+S_IRWXG+S_IRWXO); assert(r == 0); r = env->open(env, ENVDIR, DB_CREATE + DB_THREAD + DB_PRIVATE + DB_INIT_MPOOL + DB_INIT_LOCK, S_IRWXU+S_IRWXG+S_IRWXO); assert(r == 0);
DB *db; DB *db;
......
...@@ -92,6 +92,21 @@ void toku_ydb_unlock(void); ...@@ -92,6 +92,21 @@ void toku_ydb_unlock(void);
/** Handle a panicked database: return EINVAL if the database env is panicked */ /** Handle a panicked database: return EINVAL if the database env is panicked */
#define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv) #define HANDLE_PANICKED_DB(db) HANDLE_PANICKED_ENV(db->dbenv)
/** Handle a transaction that has a child: return EINVAL if the transaction tries to do any work.
Only commit/abort/prelock (which are used by handlerton) are allowed when a child exists. */
#define HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, txn) \
RAISE_COND_EXCEPTION(((txn) && db_txn_struct_i(txn)->child), \
toku_ydb_do_error((env), \
EINVAL, \
"%s: Transaction cannot do work when child exists", __FUNCTION__))
#define HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn) \
HANDLE_ILLEGAL_WORKING_PARENT_TXN((db)->dbenv, txn)
#define HANDLE_CURSOR_ILLEGAL_WORKING_PARENT_TXN(c) \
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN((c)->dbp, dbc_struct_i(c)->txn)
/* */ /* */
void toku_ydb_error_all_cases(const DB_ENV * env, void toku_ydb_error_all_cases(const DB_ENV * env,
int error, int error,
......
This diff is collapsed.
* One
* two
1. FOO
a. sock
a. pizza
2. elephant
[[Include(source:toku/tokudb.1125/test2.wiki,wiki)]]
1. these
1. lines
1. from
1. test2.wiki
...@@ -184,6 +184,9 @@ VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=ye ...@@ -184,6 +184,9 @@ VGRIND=valgrind --quiet --error-exitcode=1 --leak-check=full --show-reachable=ye
ifeq ($(DB_ATTACH),1) ifeq ($(DB_ATTACH),1)
VGRIND+=--db-attach=yes VGRIND+=--db-attach=yes
endif endif
ifeq ($(TRACK_ORIGINS),1)
VGRIND+=--track-origins=yes
endif
HGRIND=valgrind --quiet --tool=helgrind --error-exitcode=1 HGRIND=valgrind --quiet --tool=helgrind --error-exitcode=1
......
...@@ -47,6 +47,16 @@ static const int64_t toku_byte_order_host = 0x0102030405060708LL; ...@@ -47,6 +47,16 @@ static const int64_t toku_byte_order_host = 0x0102030405060708LL;
#endif #endif
#if DISK_BYTE_ORDER == HOST_BYTE_ORDER #if DISK_BYTE_ORDER == HOST_BYTE_ORDER
static inline uint64_t
toku_dtoh64(uint64_t i) {
return i;
}
static inline uint64_t
toku_htod64(uint64_t i) {
return i;
}
static inline uint32_t static inline uint32_t
toku_dtoh32(uint32_t i) { toku_dtoh32(uint32_t i) {
return i; return i;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment