Commit 62a40e87 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

halfway broken rollback for delete. Addresses #27.

git-svn-id: file:///svn/tokudb@2431 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1e1cfe2f
...@@ -1743,7 +1743,13 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) { ...@@ -1743,7 +1743,13 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r; int r;
DBT val; DBT val;
printf("removing\n");
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}}; BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
{
const BYTESTRING deletedkey = { key->size, toku_memdup(key->data, key->size) };
r = toku_logger_save_rollback_delete(txn, toku_cachefile_filenum(brt->cf), deletedkey);
if (r!=0) return r;
}
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r; return r;
} }
......
...@@ -42,6 +42,9 @@ const struct logtype rollbacks[] = { ...@@ -42,6 +42,9 @@ const struct logtype rollbacks[] = {
{"fcreate", 'F', FA{{"BYTESTRING", "fname", 0}, {"fcreate", 'F', FA{{"BYTESTRING", "fname", 0},
NULLFIELD}}, NULLFIELD}},
{"delete", 'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback. {"delete", 'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0},
NULLFIELD}},
{"deleteboth", 'D', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
...@@ -49,6 +52,7 @@ const struct logtype rollbacks[] = { ...@@ -49,6 +52,7 @@ const struct logtype rollbacks[] = {
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0}, {"BYTESTRING", "data", 0},
NULLFIELD}}, NULLFIELD}},
{0,0,FA{NULLFIELD}}
}; };
const struct logtype logtypes[] = { const struct logtype logtypes[] = {
...@@ -212,7 +216,7 @@ void generate_enum_internal (char *enum_name, char *enum_prefix, const struct lo ...@@ -212,7 +216,7 @@ void generate_enum_internal (char *enum_name, char *enum_prefix, const struct lo
count++; count++;
fprintf(hf, "\n"); fprintf(hf, "\n");
fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd); fprintf(hf," %s_%-16s = '%c'", enum_prefix, lt->name, cmd);
if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); } if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice (second time for %s)\n", __FILE__, __LINE__, cmd, cmd, lt->name); abort(); }
used_cmds[cmd]=1; used_cmds[cmd]=1;
})); }));
fprintf(hf, "\n};\n\n"); fprintf(hf, "\n};\n\n");
......
...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) }; const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
TOKUTXN txn; TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1; if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
r=toku_logger_save_rollback_delete(txn, pma->filenum, deletedkey, deleteddata); r=toku_logger_save_rollback_deleteboth(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { if (r!=0) {
toku_free(deletedkey.data); toku_free(deletedkey.data);
toku_free(deleteddata.data); toku_free(deleteddata.data);
......
...@@ -52,7 +52,7 @@ void toku_recover_cleanup (void) { ...@@ -52,7 +52,7 @@ void toku_recover_cleanup (void) {
} }
} }
int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) { int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
if (max_cf_pairs==0) { if (max_cf_pairs==0) {
n_cf_pairs=1; n_cf_pairs=1;
max_cf_pairs=2; max_cf_pairs=2;
...@@ -67,7 +67,7 @@ int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) { ...@@ -67,7 +67,7 @@ int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) {
} }
cf_pairs[n_cf_pairs-1].filenum = fnum; cf_pairs[n_cf_pairs-1].filenum = fnum;
cf_pairs[n_cf_pairs-1].cf = cf; cf_pairs[n_cf_pairs-1].cf = cf;
cf_pairs[n_cf_pairs-1].brt = 0; cf_pairs[n_cf_pairs-1].brt = brt;
return 0; return 0;
} }
...@@ -358,7 +358,7 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM ...@@ -358,7 +358,7 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
assert(r==0); assert(r==0);
brt->skey = brt->sval = 0; brt->skey = brt->sval = 0;
brt->cf=cf; brt->cf=cf;
toku_recover_note_cachefile(filenum, cf); toku_recover_note_cachefile(filenum, cf, brt);
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
} }
...@@ -411,6 +411,20 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO ...@@ -411,6 +411,20 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
toku_free_BYTESTRING(databs); toku_free_BYTESTRING(databs);
} }
int toku_rollback_deleteboth (FILENUM filenum,
BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
DBT key_dbt,data_dbt;
r = toku_brt_insert(brt,
toku_fill_dbt(&key_dbt, key.data, key.len),
toku_fill_dbt(&data_dbt, data.data, data.len),
txn);
return r;
}
int toku_rollback_delete (FILENUM filenum, int toku_rollback_delete (FILENUM filenum,
BYTESTRING key,BYTESTRING data,TOKUTXN txn) { BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
...@@ -435,7 +449,7 @@ int toku_rollback_insert (FILENUM filenum, ...@@ -435,7 +449,7 @@ int toku_rollback_insert (FILENUM filenum,
r = toku_brt_delete_both(brt, r = toku_brt_delete_both(brt,
toku_fill_dbt(&key_dbt, key.data, key.len), toku_fill_dbt(&key_dbt, key.data, key.len),
toku_fill_dbt(&data_dbt, data.data, data.len), toku_fill_dbt(&data_dbt, data.data, data.len),
txn); 0);
return r; return r;
} }
......
...@@ -191,7 +191,7 @@ make_libs: ...@@ -191,7 +191,7 @@ make_libs:
clean: clean:
rm -f $(ALL_TESTS) *.o *.gcno *.gcda *.gcov rm -f $(ALL_TESTS) *.o *.gcno *.gcda *.gcov
rm -rf dir.*.tdb dir.*.bdb rm -rf dir.*.tdb dir.*.bdb dir.*.tdb.recover
cleanall: clean cleanall: clean
rm -f *.tdbrun rm -f *.tdbrun
......
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <db.h>
#include <arpa/inet.h>
#include "test.h"
void test_txn_abort(int n) {
if (verbose) printf("test_txn_abort:%d\n", n);
system("rm -rf " DIR);
mkdir(DIR, 0777);
int r;
DB_ENV *env;
r = db_env_create(&env, 0); assert(r == 0);
r = env->set_data_dir(env, DIR);
r = env->set_lg_dir(env, DIR);
r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777);
if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
assert(r == 0);
DB_TXN *txn = 0;
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
DB *db;
r = db_create(&db, env, 0); assert(r == 0);
r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
r = txn->commit(txn, 0); assert(r == 0);
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
int i;
for (i=0; i<n; i++) {
DBT key, val;
int i2=htonl(i*2);
r = db->put(db, txn, dbt_init(&key, &i2, sizeof i2), dbt_init(&val, &i, sizeof i), 0);
if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
assert(r == 0);
}
r = txn->commit(txn, 0);
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
for (i=0; i<n; i++) {
DBT key;
int i2=htonl(i*2);
r = db->del(db, txn, dbt_init(&key, &i2, sizeof i2), 0);
if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
assert(r == 0);
}
r = txn->abort(txn);
if (r != 0) printf("%s:%d:abort:%d\n", __FILE__, __LINE__, r);
assert(r == 0);
/* walk the db, even numbers should be there */
r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
DBC *cursor;
r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
DBT key; memset(&key, 0, sizeof key);
DBT val; memset(&val, 0, sizeof val);
for (i=0; 1; i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r!=0) break;
printf("%d present\n", ntohl(*(int*)key.data));
assert(key.size==4);
assert(ntohl(*(int*)key.data)==2*i);
}
assert(i==n);
r = cursor->c_close(cursor); assert(r == 0);
r = txn->commit(txn, 0);
r = db->close(db, 0); assert(r == 0);
r = env->close(env, 0); assert(r == 0);
}
int main(int argc, char *argv[]) {
int i;
for (i = 1; i < argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
verbose++;
continue;
}
}
for (i=1; i<100; i++)
test_txn_abort(i);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment