From 62a40e87e0d202dd461a4e3aedbc7551b217fbdc Mon Sep 17 00:00:00 2001
From: "Bradley C. Kuszmaul" <bradley@tokutek.com>
Date: Wed, 27 Feb 2008 07:14:03 +0000
Subject: [PATCH] halfway broken rollback for delete.  Addresses #27.

git-svn-id: file:///svn/tokudb@2431 c7de825b-a66e-492c-adef-691d508d4ae1
---
 newbrt/brt.c                 |  6 +++
 newbrt/logformat.c           |  8 +++-
 newbrt/pma.c                 |  2 +-
 newbrt/roll.c                | 24 ++++++++--
 src/tests/Makefile           |  2 +-
 src/tests/test_txn_abort5a.c | 90 ++++++++++++++++++++++++++++++++++++
 6 files changed, 123 insertions(+), 9 deletions(-)
 create mode 100644 src/tests/test_txn_abort5a.c

diff --git a/newbrt/brt.c b/newbrt/brt.c
index 87614a8a15..ab898a0973 100644
--- a/newbrt/brt.c
+++ b/newbrt/brt.c
@@ -1743,7 +1743,13 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
 int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
     int r;
     DBT val;
+    printf("removing\n");
     BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
+    {
+	const BYTESTRING deletedkey  =  { key->size, toku_memdup(key->data, key->size) };
+	r = toku_logger_save_rollback_delete(txn, toku_cachefile_filenum(brt->cf), deletedkey);
+	if (r!=0) return r;
+    }
     r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
     return r;
 }
diff --git a/newbrt/logformat.c b/newbrt/logformat.c
index 478281e5ee..e55554c14c 100644
--- a/newbrt/logformat.c
+++ b/newbrt/logformat.c
@@ -43,12 +43,16 @@ const struct logtype rollbacks[] = {
 			NULLFIELD}},
     {"delete",    'd', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
 			  {"BYTESTRING", "key", 0},
-			  {"BYTESTRING", "data", 0},
 			  NULLFIELD}},
+    {"deleteboth", 'D', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
+			   {"BYTESTRING", "key", 0},
+			   {"BYTESTRING", "data", 0},
+			   NULLFIELD}},
     {"insert",    'i', FA{{"FILENUM", "filenum", 0}, // Note a delete for rollback.  
 			  {"BYTESTRING", "key", 0},
 			  {"BYTESTRING", "data", 0},
 			  NULLFIELD}},
+    {0,0,FA{NULLFIELD}}
 };
 
 const struct logtype logtypes[] = {
@@ -212,7 +216,7 @@ void generate_enum_internal (char *enum_name, char *enum_prefix, const struct lo
 		    count++;
 		    fprintf(hf, "\n");
 		    fprintf(hf,"    %s_%-16s = '%c'", enum_prefix, lt->name, cmd);
-		    if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice\n", __FILE__, __LINE__, cmd, cmd); abort(); }
+		    if (used_cmds[cmd]!=0) { fprintf(stderr, "%s:%d Command %d (%c) was used twice (second time for %s)\n", __FILE__, __LINE__, cmd, cmd, lt->name); abort(); }
 		    used_cmds[cmd]=1;
 		}));
     fprintf(hf, "\n};\n\n");
diff --git a/newbrt/pma.c b/newbrt/pma.c
index 0886a56d47..88e3555f66 100644
--- a/newbrt/pma.c
+++ b/newbrt/pma.c
@@ -870,7 +870,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
 		const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
 		TOKUTXN txn;
 		if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
-		r=toku_logger_save_rollback_delete(txn, pma->filenum, deletedkey, deleteddata);
+		r=toku_logger_save_rollback_deleteboth(txn, pma->filenum, deletedkey, deleteddata);
 		if (r!=0) {
 		    toku_free(deletedkey.data);
 		    toku_free(deleteddata.data);
diff --git a/newbrt/roll.c b/newbrt/roll.c
index e208d90d54..873ff4b840 100644
--- a/newbrt/roll.c
+++ b/newbrt/roll.c
@@ -52,7 +52,7 @@ void toku_recover_cleanup (void) {
     }
 }
 
-int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) {
+int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf, BRT brt) {
     if (max_cf_pairs==0) {
 	n_cf_pairs=1;
 	max_cf_pairs=2;
@@ -67,7 +67,7 @@ int toku_recover_note_cachefile (FILENUM fnum, CACHEFILE cf) {
     }
     cf_pairs[n_cf_pairs-1].filenum = fnum;
     cf_pairs[n_cf_pairs-1].cf      = cf;
-    cf_pairs[n_cf_pairs-1].brt     = 0;
+    cf_pairs[n_cf_pairs-1].brt     = brt;
     return 0;
 }
 
@@ -358,7 +358,7 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
     assert(r==0);
     brt->skey = brt->sval = 0;
     brt->cf=cf;
-    toku_recover_note_cachefile(filenum, cf);
+    toku_recover_note_cachefile(filenum, cf, brt);
     toku_free_BYTESTRING(fname);
 }
 
@@ -411,8 +411,22 @@ void toku_recover_deleteinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
     toku_free_BYTESTRING(databs);
 }
 
+int toku_rollback_deleteboth (FILENUM filenum,
+			      BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
+    CACHEFILE cf;
+    BRT brt;
+    int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
+    assert(r==0);
+    DBT key_dbt,data_dbt;
+    r = toku_brt_insert(brt,
+			toku_fill_dbt(&key_dbt, key.data, key.len),
+			toku_fill_dbt(&data_dbt, data.data, data.len),
+			txn);
+    return r;
+}
+
 int toku_rollback_delete (FILENUM filenum,
-			  BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
+			      BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
     CACHEFILE cf;
     BRT brt;
     int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
@@ -435,7 +449,7 @@ int toku_rollback_insert (FILENUM filenum,
     r = toku_brt_delete_both(brt,
 			     toku_fill_dbt(&key_dbt, key.data, key.len),
 			     toku_fill_dbt(&data_dbt, data.data, data.len),
-			     txn);
+			     0);
     return r;
 }
 
diff --git a/src/tests/Makefile b/src/tests/Makefile
index 9205666b69..ba326034c8 100644
--- a/src/tests/Makefile
+++ b/src/tests/Makefile
@@ -191,7 +191,7 @@ make_libs:
 
 clean:
 	rm -f $(ALL_TESTS) *.o *.gcno *.gcda *.gcov 
-	rm -rf dir.*.tdb dir.*.bdb
+	rm -rf dir.*.tdb dir.*.bdb dir.*.tdb.recover
 
 cleanall: clean
 	rm -f *.tdbrun
diff --git a/src/tests/test_txn_abort5a.c b/src/tests/test_txn_abort5a.c
new file mode 100644
index 0000000000..ab168f6887
--- /dev/null
+++ b/src/tests/test_txn_abort5a.c
@@ -0,0 +1,90 @@
+#include <stdio.h>
+#include <assert.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <sys/stat.h>
+#include <db.h>
+#include <arpa/inet.h>
+#include "test.h"
+
+
+void test_txn_abort(int n) {
+    if (verbose) printf("test_txn_abort:%d\n", n);
+
+    system("rm -rf " DIR);
+    mkdir(DIR, 0777);
+
+    int r;
+    DB_ENV *env;
+    r = db_env_create(&env, 0); assert(r == 0);
+    r = env->set_data_dir(env, DIR);
+    r = env->set_lg_dir(env, DIR);
+    r = env->open(env, 0, DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, 0777); 
+    if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
+    assert(r == 0);
+
+    DB_TXN *txn = 0;
+    r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
+
+    DB *db;
+    r = db_create(&db, env, 0); assert(r == 0);
+    r = db->open(db, txn, "test.db", 0, DB_BTREE, DB_CREATE, 0777); assert(r == 0);
+    r = txn->commit(txn, 0); assert(r == 0);
+
+    r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
+    int i;
+    for (i=0; i<n; i++) {
+        DBT key, val;
+	int i2=htonl(i*2);
+        r = db->put(db, txn, dbt_init(&key, &i2, sizeof i2), dbt_init(&val, &i, sizeof i), 0); 
+        if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
+        assert(r == 0);
+    }
+    r = txn->commit(txn, 0);
+    
+    r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
+    for (i=0; i<n; i++) {
+        DBT key;
+	int i2=htonl(i*2);
+        r = db->del(db, txn, dbt_init(&key, &i2, sizeof i2), 0);
+        if (r != 0) printf("%s:%d:%d:%s\n", __FILE__, __LINE__, r, db_strerror(r));
+        assert(r == 0);
+    }
+    r = txn->abort(txn); 
+    if (r != 0) printf("%s:%d:abort:%d\n", __FILE__, __LINE__, r);
+    assert(r == 0);
+    /* walk the db, even numbers should be there */
+    r = env->txn_begin(env, 0, &txn, 0); assert(r == 0);
+    DBC *cursor;
+    r = db->cursor(db, txn, &cursor, 0); assert(r == 0);
+    DBT key; memset(&key, 0, sizeof key);
+    DBT val; memset(&val, 0, sizeof val);
+    for (i=0; 1; i++) {
+	r = cursor->c_get(cursor, &key, &val, DB_NEXT);
+	if (r!=0) break;
+	printf("%d present\n", ntohl(*(int*)key.data));
+	assert(key.size==4);
+	assert(ntohl(*(int*)key.data)==2*i);
+    }
+    assert(i==n);
+    r = cursor->c_close(cursor); assert(r == 0);
+    r = txn->commit(txn, 0);
+
+    r = db->close(db, 0); assert(r == 0);
+    r = env->close(env, 0); assert(r == 0);
+}
+
+int main(int argc, char *argv[]) {
+    int i;
+    for (i = 1; i < argc; i++) {
+        char *arg = argv[i];
+        if (strcmp(arg, "-v") == 0 || strcmp(arg, "--verbose") == 0) {
+            verbose++;
+            continue;
+        }
+    }
+    for (i=1; i<100; i++) 
+        test_txn_abort(i);
+    return 0;
+}
-- 
2.30.9