Commit a1dfeebb authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Rollback is working a little better. Addresses #27.

git-svn-id: file:///svn/tokudb@2294 c7de825b-a66e-492c-adef-691d508d4ae1
parent 2f26a8b6
......@@ -1376,7 +1376,8 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->database_name=0;
goto died0a;
}
toku_logger_log_fcreate(txn, fname_in_env, 0777);
r = toku_logger_log_fcreate(txn, fname_in_env, 0777);
if (r!=0) goto died0a;
}
r=toku_cachetable_openfd(&t->cf, cachetable, fd);
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
......
......@@ -69,7 +69,7 @@ static inline void list_move(struct list *newhead, struct list *oldhead) {
list_init(oldhead);
}
// Note: Need an extra level of parens in these macros so that
// Note: Need the extra level of parens in these macros so that
// list_struct(h, foo, b)->zot
// will work right. Otherwise the type cast will try to include ->zot, and it will be all messed up.
#if defined(__GNUC__) && __GNUC__ >= 4
......
......@@ -2,6 +2,7 @@
#include "log.h"
#include "toku_assert.h"
#include "list.h"
#include "yerror.h"
#include <stdio.h>
#include <sys/types.h>
......@@ -20,6 +21,7 @@ struct tokulogger {
char buf[LOGGER_BUF_SIZE];
int n_in_buf;
CACHETABLE ct;
struct list live_txns; // just a linked list. Should be a hashtable.
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......@@ -39,11 +41,13 @@ enum lt_command {
};
struct tokutxn {
enum typ_tag tag;
u_int64_t txnid64;
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
struct log_entry *oldest_logentry,*newest_logentry;
struct log_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
};
int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf);
......@@ -80,3 +84,4 @@ static inline int toku_logsizeof_LOGGEDBRTHEADER (LOGGEDBRTHEADER bs) {
static inline int toku_logsizeof_INTPAIRARRAY (INTPAIRARRAY pa) {
return 4+(4+4)*pa.size;
}
......@@ -69,6 +69,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
if (result==0) return errno;
result->is_open=0;
result->is_panicked=0;
list_init(&result->live_txns);
*resultp=result;
return 0;
}
......@@ -223,18 +224,25 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
}
goto free_and_return;
} else {
if (txn->parent->oldest_logentry) txn->parent->newest_logentry->next = txn->oldest_logentry;
else txn->parent->oldest_logentry = txn->oldest_logentry;
if (txn->newest_logentry) txn->parent->newest_logentry = txn->newest_logentry;
// Append the list to the front.
if (txn->oldest_logentry) {
// There are some entries, so link them in.
txn->oldest_logentry->prev = txn->parent->newest_logentry;
txn->parent->newest_logentry = txn->newest_logentry;
}
if (txn->parent->oldest_logentry==0) {
txn->parent->oldest_logentry = txn->oldest_logentry;
}
txn->newest_logentry = txn->oldest_logentry = 0;
}
free_and_return: /*nothing*/;
struct log_entry *item;
while ((item=txn->oldest_logentry)) {
txn->oldest_logentry = item->next;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
logtype_dispatch(item, toku_free_logtype_);
toku_free(item);
}
list_remove(&txn->live_txns_link);
toku_free(txn);
return r;
}
......@@ -261,6 +269,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
list_push(&logger->live_txns, &result->live_txns_link);
*tokutxn = result;
return 0;
}
......@@ -268,10 +277,11 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs, mode);
return r;
}
/* fopen isn't really an action. It's just for bookkeeping. We need to know the filename that goes with a filenum. */
......@@ -598,23 +608,30 @@ int toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unuse
int toku_logger_abort(TOKUTXN txn) {
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
struct log_entry *prev=0;
struct log_entry *item=txn->oldest_logentry;
while (item) {
item->tmp=prev;
prev=item;
item=item->next;
}
for (item=txn->newest_logentry; item; item=item->tmp) {
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
int r;
logtype_dispatch_assign(item, toku_rollback_, r, txn);
logtype_dispatch_assign_rollback(item, toku_rollback_, r, txn);
if (r!=0) return r;
}
while ((item=txn->newest_logentry)) {
txn->newest_logentry=item->tmp;
logtype_dispatch(item, toku_free_logtype_);
toku_free(item);
}
toku_free(txn);
return 0;
}
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==0) return -1;
struct list *h = list_head(&logger->live_txns);
while (h) {
TOKUTXN txn = list_struct(h, struct tokutxn, live_txns_link);
assert(txn->tag==TYP_TOKUTXN);
if (txn->txnid64==txnid) {
*result = txn;
return 0;
}
h=list_tail(h);
}
return -1;
}
......@@ -135,4 +135,9 @@ int toku_recover_init(void);
void toku_recover_cleanup(void);
int toku_logger_abort(TOKUTXN);
// Find the txn that belongs to a txnid.
// Return nonzero if no such txn is live (either didn't exist ever, or it is committed or aborted.)
// Return 0 if there is a live txn with that txnid.
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
#endif
This diff is collapsed.
......@@ -218,6 +218,7 @@ void toku_memory_check_all_free (void) {
printf("n_items_malloced=%lld\n", n_items_malloced);
if (toku_memory_check)
printf(" one item is %p size=%ld\n", items[0], sizes[0]);
exit(1);
}
assert(n_items_malloced==0);
}
......
......@@ -4,7 +4,6 @@
#include "brt-internal.h"
#include "key.h"
#include "kv-pair.h"
#include "list.h"
#include "pma-internal.h"
#include "test.h"
#include "toku_assert.h"
......
......@@ -15,7 +15,6 @@
/* Only needed for testing. */
#include <string.h>
#include <inttypes.h>
#include "list.h"
#include "kv-pair.h"
#include "pma-internal.h"
#include "log.h"
......@@ -716,14 +715,23 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
assert(pma->pairs[idx]);
pma->n_pairs_present++;
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
{
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return r;
}
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
TOKUTXN txn;
int r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) goto freeit;
if (0!=toku_txnid2txn(logger, xid, &txn)) goto freeit;
/* if no txn then, the txn is completed, so we don't bother with rollback.
* In particular, if the txn committed, we don't rollback.
* If the txn aborted, then we already inserted a delete command when we rolled it back.
*/
r = toku_logger_save_rollback_insertinleaf(txn, xid, pma->filenum, diskoff, idx, key, data);
if (0) { freeit: toku_free(key.data); toku_free(data.data); }
return r;
}
static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
......@@ -888,10 +896,19 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
logit_and_update_fingerprint:
{
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
const BYTESTRING data = { pair->vallen, (char*)kv_pair_val_const(pair) };
const BYTESTRING key = { pair->keylen, toku_memdup(kv_pair_key_const(pair), pair->keylen) };
const BYTESTRING data = { pair->vallen, toku_memdup(kv_pair_val_const(pair), pair->vallen) };
r = toku_log_insertinleaf (logger, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) goto freeit;
TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) goto freeit;
/* the txn is completed, so we don't bother with rollback.
* In particular, if the txn committed, we don't rollback.
* If the txn aborted, then we already inserted a delete command when we rolled it back.
*/
r = toku_logger_save_rollback_insertinleaf(txn, xid, pma->filenum, diskoff, idx, key, data);
if (0) { freeit: toku_free(key.data); toku_free(data.data); }
}
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
......
......@@ -111,8 +111,11 @@ void toku_recover_fcreate (LSN UU(lsn), TXNID UU(txnid),BYTESTRING fname,u_int32
toku_free_BYTESTRING(fname);
}
int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__((__unused__))) {
char *fname = fixup_fname(&le->fname);
int toku_rollback_fcreate (TXNID txnid __attribute__((__unused__)),
BYTESTRING bs_fname,
u_int32_t mode __attribute__((__unused__)),
TOKUTXN txn __attribute__((__unused__))) {
char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory;
int full_len=strlen(fname)+strlen(directory)+2;
char full_fname[full_len];
......@@ -120,7 +123,7 @@ int toku_rollback_fcreate (struct logtype_fcreate *le, TOKUTXN txn __attribute__
assert(l<=full_len);
int r = unlink(full_fname);
assert(r==0);
toku_free(fname);
free(fname);
return 0;
}
......@@ -395,22 +398,23 @@ void toku_recover_insertinleaf (LSN lsn, TXNID UU(txnid), FILENUM filenum, DISKO
toku_free_BYTESTRING(databs);
}
int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
int toku_rollback_insertinleaf (TXNID txnid __attribute__((__unused__)), FILENUM filenum, DISKOFF diskoff, u_int32_t pmaidx, BYTESTRING key, BYTESTRING data, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
void *node_v;
int r = toku_cachefile_of_filenum(txn->logger->ct, c->filenum, &cf, &brt);
printf("Rollback insertinleaf\n");
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0);
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
r = toku_cachetable_get_and_pin(cf, diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node = node_v;
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx);
r = toku_pma_clear_at_index(node->u.l.buffer, pmaidx);
if (r!=0) return r;
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(key.data, key.len, data.data, data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + key.len + data.len;
VERIFY_COUNTS(node);
node->log_lsn = c->lsn;
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
//node->log_lsn = c->lsn;
r = toku_cachetable_unpin(cf, diskoff, 1, toku_serialize_brtnode_size(node));
return r;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment