Commit 88cbf6be authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Make rollback do the right thing in some cases fo internal nodes. Addresses #556.

git-svn-id: file:///svn/tokudb@2955 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9d22160c
......@@ -52,6 +52,7 @@ REGRESSION_TESTS = \
cachetable-test \
cachetable-test2 \
fifo-test \
fifo-test-exp \
test-brt-delete-both \
brt-test \
brt-test3 \
......@@ -168,7 +169,7 @@ brt.o: $(BRT_INTERNAL_H_INCLUDES) key.h log_header.h
fifo.o: fifo.h brttypes.h
memory.o: memory.h
primes.o: primes.h toku_assert.h
fifo-test: fifo.o memory.o toku_assert.o ybt.o
fifo-test-exp fifo-test: fifo.o memory.o toku_assert.o ybt.o
brt-serialize.o: $(BRT_INTERNAL_H_INCLUDES) key.h wbuf.h rbuf.h
brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o fifo.o brt-serialize.o
brt-bigtest.o: brt.h ../include/db.h
......
......@@ -191,4 +191,6 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, DISKOFF diskoff, enum brt_cmd_typ
int toku_set_func_fsync (int (*fsync_function)(int));
#endif
This diff is collapsed.
......@@ -66,4 +66,7 @@ int toku_brt_get_fd(BRT, int *);
int toku_brt_height_of_root(BRT, int *height); // for an open brt, return the current height.
// Special hack for recovery
int toku_brt_nonleaf_expunge_xaction(BRT brt, DISKOFF diskoff, TXNID xid);
#endif
......@@ -27,6 +27,11 @@ typedef struct {
char *data;
} BYTESTRING;
typedef struct {
int len;
DISKOFF *array;
} DISKOFFARRAY;
/* Make the LSN be a struct instead of an integer so that we get better type checking. */
typedef struct __toku_lsn { u_int64_t lsn; } LSN;
#define ZERO_LSN ((LSN){0})
......
/* Test the expunge method. */
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include "fifo.h"
#include "memory.h"
int count;
int callback (bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void *v) {
TXNID which=(long)v;
assert(xid==which);
int actual_row = count;
assert(strlen(key)+1==keylen);
assert(strlen(data)+1==datalen);
//printf("count=%d which=%ld deleting %s %s\n", count, (long)which, (char*)key, (char*)data);
switch (which) {
case 23: break;
case 24: actual_row++; break;
case 26: actual_row+=3;
}
switch (actual_row) {
case 0: assert(strcmp(key, "hello")==0); assert(strcmp(data, "thera")==0); assert(xid==23); assert(type==0); break;
case 1: assert(strcmp(key, "hello")==0); assert(strcmp(data, "therb")==0); assert(xid==24); assert(type==0); break;
case 2: assert(strcmp(key, "hell1")==0); assert(strcmp(data, "therc")==0); assert(xid==24); assert(type==1); break;
case 3: assert(strcmp(key, "hell1")==0); assert(strcmp(data, "therd")==0); assert(xid==26); assert(type==1); break;
default: assert(0);
}
count++;
return 0;
}
void doit (int which) {
int r;
FIFO f;
r = toku_fifo_create(&f); assert(r==0);
r = toku_fifo_enq(f, "hello", 6, "thera", 6, 0, 23); assert(r==0);
r = toku_fifo_enq(f, "hello", 6, "therb", 6, 0, 24); assert(r==0);
r = toku_fifo_enq(f, "hell1", 6, "therc", 6, 1, 24); assert(r==0);
r = toku_fifo_enq(f, "hell1", 6, "therd", 6, 1, 26); assert(r==0);
int i=0;
FIFO_ITERATE(f, k, kl, d, dl, t, x,
({
assert(strlen(k)+1==kl);
assert(strlen(d)+1==dl);
switch(i) {
case 0: assert(strcmp(k, "hello")==0); assert(strcmp(d, "thera")==0); assert(x==23); assert(t==0); i++; break;
case 1: assert(strcmp(k, "hello")==0); assert(strcmp(d, "therb")==0); assert(x==24); assert(t==0); i++; break;
case 2: assert(strcmp(k, "hell1")==0); assert(strcmp(d, "therc")==0); assert(x==24); assert(t==1); i++; break;
case 3: assert(strcmp(k, "hell1")==0); assert(strcmp(d, "therd")==0); assert(x==26); assert(t==1); i++; break;
default: assert(0);
}
}));
count=0;
r = toku_fifo_expunge_xaction(f, which, callback, (void*)(long)which);
switch (which) {
case 23: assert(count==1); break;
case 24: assert(count==2); break;
case 26: assert(count==1); break;
}
toku_fifo_free(&f);
toku_malloc_cleanup();
}
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
doit(23);
doit(24);
doit(26);
doit(27);
return 0;
}
......@@ -140,7 +140,20 @@ void toku_fifo_iterate (FIFO fifo, void(*f)(bytevec key,ITEMLEN keylen,bytevec d
f(entry->key, entry->keylen, entry->key + entry->keylen, entry->vallen, entry->type, entry->xid, arg);
}
int toku_fifo_expunge_xaction(FIFO fifo, TXNID xid, int (*callback_on_delete)(bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*), void*arg) {
struct fifo_entry **prev=&fifo->head;
struct fifo_entry *entry;
while ((entry=*prev)) {
if (entry->xid==xid) {
// Must remove it.
int r = callback_on_delete(entry->key, entry->keylen, entry->key+entry->keylen, entry->vallen, entry->type, entry->xid, arg);
fifo->n--;
*prev=entry->next;
toku_free_n(entry, fifo_entry_size(entry));
if (r!=0) return r;
} else {
prev = &entry->next;
}
}
return 0;
}
......@@ -44,4 +44,6 @@ void toku_fifo_iterate (FIFO, void(*f)(bytevec key,ITEMLEN keylen,bytevec data,I
} \
})
int toku_fifo_expunge_xaction(FIFO fifo, TXNID xid, int (*callback_on_delete)(bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen, int type, TXNID xid, void*), void*arg);
#endif
......@@ -112,6 +112,9 @@ static inline int toku_copy_BYTESTRING(BYTESTRING *target, BYTESTRING val) {
static inline void toku_free_BYTESTRING(BYTESTRING val) {
toku_free(val.data);
}
static inline void toku_free_DISKOFFARRAY(DISKOFFARRAY val) {
toku_free(val.array);
}
static inline int toku_copy_LOGGEDBRTHEADER(LOGGEDBRTHEADER *target, LOGGEDBRTHEADER val) {
*target = val;
......
......@@ -49,6 +49,10 @@ const struct logtype rollbacks[] = {
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"xactiontouchednonleaf", 'n', FA{{"FILENUM", "filenum", 0},
{"DISKOFFARRAY", "parents", 0},
{"DISKOFF", "diskoff", 0},
NULLFIELD}},
{0,0,FA{NULLFIELD}}
};
......
......@@ -62,3 +62,13 @@ int toku_rollback_deleteatleaf (FILENUM filenum, BYTESTRING key, BYTESTRING data
0); // Do the insertion unconditionally
return r;
}
int toku_rollback_xactiontouchednonleaf(FILENUM filenum, DISKOFFARRAY array __attribute__((__unused__)), DISKOFF diskoff, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
r = toku_brt_nonleaf_expunge_xaction(brt, diskoff, txn->txnid64);
assert(r==0);
return 0;
}
......@@ -77,7 +77,7 @@ void do_test_abort2 (void) {
insert(7, 1);
r=txn->abort(txn); CKERR(r);
// Don't do a query on "hello7", because that will force things out of the buffer.
// Don't do a lookup on "hello7", because that will force things out of the buffer.
r=env->txn_begin(env, 0, &txn, 0); assert(r==0);
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment