Commit 5a0df7b3 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Rollback is working a little better. Addresses #27.

git-svn-id: file:///svn/tokudb@2294 c7de825b-a66e-492c-adef-691d508d4ae1
parent 08f09275
...@@ -1376,7 +1376,8 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -1376,7 +1376,8 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->database_name=0; t->database_name=0;
goto died0a; goto died0a;
} }
toku_logger_log_fcreate(txn, fname_in_env, 0777); r = toku_logger_log_fcreate(txn, fname_in_env, 0777);
if (r!=0) goto died0a;
} }
r=toku_cachetable_openfd(&t->cf, cachetable, fd); r=toku_cachetable_openfd(&t->cf, cachetable, fd);
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)); toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
......
...@@ -69,7 +69,7 @@ static inline void list_move(struct list *newhead, struct list *oldhead) { ...@@ -69,7 +69,7 @@ static inline void list_move(struct list *newhead, struct list *oldhead) {
list_init(oldhead); list_init(oldhead);
} }
// Note: Need an extra level of parens in these macros so that // Note: Need the extra level of parens in these macros so that
// list_struct(h, foo, b)->zot // list_struct(h, foo, b)->zot
// will work right. Otherwise the type cast will try to include ->zot, and it will be all messed up. // will work right. Otherwise the type cast will try to include ->zot, and it will be all messed up.
#if defined(__GNUC__) && __GNUC__ >= 4 #if defined(__GNUC__) && __GNUC__ >= 4
......
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
#include "log.h" #include "log.h"
#include "toku_assert.h" #include "toku_assert.h"
#include "list.h"
#include "yerror.h" #include "yerror.h"
#include <stdio.h> #include <stdio.h>
#include <sys/types.h> #include <sys/types.h>
...@@ -20,6 +21,7 @@ struct tokulogger { ...@@ -20,6 +21,7 @@ struct tokulogger {
char buf[LOGGER_BUF_SIZE]; char buf[LOGGER_BUF_SIZE];
int n_in_buf; int n_in_buf;
CACHETABLE ct; CACHETABLE ct;
struct list live_txns; // just a linked list. Should be a hashtable.
}; };
int toku_logger_find_next_unused_log_file(const char *directory, long long *result); int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
...@@ -39,11 +41,13 @@ enum lt_command { ...@@ -39,11 +41,13 @@ enum lt_command {
}; };
struct tokutxn { struct tokutxn {
enum typ_tag tag;
u_int64_t txnid64; u_int64_t txnid64;
TOKULOGGER logger; TOKULOGGER logger;
TOKUTXN parent; TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */ LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry; struct log_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
}; };
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf); int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
...@@ -80,3 +84,4 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) { ...@@ -80,3 +84,4 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) { static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) {
return 4+(4+4)*pa.size; return 4+(4+4)*pa.size;
} }
...@@ -69,6 +69,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -69,6 +69,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
if (result==0) return errno; if (result==0) return errno;
result->is_open=0; result->is_open=0;
result->is_panicked=0; result->is_panicked=0;
list_init(&result->live_txns);
*resultp=result; *resultp=result;
return 0; return 0;
} }
...@@ -223,18 +224,25 @@ int toku_logger_commit (TOKUTXN txn, int nosync) { ...@@ -223,18 +224,25 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
} }
goto free_and_return; goto free_and_return;
} else { } else {
if (txn->parent->oldest_logentry) txn->parent->newest_logentry->next = txn->oldest_logentry; // Append the list to the front.
else txn->parent->oldest_logentry = txn->oldest_logentry; if (txn->oldest_logentry) {
if (txn->newest_logentry) txn->parent->newest_logentry = txn->newest_logentry; // There are some entries, so link them in.
txn->oldest_logentry->prev = txn->parent->newest_logentry;
txn->parent->newest_logentry = txn->newest_logentry;
}
if (txn->parent->oldest_logentry==0) {
txn->parent->oldest_logentry = txn->oldest_logentry;
}
txn->newest_logentry = txn->oldest_logentry = 0; txn->newest_logentry = txn->oldest_logentry = 0;
} }
free_and_return: /*nothing*/; free_and_return: /*nothing*/;
struct log_entry *item; struct log_entry *item;
while ((item=txn->oldest_logentry)) { while ((item=txn->newest_logentry)) {
txn->oldest_logentry = item->next; txn->newest_logentry = item->prev;
logtype_dispatch(item, toku_free_logtype_); logtype_dispatch(item, toku_free_logtype_);
toku_free(item); toku_free(item);
} }
list_remove(&txn->live_txns_link);
toku_free(txn); toku_free(txn);
return r; return r;
} }
...@@ -261,6 +269,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid ...@@ -261,6 +269,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
result->logger = logger; result->logger = logger;
result->parent = parent_tokutxn; result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
list_push(&logger->live_txns, &result->live_txns_link);
*tokutxn = result; *tokutxn = result;
return 0; return 0;
} }
...@@ -268,10 +277,11 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid ...@@ -268,10 +277,11 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) { int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0; if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL; if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs; BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
bs.len = strlen(fname); int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
bs.data = (char*)fname; if (r!=0) return r;
return toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode); r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs, mode);
return r;
} }
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */ /* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
...@@ -598,23 +608,30 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse ...@@ -598,23 +608,30 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse
int toku_logger_abort(TOKUTXN txn) { int toku_logger_abort(TOKUTXN txn) {
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
struct log_entry *prev=0;
struct log_entry *item=txn->oldest_logentry; struct log_entry *item=txn->oldest_logentry;
while (item) { while ((item=txn->newest_logentry)) {
item->tmp=prev; txn->newest_logentry = item->prev;
prev=item;
item=item->next;
}
for (item=txn->newest_logentry; item; item=item->tmp) {
int r; int r;
logtype_dispatch_assign(item, toku_rollback_, r, txn); logtype_dispatch_assign_rollback(item, toku_rollback_, r, txn);
if (r!=0) return r; if (r!=0) return r;
}
while ((item=txn->newest_logentry)) {
txn->newest_logentry=item->tmp;
logtype_dispatch(item, toku_free_logtype_); logtype_dispatch(item, toku_free_logtype_);
toku_free(item); toku_free(item);
} }
toku_free(txn); toku_free(txn);
return 0; return 0;
} }
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==0) return -1;
struct list *h = list_head(&logger->live_txns);
while (h) {
TOKUTXN txn = list_struct(h, struct tokutxn, live_txns_link);
assert(txn->tag==TYP_TOKUTXN);
if (txn->txnid64==txnid) {
*result = txn;
return 0;
}
h=list_tail(h);
}
return -1;
}
...@@ -135,4 +135,9 @@ int toku_recover_init(void); ...@@ -135,4 +135,9 @@ int toku_recover_init(void);
void toku_recover_cleanup(void); void toku_recover_cleanup(void);
int toku_logger_abort(TOKUTXN); int toku_logger_abort(TOKUTXN);
// Find the txn that belongs to a txnid.
// Return nonzero if no such txn is live (either didn't exist ever, or it is committed or aborted.)
// Return 0 if there is a live txn with that txnid.
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
#endif #endif
This diff is collapsed.
...@@ -218,6 +218,7 @@ void toku_memory_check_all_free (void) { ...@@ -218,6 +218,7 @@ void toku_memory_check_all_free (void) {
printf("n_items_malloced=%lld\n", n_items_malloced); printf("n_items_malloced=%lld\n", n_items_malloced);
if (toku_memory_check) if (toku_memory_check)
printf(" one item is %p size=%ld\n", items[0], sizes[0]); printf(" one item is %p size=%ld\n", items[0], sizes[0]);
exit(1);
} }
assert(n_items_malloced==0); assert(n_items_malloced==0);
} }
......
...@@ -4,7 +4,6 @@ ...@@ -4,7 +4,6 @@
#include "brt-internal.h" #include "brt-internal.h"
#include "key.h" #include "key.h"
#include "kv-pair.h" #include "kv-pair.h"
#include "list.h"
#include "pma-internal.h" #include "pma-internal.h"
#include "test.h" #include "test.h"
#include "toku_assert.h" #include "toku_assert.h"
......
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
/* Only needed for testing. */ /* Only needed for testing. */
#include <string.h> #include <string.h>
#include <inttypes.h> #include <inttypes.h>
#include "list.h"
#include "kv-pair.h" #include "kv-pair.h"
#include "pma-internal.h" #include "pma-internal.h"
#include "log.h" #include "log.h"
...@@ -716,14 +715,23 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE ...@@ -716,14 +715,23 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
assert(pma->pairs[idx]); assert(pma->pairs[idx]);
pma->n_pairs_present++; pma->n_pairs_present++;
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
{
const struct kv_pair *pair = pma->pairs[idx]; const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) }; const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) }; const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data); TOKUTXN txn;
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger); int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
return r; if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
} if (r!=0) goto freeit;
if (0!=toku_txnid2txn(logger, xid, &txn)) goto freeit;
/* if no txn then, the txn is completed, so we don't bother with rollback.
* In particular, if the txn committed, we don't rollback.
* If the txn aborted, then we already inserted a delete command when we rolled it back.
*/
r = toku_logger_save_rollback_insertinleaf(txn, xid, pma->filenum, diskoff, idx, key, data);
if (0) { freeit: toku_free(key.data); toku_free(data.data); }
return r;
} }
static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) { static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
...@@ -888,10 +896,19 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v, ...@@ -888,10 +896,19 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
logit_and_update_fingerprint: logit_and_update_fingerprint:
{ {
const struct kv_pair *pair = pma->pairs[idx]; const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) }; const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) }; const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data); r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger); if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) goto freeit;
TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) goto freeit;
/* the txn is completed, so we don't bother with rollback.
* In particular, if the txn committed, we don't rollback.
* If the txn aborted, then we already inserted a delete command when we rolled it back.
*/
r = toku_logger_save_rollback_insertinleaf(txn, xid, pma->filenum, diskoff, idx, key, data);
if (0) { freeit: toku_free(key.data); toku_free(data.data); }
} }
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size); *fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r; return r;
......
...@@ -111,8 +111,11 @@ void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32 ...@@ -111,8 +111,11 @@ void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
} }
int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__((__unused__))) { int toku_rollback_fcreate (TXNID txnid __attribute__((__unused__)),
char *fname = fixup_fname(&le->fname); BYTESTRING bs_fname,
u_int32_t mode __attribute__((__unused__)),
TOKUTXN txn __attribute__((__unused__))) {
char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory; char *directory = txn->logger->directory;
int full_len=strlen(fname)+strlen(directory)+2; int full_len=strlen(fname)+strlen(directory)+2;
char full_fname[full_len]; char full_fname[full_len];
...@@ -120,7 +123,7 @@ int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__ ...@@ -120,7 +123,7 @@ int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__
assert(l<=full_len); assert(l<=full_len);
int r = unlink(full_fname); int r = unlink(full_fname);
assert(r==0); assert(r==0);
toku_free(fname); free(fname);
return 0; return 0;
} }
...@@ -395,22 +398,23 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO ...@@ -395,22 +398,23 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
toku_free_BYTESTRING(databs); toku_free_BYTESTRING(databs);
} }
int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) { int toku_rollback_insertinleaf (TXNID txnid __attribute__((__unused__)), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING key, BYTESTRING data, TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; BRT brt;
void *node_v; void *node_v;
int r = toku_cachefile_of_filenum(txn->logger->ct, c->filenum, &cf, &brt); printf("Rollback insertinleaf\n");
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0); assert(r==0);
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt); r = toku_cachetable_get_and_pin(cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r; if (r!=0) return r;
BRTNODE node = node_v; BRTNODE node = node_v;
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx); r = toku_pma_clear_at_index(node->u.l.buffer, pmaidx);
if (r!=0) return r; if (r!=0) return r;
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len); node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(key.data, key.len, data.data, data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len; node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + key.len + data.len;
VERIFY_COUNTS(node); VERIFY_COUNTS(node);
node->log_lsn = c->lsn; //node->log_lsn = c->lsn;
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node)); r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
return r; return r;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment