Commit 1e1cfe2f authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Rollback during abort. Fixes #448.

git-svn-id: file:///svn/tokudb@2423 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7f67f720
......@@ -1391,7 +1391,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
} else
goto died0a;
}
r=toku_cachetable_openfd(&t->cf, cachetable, fd);
r=toku_cachetable_openfd(&t->cf, cachetable, fd, t);
if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
}
......@@ -1715,6 +1715,12 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
{
const BYTESTRING insertedkey = { key->size, toku_memdup(key->data, key->size) };
const BYTESTRING inserteddata = { val->size, toku_memdup(val->data, val->size) };
r = toku_logger_save_rollback_insert(txn, toku_cachefile_filenum(brt->cf), insertedkey, inserteddata);
if (r!=0) return r;
}
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
}
......
......@@ -84,7 +84,7 @@ static void test0 (void) {
r=toku_create_cachetable(&t, 5, ZERO_LSN, NULL_LOGGER);
assert(r==0);
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r==0);
expect_f = f;
......@@ -208,7 +208,7 @@ static void test_nested_pin (void) {
r = toku_create_cachetable(&t, 1, ZERO_LSN, NULL_LOGGER);
assert(r==0);
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r==0);
expect_f = f;
......@@ -269,10 +269,10 @@ static void test_multi_filehandles (void) {
unlink(fname2);
r = toku_create_cachetable(&t, 4, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_cachetable_openf(&f1, t, fname1, O_RDWR|O_CREAT, 0777); assert(r==0);
r = toku_cachetable_openf(&f1, t, fname1, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0);
r = link(fname1, fname2); assert(r==0);
r = toku_cachetable_openf(&f2, t, fname2, O_RDWR|O_CREAT, 0777); assert(r==0);
r = toku_cachetable_openf(&f3, t, fname3, O_RDWR|O_CREAT, 0777); assert(r==0);
r = toku_cachetable_openf(&f2, t, fname2, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0);
r = toku_cachetable_openf(&f3, t, fname3, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0);
assert(f1==f2);
assert(f1!=f3);
......@@ -318,7 +318,7 @@ static void test_dirty() {
char *fname = "test.dat";
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r == 0);
key = 1; value = (void*)1;
......@@ -426,7 +426,7 @@ static void test_size_resize() {
char *fname = "test.dat";
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r == 0);
CACHEKEY key = 42;
......@@ -477,7 +477,7 @@ static void test_size_flush() {
char *fname = "test.dat";
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r == 0);
/* put 2*n keys into the table, ensure flushes occur in key order */
......@@ -568,7 +568,7 @@ static void test_rename (void) {
const char fname[] = "ct-test-rename.dat";
r=toku_create_cachetable(&t, KEYLIMIT, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0);
assert(r==0);
for (i=0; i<TRIALLIMIT; i++) {
......
......@@ -112,7 +112,7 @@ static void test_chaining (void) {
r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i);
assert(r>0 && r<FILENAME_LEN);
unlink(fname[i]);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777); assert(r==0);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0);
}
for (i=0; i<N_PRESENT_LIMIT; i++) {
int fnum = i%N_FILES;
......@@ -161,7 +161,7 @@ static void test_chaining (void) {
CACHEFILE oldcf=f[i];
r = toku_cachefile_close(&f[i]); assert(r==0);
file_is_not_present(oldcf);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777); assert(r==0);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777, (BRT)0); assert(r==0);
}
}
for (i=0; i<N_FILES; i++) {
......
......@@ -7,6 +7,7 @@
#include "primes.h"
#include "toku_assert.h"
#include "yerror.h"
#include "brt-internal.h"
#include <errno.h>
#include <stdio.h>
......@@ -66,6 +67,7 @@ struct cachefile {
CACHETABLE cachetable;
struct fileid fileid;
FILENUM filenum;
BRT brt;
};
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
......@@ -93,13 +95,18 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT *brt) {
CACHEFILE extant;
for (extant = t->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) { *cf = extant; return 0; }
if (extant->filenum.fileid==filenum.fileid) {
*cf = extant;
assert(extant->brt);
assert(extant->brt->cf==extant);
*brt = extant->brt;
return 0;
}
}
*brt=0; // This is wrong. But the tests will notice right away.
return ENOENT;
}
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) {
int r;
CACHEFILE extant;
FILENUM max_filenum_in_use={0};
......@@ -128,16 +135,17 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
newcf->fd = fd;
newcf->cachetable = t;
newcf->fileid = fileid;
newcf->brt = brt;
t->cachefiles = newcf;
*cf = newcf;
return 0;
}
}
int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) {
int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode, BRT brt) {
int fd = open(fname, flags, mode);
if (fd<0) return errno;
return toku_cachetable_openfd (cf, t, fd);
return toku_cachetable_openfd (cf, t, fd, brt);
}
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
......
......@@ -25,8 +25,8 @@ typedef long long CACHEKEY;
*/
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/);
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode, BRT);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, BRT brt);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
......
......@@ -41,7 +41,11 @@ int logformat_version_number = 0;
const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"BYTESTRING", "fname", 0},
NULLFIELD}},
{"delete", 'K', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"delete", 'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"insert", 'i', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
......
......@@ -346,10 +346,19 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
CACHEFILE cf;
int fd = open(fixedfname, O_RDWR, 0);
assert(fd>=0);
int r = toku_cachetable_openfd(&cf, ct, fd);
assert(r==0);
BRT MALLOC(brt);
assert(errno==0 && brt!=0);
brt->database_name = fixedfname;
brt->h=0;
list_init(&brt->cursors);
brt->compare_fun = 0;
brt->dup_compare = 0;
brt->db = 0;
int r = toku_cachetable_openfd(&cf, ct, fd, brt);
assert(r==0);
brt->skey = brt->sval = 0;
brt->cf=cf;
toku_recover_note_cachefile(filenum, cf);
toku_free(fixedfname);
toku_free_BYTESTRING(fname);
}
......@@ -416,6 +425,20 @@ int toku_rollback_delete (FILENUM filenum,
return r;
}
int toku_rollback_insert (FILENUM filenum,
BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
DBT key_dbt,data_dbt;
r = toku_brt_delete_both(brt,
toku_fill_dbt(&key_dbt, key.data, key.len),
toku_fill_dbt(&data_dbt, data.data, data.len),
txn);
return r;
}
// a newbrtnode should have been done before this
void toku_recover_resizepma (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t oldsize, u_int32_t newsize) {
struct cf_pair *pair;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment