Commit 39b46845 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

filter some operations during recovery close[t:1993]

git-svn-id: file:///svn/toku/tokudb@14421 c7de825b-a66e-492c-adef-691d508d4ae1
parent cd5a0cd6
......@@ -2593,10 +2593,13 @@ toku_brt_broadcast_commit_all (BRT brt)
return r;
}
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
// Effect: Insert the key-val pair into brt.
{
int r;
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_insert(brt, key, val, txn, ZERO_LSN);
}
int toku_brt_maybe_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) {
int r = 0;
XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn);
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != xid)) {
......@@ -2626,13 +2629,21 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
if (r!=0) return r;
}
LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
r = 0;
} else {
BRT_MSG_S brtcmd = { BRT_INSERT, message_xids, .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd);
if (r!=0) return r;
}
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
return toku_brt_maybe_delete(brt, key, txn, ZERO_LSN);
}
int toku_brt_maybe_delete(BRT brt, DBT *key, TOKUTXN txn, LSN oplsn) {
int r;
XIDS message_xids;
TXNID xid = toku_txn_get_txnid(txn);
......@@ -2655,9 +2666,15 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs);
if (r!=0) return r;
}
LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
r = 0;
} else {
DBT val;
BRT_MSG_S brtcmd = { BRT_DELETE_ANY, message_xids, .u.id={key, toku_init_dbt(&val)}};
r = toku_brt_root_put_cmd(brt, &brtcmd);
}
return r;
}
......@@ -4504,6 +4521,10 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
/* ********************************* delete **************************************/
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
return toku_brt_maybe_delete_both(brt, key, val, txn, ZERO_LSN);
}
int toku_brt_maybe_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn, LSN oplsn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
XIDS message_xids;
......@@ -4530,8 +4551,13 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
if (r!=0) return r;
}
LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn) {
r = 0;
} else {
BRT_MSG_S brtcmd = { BRT_DELETE_BOTH, message_xids, .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd);
}
return r;
}
......@@ -4880,6 +4906,10 @@ toku_brt_note_table_lock (BRT brt, TOKUTXN txn)
return 0;
}
LSN toku_brt_checkpoint_lsn(BRT brt) {
return brt->h->checkpoint_lsn;
}
//Wrapper functions for upgrading from version 10.
#include "backwards_10.h"
void
......
......@@ -51,10 +51,33 @@ int toku_brt_open(BRT, const char *fname, const char *fname_in_env, int is_creat
int toku_brt_remove_subdb(BRT brt, const char *dbname, u_int32_t flags);
int toku_brt_broadcast_commit_all (BRT brt);
int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *getf_v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
// Effect: Insert a key and data pair into a brt
// Returns 0 if successfull
int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successfull
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
// Effect: Delete a key from a brt
// Returns 0 if successfull
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN txn);
// Effect: Delete a key from a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successfull
int toku_brt_maybe_delete (BRT brt, DBT *k, TOKUTXN txn, LSN oplsn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function.
// Returns 0 if successfull
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Delete a pair only if both k and v are equal according to the comparison function and the
// oplsn is newer than the brt lsn. This function is called by recovery.
// Returns 0 if successfull
int toku_brt_maybe_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN txn, LSN oplsn);
int toku_brt_db_delay_closed (BRT brt, DB* db, int (*close_db)(DB*, u_int32_t), u_int32_t close_flags);
int toku_close_brt (BRT, TOKULOGGER, char **error_string);
......@@ -76,6 +99,8 @@ int toku_brt_truncate (BRT brt);
// effect: remove everything from the tree
// returns: 0 if success
LSN toku_brt_checkpoint_lsn(BRT brt);
// create and initialize a cache table
// cachesize is the upper limit on the size of the size of the values in the table
// pass 0 if you want the default
......
......@@ -217,10 +217,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n");
fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v);\n");
fprintf(hf, "TOKUTXN txn, YIELDF yield, void*yield_v, LSN oplsn);\n");
});
fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n");
......
......@@ -143,7 +143,7 @@ static void recover_yield(voidfp UU(f), void *UU(extra)) {
// nothing
}
static void toku_recover_commit (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
static void toku_recover_commit (LSN lsn, TXNID xid, RECOVER_ENV env) {
int r;
// find the transaction by transaction id
......@@ -152,7 +152,7 @@ static void toku_recover_commit (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
assert(r == 0);
// commit the transaction
r = toku_txn_commit_txn(txn, TRUE, recover_yield, NULL);
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, lsn);
assert(r == 0);
// close the transaction
......@@ -164,7 +164,7 @@ static int toku_recover_backward_commit (struct logtype_commit *UU(l), RECOVER_E
return 0;
}
static void toku_recover_xabort (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
static void toku_recover_xabort (LSN lsn, TXNID xid, RECOVER_ENV env) {
int r;
// find the transaction by transaction id
......@@ -173,7 +173,7 @@ static void toku_recover_xabort (LSN UU(lsn), TXNID xid, RECOVER_ENV env) {
assert(r == 0);
// abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL);
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, lsn);
assert(r == 0);
// close the transaction
......@@ -285,21 +285,21 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *UU(l), RECOVER
return 0;
}
static void toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
static void toku_recover_enq_insert (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_insert(pair->brt, &keydbt, &valdbt, txn);
r = toku_brt_maybe_insert(pair->brt, &keydbt, &valdbt, txn, lsn);
assert(r == 0);
}
......@@ -308,21 +308,21 @@ static int toku_recover_backward_enq_insert (struct logtype_enq_insert *UU(l), R
return 0;
}
static void toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
static void toku_recover_enq_delete_both (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val, RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt, valdbt;
toku_fill_dbt(&keydbt, key.data, key.len);
toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_delete_both(pair->brt, &keydbt, &valdbt, txn);
r = toku_brt_maybe_delete_both(pair->brt, &keydbt, &valdbt, txn, lsn);
assert(r == 0);
}
......@@ -331,20 +331,20 @@ static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both
return 0;
}
static void toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV env) {
static void toku_recover_enq_delete_any (LSN lsn, FILENUM filenum, TXNID xid, BYTESTRING key, RECOVER_ENV env) {
struct cf_pair *pair = NULL;
int r = find_cachefile(&env->fmap, filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
// TODO compare file LSN with this XID
TOKUTXN txn;
r = toku_txnid2txn(env->logger, xid, &txn);
assert(r == 0);
DBT keydbt;
toku_fill_dbt(&keydbt, key.data, key.len);
r = toku_brt_delete(pair->brt, &keydbt, txn);
r = toku_brt_maybe_delete(pair->brt, &keydbt, txn, lsn);
assert(r == 0);
}
......
......@@ -16,7 +16,8 @@ toku_commit_fcreate (TXNID UU(xid),
BYTESTRING UU(bs_fname),
TOKUTXN UU(txn),
YIELDF UU(yield),
void *UU(yield_v))
void *UU(yield_v),
LSN UU(oplsn))
{
return 0;
}
......@@ -27,7 +28,8 @@ toku_rollback_fcreate (TXNID UU(xid),
BYTESTRING bs_fname,
TOKUTXN txn,
YIELDF yield,
void* yield_v)
void* yield_v,
LSN UU(oplsn))
{
yield(toku_checkpoint_safe_client_lock, yield_v);
char *fname = fixup_fname(&bs_fname);
......@@ -59,12 +61,21 @@ static int find_brt_from_filenum (OMTVALUE v, void *filenumvp) {
return 0;
}
static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data,TOKUTXN txn) {
static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key, BYTESTRING *data, TOKUTXN txn, LSN oplsn) {
CACHEFILE cf;
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0);
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
assert(r==0);
BRT brt = brtv;
LSN treelsn = toku_brt_checkpoint_lsn(brt);
if (oplsn.lsn != 0 && oplsn.lsn <= treelsn.lsn)
return 0;
DBT key_dbt,data_dbt;
XIDS xids = toku_txn_get_xids(txn);
BRT_MSG_S brtcmd = { type, xids,
......@@ -72,11 +83,7 @@ static int do_insertion (enum brt_msg_type type, FILENUM filenum, BYTESTRING key
data
? toku_fill_dbt(&data_dbt, data->data, data->len)
: toku_init_dbt(&data_dbt) }};
OMTVALUE brtv=NULL;
r = toku_omt_find_zero(txn->open_brts, find_brt_from_filenum, &filenum, &brtv, NULL, NULL);
assert(r==0);
BRT brt = brtv;
r = toku_brt_root_put_cmd(brt, &brtcmd);
return r;
}
......@@ -90,11 +97,11 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
}
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
int toku_commit_cmdinsert (FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv), LSN oplsn) {
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn, oplsn);
#else
key = key;
key = key; oplsn = oplsn;
return do_nothing_with_filenum(txn, filenum);
#endif
}
......@@ -105,12 +112,13 @@ toku_commit_cmdinsertboth (FILENUM filenum,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn, oplsn);
#else
key = key; data = data;
key = key; data = data; oplsn = oplsn;
return do_nothing_with_filenum(txn, filenum);
#endif
}
......@@ -120,9 +128,10 @@ toku_rollback_cmdinsert (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn, oplsn);
}
int
......@@ -131,9 +140,10 @@ toku_rollback_cmdinsertboth (FILENUM filenum,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn, oplsn);
}
int
......@@ -142,10 +152,11 @@ toku_commit_cmddeleteboth (FILENUM filenum,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
#if TOKU_DO_COMMIT_CMD_DELETE_BOTH
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn);
return do_insertion (BRT_COMMIT_BOTH, filenum, key, &data, txn, oplsn);
#else
xid = xid; key = key; data = data;
return do_nothing_with_filenum(txn, filenum);
......@@ -158,9 +169,10 @@ toku_rollback_cmddeleteboth (FILENUM filenum,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn);
return do_insertion (BRT_ABORT_BOTH, filenum, key, &data, txn, oplsn);
}
int
......@@ -168,10 +180,11 @@ toku_commit_cmddelete (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
#if TOKU_DO_COMMIT_CMD_DELETE
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn);
return do_insertion (BRT_COMMIT_ANY, filenum, key, 0, txn, oplsn);
#else
xid = xid; key = key;
return do_nothing_with_filenum(txn, filenum);
......@@ -183,16 +196,18 @@ toku_rollback_cmddelete (FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
void * UU(yieldv),
LSN oplsn)
{
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn);
return do_insertion (BRT_ABORT_ANY, filenum, key, 0, txn, oplsn);
}
int
toku_commit_fileentries (int fd,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
void * yieldv,
LSN oplsn)
{
BREAD f = create_bread_from_fd_initialize_at(fd);
int r=0;
......@@ -202,7 +217,7 @@ toku_commit_fileentries (int fd,
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_commit_rollback_item(txn, item, yield, yieldv);
r = toku_commit_rollback_item(txn, item, yield, yieldv, oplsn);
if (r!=0) goto finish;
memarena_clear(ma);
count++;
......@@ -218,7 +233,8 @@ int
toku_rollback_fileentries (int fd,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
void * yieldv,
LSN oplsn)
{
BREAD f = create_bread_from_fd_initialize_at(fd);
assert(f);
......@@ -229,7 +245,7 @@ toku_rollback_fileentries (int fd,
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_abort_rollback_item(txn, item, yield, yieldv);
r = toku_abort_rollback_item(txn, item, yield, yieldv, oplsn);
if (r!=0) goto finish;
memarena_clear(ma);
count++;
......@@ -245,12 +261,13 @@ int
toku_commit_rollinclude (BYTESTRING bs,
TOKUTXN txn,
YIELDF yield,
void * yieldv) {
void * yieldv,
LSN oplsn) {
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY);
assert(fd>=0);
r = toku_commit_fileentries(fd, txn, yield, yieldv);
r = toku_commit_fileentries(fd, txn, yield, yieldv, oplsn);
assert(r==0);
r = close(fd);
assert(r==0);
......@@ -263,13 +280,14 @@ int
toku_rollback_rollinclude (BYTESTRING bs,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
void * yieldv,
LSN oplsn)
{
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY);
assert(fd>=0);
r = toku_rollback_fileentries(fd, txn, yield, yieldv);
r = toku_rollback_fileentries(fd, txn, yield, yieldv, oplsn);
assert(r==0);
r = close(fd);
assert(r==0);
......@@ -282,7 +300,8 @@ int
toku_rollback_tablelock_on_empty_table (FILENUM filenum,
TOKUTXN txn,
YIELDF yield,
void* yield_v)
void* yield_v,
LSN UU(oplsn))
{
yield(toku_checkpoint_safe_client_lock, yield_v);
// on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it.
......@@ -307,7 +326,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum,
}
int
toku_commit_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yield), void* UU(yield_v))
toku_commit_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(yield), void* UU(yield_v), LSN UU(oplsn))
{
return do_nothing_with_filenum(txn, filenum);
}
......@@ -7,15 +7,15 @@
static void note_txn_closing (TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv);
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn);
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv) {
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv);
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn);
if (r!=0) return r;
return 0;
}
......@@ -104,7 +104,7 @@ static int note_brt_used_in_txns_parent(OMTVALUE brtv, u_int32_t UU(index), void
return r;
}
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
int r=0;
if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we have a rollentry file.
......@@ -169,7 +169,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
int count=0;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
r = toku_commit_rollback_item(txn, item, yield, yieldv);
r = toku_commit_rollback_item(txn, item, yield, yieldv, lsn);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(NULL, yieldv);
......@@ -178,19 +178,19 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv) {
// Read stuff out of the file and execute it.
if (txn->rollentry_filename) {
r = toku_commit_fileentries(txn->rollentry_fd, txn, yield, yieldv);
r = toku_commit_fileentries(txn->rollentry_fd, txn, yield, yieldv, lsn);
}
}
return r;
}
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
struct roll_entry *item;
int count=0;
int r=0;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
r = toku_abort_rollback_item(txn, item, yield, yieldv);
r = toku_abort_rollback_item(txn, item, yield, yieldv, lsn);
if (r!=0)
return r;
count++;
......@@ -198,7 +198,7 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
}
// Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv);
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv, lsn);
assert(r==0);
}
return 0;
......
......@@ -9,12 +9,12 @@
// these routines in rollback.c
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
void toku_rollback_txn_close (TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
void *toku_malloc_in_rollback(TOKUTXN txn, size_t size);
void *toku_memdup_in_rollback(TOKUTXN txn, const void *v, size_t len);
......@@ -29,8 +29,8 @@ int toku_logger_txn_rolltmp_raw_count(TOKUTXN txn, u_int64_t *raw_count);
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
// these routines in roll.c
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv);
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv);
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn);
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv, LSN lsn);
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv);
......
......@@ -7,58 +7,7 @@
#include "txn.h"
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0)
return errno;
int r;
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died;
r = toku_omt_create(&result->open_brts);
if (r!=0) goto died;
result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids;
if (parent_tokutxn==NULL)
parent_xids = xids_get_root_xids();
else
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died;
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create();
if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
logger->oldest_living_xid = result->txnid64;
}
assert(logger->oldest_living_xid <= result->txnid64);
{
//Add txn to list (omt) of live transactions
u_int32_t idx;
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died;
if (logger->oldest_living_xid == result->txnid64)
assert(idx == 0);
else
assert(idx > 0);
}
result->rollentry_resident_bytecount=0;
result->rollentry_raw_count = 0;
result->rollentry_filename = 0;
result->rollentry_fd = -1;
result->rollentry_filesize = 0;
*tokutxn = result;
return 0;
died:
// TODO memory leak
toku_logger_panic(logger, r);
return r;
return toku_txn_begin_with_xid(parent_tokutxn, tokutxn, logger, 0);
}
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid) {
......@@ -67,6 +16,10 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
if (result==0)
return errno;
int r;
if (xid == 0) {
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died;
} else
result->first_lsn.lsn = xid;
r = toku_omt_create(&result->open_brts);
if (r!=0) goto died;
......@@ -109,6 +62,7 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
result->rollentry_filesize = 0;
*tokutxn = result;
return 0;
died:
// TODO memory leak
toku_logger_panic(logger, r);
......@@ -117,18 +71,26 @@ died:
// Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv) {
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv) {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN);
}
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn) {
int r;
// panic handled in log_commit
r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0)
return r;
r = toku_rollback_commit(txn, yield, yieldv);
r = toku_rollback_commit(txn, yield, yieldv, oplsn);
return r;
}
// Doesn't close the txn, just performs the abort operations.
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv) {
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv) {
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN);
}
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn) {
//printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
......@@ -137,7 +99,7 @@ int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv) {
r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
if (r!=0)
return r;
r = toku_rollback_abort(txn, yield, yieldv);
r = toku_rollback_abort(txn, yield, yieldv, oplsn);
return r;
}
......@@ -155,6 +117,10 @@ BOOL toku_txnid_older(TXNID a, TXNID b) {
return (BOOL)(a < b); // TODO need modulo 64 arithmetic
}
BOOL toku_txnid_newer(TXNID a, TXNID b) {
return (BOOL)(a > b); // TODO need modulo 64 arithmetic
}
BOOL toku_txnid_eq(TXNID a, TXNID b) {
return (BOOL)(a == b);
}
......@@ -7,8 +7,13 @@
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger);
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void*yieldv);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void*yieldv);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn);
void toku_txn_close_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN);
......
......@@ -8,18 +8,12 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db";
static void checkpoint_callback_2_do_abort(void *UU(extra)) {
abort();
}
static void run_test (void) {
int r;
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
db_env_set_checkpoint_callback2(checkpoint_callback_2_do_abort, NULL);
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -84,6 +78,8 @@ static void run_recover (void) {
r = cursor->c_close(cursor); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
r = db->close(db, 0); CKERR(r);
r = env->close(env, 0); CKERR(r);
exit(0);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment