Commit e2c87687 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge #1311 (responsiveness during commit and abort) back to the main line.

{{{
svn merge -r 8703:8781 https://svn.tokutek.com/tokudb/toku/tokudb.1311
}}}
and delete the tokudb.1311 branch.

Addresses #1311.


git-svn-id: file:///svn/toku/tokudb@8782 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1f794476
......@@ -383,7 +383,7 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
static void note_txn_closing (TOKUTXN txn);
static void cleanup_txn (TOKUTXN txn) {
void toku_logger_txn_close (TOKUTXN txn) {
memarena_close(&txn->rollentry_arena);
if (txn->rollentry_filename!=0) {
int r = close(txn->rollentry_fd);
......@@ -399,15 +399,15 @@ static void cleanup_txn (TOKUTXN txn) {
return;
}
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv);
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void (*yield)(void*), void*yieldv) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv);
if (r!=0) return r;
return 0;
}
......@@ -416,7 +416,8 @@ static int note_brt_used_in_parent_txn(OMTVALUE brtv, u_int32_t UU(index), void*
return toku_txn_note_brt(parentv, brtv);
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int nosync, void(*yield)(void*yieldv), void*yieldv) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
......@@ -429,7 +430,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
// we take ownership of it.
BYTESTRING fname = {len, toku_strdup_in_rollback(txn, txn->rollentry_filename)};
r = toku_logger_save_rollback_rollinclude(txn->parent, fname);
if (r!=0) { cleanup_txn(txn); return r; }
if (r!=0) return r;
r = close(txn->rollentry_fd);
if (r!=0) {
// We have to do the unlink ourselves, and then
......@@ -438,7 +439,6 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
unlink(txn->rollentry_filename);
toku_free(txn->rollentry_filename);
txn->rollentry_filename = 0;
cleanup_txn(txn);
return r;
}
// Stop the cleanup from closing and unlinking the file.
......@@ -478,20 +478,22 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
{
struct roll_entry *item;
//printf("%s:%d abort\n", __FILE__, __LINE__);
int count=0;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
r = toku_commit_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
r = toku_commit_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(yieldv);
}
}
// Read stuff out of the file and execute it.
if (txn->rollentry_filename) {
r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn);
r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn, yield, yieldv);
}
}
}
cleanup_txn(txn);
return r;
}
......@@ -834,7 +836,8 @@ toku_abort_logentry_commit (struct logtype_commit *le __attribute__((__unused__)
}
#endif
int toku_logger_abort(TOKUTXN txn) {
// Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN txn, void (*yield)(void*), void*yieldv) {
//printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order.
// Build the reverse list
......@@ -845,19 +848,21 @@ int toku_logger_abort(TOKUTXN txn) {
}
{
struct roll_entry *item;
int count=0;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
int r = toku_abort_rollback_item(txn, item);
if (r!=0) { cleanup_txn(txn); return r; }
int r = toku_abort_rollback_item(txn, item, yield, yieldv);
if (r!=0) return r;
count++;
if (count%2 == 0) yield(yieldv);
}
}
list_remove(&txn->live_txns_link);
// Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
int r = toku_rollback_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn);
int r = toku_rollback_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn, yield, yieldv);
assert(r==0);
}
cleanup_txn(txn);
return 0;
}
......
......@@ -39,7 +39,14 @@ LSN toku_logger_last_lsn(TOKULOGGER);
int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_commit (TOKUTXN txn, int no_sync);
// Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v);
// Doesn't close the txn, just performs the abort operations.
int toku_logger_abort(TOKUTXN, void(*/*yield*/)(void*), void*/*yield_v*/);
// Closes a txn. Call after commiting or aborting.
void toku_logger_txn_close (TOKUTXN);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TOKULOGGER /*logger*/);
......@@ -155,7 +162,6 @@ static inline void toku_free_LOGGEDBRTHEADER(LOGGEDBRTHEADER val) {
int toku_recover_init(void);
void toku_recover_cleanup(void);
int toku_logger_abort(TOKUTXN);
// Find the txn that belongs to a txnid.
// Return nonzero if no such txn is live (either didn't exist ever, or it is committed or aborted.)
......@@ -168,10 +174,20 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_maybe_spill_rollbacks (TOKUTXN txn);
struct roll_entry;
int toku_rollback_fileentries (int fd, toku_off_t filesize, TOKUTXN txn);
int toku_commit_fileentries (int fd, toku_off_t filesize, TOKUTXN txn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item);
int toku_rollback_fileentries (int fd,
toku_off_t filesize,
TOKUTXN txn,
void (*yield)(void*yieldv),
void * yieldv);
int toku_commit_fileentries (int fd,
toku_off_t filesize,
TOKUTXN txn,
void (*yield)(void*yieldv),
void * yieldv);
// do the commit items. Call yield(yield_v) once in a while.
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, void(*yield)(void*yield_v), void*yield_v);
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_close_brt (BRT brt);
......
......@@ -291,10 +291,10 @@ generate_log_struct (void) {
fprintf(hf, "};\n");
fprintf(hf, "int toku_rollback_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn);\n");
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n");
fprintf(hf, "int toku_commit_%s (", lt->name);
DO_FIELDS(ft, lt, fprintf(hf, "%s %s,", ft->type, ft->name));
fprintf(hf, "TOKUTXN txn);\n");
fprintf(hf, "TOKUTXN txn, void(*yield)(void*yield_v), void*yield_v);\n");
});
fprintf(hf, "struct log_entry {\n");
fprintf(hf, " enum lt_cmd cmd;\n");
......
......@@ -11,15 +11,25 @@
#define TOKU_DO_COMMIT_CMD_DELETE 1
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
int toku_commit_fcreate (TXNID UU(xid),
typedef void (*YIELDF)(void*);
int
toku_commit_fcreate (TXNID UU(xid),
BYTESTRING UU(bs_fname),
TOKUTXN UU(txn)) {
TOKUTXN UU(txn),
YIELDF UU(yield),
void *UU(yield_v))
{
return 0;
}
int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)),
int
toku_rollback_fcreate (TXNID UU(xid),
BYTESTRING bs_fname,
TOKUTXN txn __attribute__((__unused__))) {
TOKUTXN UU(txn),
YIELDF UU(yield),
void* UU(yield_v))
{
char *fname = fixup_fname(&bs_fname);
char *directory = txn->logger->directory;
int full_len=strlen(fname)+strlen(directory)+2;
......@@ -86,7 +96,7 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
}
int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN txn) {
int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN txn, YIELDF UU(yield), void *UU(yieldv)) {
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn);
#else
......@@ -95,7 +105,15 @@ int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN t
#endif
}
int toku_commit_cmdinsertboth (TXNID xid, FILENUM filenum, BYTESTRING key, BYTESTRING data, TOKUTXN txn) {
int
toku_commit_cmdinsertboth (TXNID xid,
FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_INSERT
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn);
#else
......@@ -104,15 +122,38 @@ int toku_commit_cmdinsertboth (TXNID xid, FILENUM filenum, BYTESTRING key, BYTES
#endif
}
int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key, TOKUTXN txn) {
int
toku_rollback_cmdinsert (TXNID xid,
FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn);
}
int toku_rollback_cmdinsertboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
int
toku_rollback_cmdinsertboth (TXNID xid,
FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn);
}
int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
int
toku_commit_cmddeleteboth (TXNID xid,
FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_DELETE_BOTH
return do_insertion (BRT_COMMIT_BOTH, xid, filenum, key, &data, txn);
#else
......@@ -121,11 +162,26 @@ int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTEST
#endif
}
int toku_rollback_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
int
toku_rollback_cmddeleteboth (TXNID xid,
FILENUM filenum,
BYTESTRING key,
BYTESTRING data,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_BOTH, xid, filenum, key, &data, txn);
}
int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) {
int
toku_commit_cmddelete (TXNID xid,
FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
#if TOKU_DO_COMMIT_CMD_DELETE
return do_insertion (BRT_COMMIT_ANY, xid, filenum, key, 0, txn);
#else
......@@ -134,21 +190,37 @@ int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN tx
#endif
}
int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) {
int
toku_rollback_cmddelete (TXNID xid,
FILENUM filenum,
BYTESTRING key,
TOKUTXN txn,
YIELDF UU(yield),
void * UU(yieldv))
{
return do_insertion (BRT_ABORT_ANY, xid, filenum, key, 0, txn);
}
int toku_commit_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
int
toku_commit_fileentries (int fd,
toku_off_t filesize,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
{
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
int r=0;
MEMARENA ma = memarena_create();
int count=0;
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_commit_rollback_item(txn, item);
r = toku_commit_rollback_item(txn, item, yield, yieldv);
if (r!=0) goto finish;
memarena_clear(ma);
count++;
if (count%2==0) yield(yieldv);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
......@@ -156,18 +228,27 @@ int toku_commit_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
return r;
}
int toku_rollback_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
int
toku_rollback_fileentries (int fd,
toku_off_t filesize,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
{
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
assert(f);
int r=0;
MEMARENA ma = memarena_create();
int count=0;
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_abort_rollback_item(txn, item);
r = toku_abort_rollback_item(txn, item, yield, yieldv);
if (r!=0) goto finish;
memarena_clear(ma);
count++;
if (count%2==0) yield(yieldv);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
......@@ -175,7 +256,11 @@ int toku_rollback_fileentries (int fd, toku_off_t filesize, TOKUTXN txn) {
return r;
}
int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int
toku_commit_rollinclude (BYTESTRING bs,
TOKUTXN txn,
YIELDF yield,
void * yieldv) {
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY);
......@@ -184,7 +269,7 @@ int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int64_t fsize = 0;
r = toku_os_get_file_size(fd, &fsize);
assert(r==0);
r = toku_commit_fileentries(fd, fsize, txn);
r = toku_commit_fileentries(fd, fsize, txn, yield, yieldv);
assert(r==0);
r = close(fd);
assert(r==0);
......@@ -193,7 +278,12 @@ int toku_commit_rollinclude (BYTESTRING bs,TOKUTXN txn) {
return 0;
}
int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int
toku_rollback_rollinclude (BYTESTRING bs,
TOKUTXN txn,
YIELDF yield,
void * yieldv)
{
int r;
char *fname = fixup_fname(&bs);
int fd = open(fname, O_RDONLY+O_BINARY);
......@@ -201,7 +291,7 @@ int toku_rollback_rollinclude (BYTESTRING bs,TOKUTXN txn) {
int64_t fsize = 0;
r = toku_os_get_file_size(fd, &fsize);
assert(r==0);
r = toku_rollback_fileentries(fd, fsize, txn);
r = toku_rollback_fileentries(fd, fsize, txn, yield, yieldv);
assert(r==0);
r = close(fd);
assert(r==0);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#define DONT_DEPRECATE_MALLOC
#include "test.h"
/* Test to see if setting malloc works, and if dlmalloc works. */
#include <memory.h>
......
......@@ -994,6 +994,13 @@ static int toku_txn_release_locks(DB_TXN* txn) {
return r;
}
// Yield the lock so someone else can work, and then reacquire the lock.
// Useful while processing commit or rollback logs, to allow others to access the system.
static void ydb_yield (void *UU(v)) {
toku_ydb_unlock();
toku_ydb_lock();
}
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp);
......@@ -1020,15 +1027,22 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
int nosync = (flags & DB_TXN_NOSYNC)!=0 || (txn->i->flags&DB_TXN_NOSYNC);
flags &= ~DB_TXN_NOSYNC;
int r2 = toku_txn_release_locks(txn);
int r;
if (r_child_first || flags!=0)
r = toku_logger_abort(txn->i->tokutxn); // frees the tokutxn
// frees the tokutxn
// Calls ydb_yield(NULL) occasionally
r = toku_logger_abort(txn->i->tokutxn, ydb_yield, NULL);
else
r = toku_logger_commit(txn->i->tokutxn, nosync); // frees the tokutxn
// frees the tokutxn
// Calls ydb_yield(NULL) occasionally
r = toku_logger_commit(txn->i->tokutxn, nosync, ydb_yield, NULL);
// Close the logger after releasing the locks
int r2 = toku_txn_release_locks(txn);
toku_logger_txn_close(txn->i->tokutxn);
// the toxutxn is freed, and we must free the rest. */
// The txn is no good after the commit even if the commit fails.
// The txn is no good after the commit even if the commit fails, so free it up.
if (txn->i)
toku_free(txn->i);
toku_free(txn);
......@@ -1064,8 +1078,9 @@ static int toku_txn_abort(DB_TXN * txn) {
txn->i->prev->i->next = txn->i->next;
}
}
int r = toku_logger_abort(txn->i->tokutxn, ydb_yield, NULL);
int r2 = toku_txn_release_locks(txn);
int r = toku_logger_abort(txn->i->tokutxn);
toku_logger_txn_close(txn->i->tokutxn);
toku_free(txn->i);
toku_free(txn);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment