Commit ca51b430 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

[t:5092] fix #5092:

 - make the bool in logformat an enum: "doesn't care about logging begin",
   "should log begin if it hasn't been logged yet", and "should assert
   that the begin was already logged"
 - make logging client code not try to log commits, prepares, or aborts
   for read-only txns
 - added test, fixed another test


git-svn-id: file:///svn/toku/tokudb@44651 c7de825b-a66e-492c-adef-691d508d4ae1
parent 006f1156
......@@ -35,11 +35,17 @@ typedef struct field {
#define NULLFIELD {0,0,0}
#define FA (F[])
enum log_begin_action {
IGNORE_LOG_BEGIN,
SHOULD_LOG_BEGIN,
ASSERT_BEGIN_WAS_LOGGED
};
struct logtype {
char *name;
unsigned int command_and_flags;
struct field *fields;
bool needs_to_maybe_log_begin_txn;
enum log_begin_action log_begin_action;
};
// In the fields, don't mention the command, the LSN, the CRC or the trailing LEN.
......@@ -94,18 +100,18 @@ const struct logtype logtypes[] = {
#if 0 // no longer used, but reserve the type
{"local_txn_checkpoint", 'c', FA{{"TXNID", "xid", 0}, NULLFIELD}},
#endif
{"begin_checkpoint", 'x', FA{{"u_int64_t", "timestamp", 0}, {"TXNID", "last_xid", 0}, NULLFIELD}, false},
{"begin_checkpoint", 'x', FA{{"u_int64_t", "timestamp", 0}, {"TXNID", "last_xid", 0}, NULLFIELD}, IGNORE_LOG_BEGIN},
{"end_checkpoint", 'X', FA{{"LSN", "lsn_begin_checkpoint", 0},
{"u_int64_t", "timestamp", 0},
{"u_int32_t", "num_fassociate_entries", 0}, // how many files were checkpointed
{"u_int32_t", "num_xstillopen_entries", 0}, // how many txns were checkpointed
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
//TODO: #2037 Add dname
{"fassociate", 'f', FA{{"FILENUM", "filenum", 0},
{"u_int32_t", "treeflags", 0},
{"BYTESTRING", "iname", 0}, // pathname of file
{"u_int8_t", "unlink_on_close", 0},
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
//We do not use a TXNINFO struct since recovery log has
//FILENUMS and TOKUTXN has FTs (for open_fts)
{"xstillopen", 's', FA{{"TXNID", "xid", 0},
......@@ -118,7 +124,7 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
NULLFIELD}, false}, // record all transactions
NULLFIELD}, IGNORE_LOG_BEGIN}, // record all transactions
// prepared txns need a gid
{"xstillopenprepared", 'p', FA{{"TXNID", "xid", 0},
{"XIDP", "xa_xid", 0}, // prepared transactions need a gid, and have no parentxid.
......@@ -130,15 +136,15 @@ const struct logtype logtypes[] = {
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
NULLFIELD}, false}, // record all transactions
NULLFIELD}, IGNORE_LOG_BEGIN}, // record all transactions
{"suppress_rollback", 'S', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
// Records produced by transactions
{"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, false},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, false},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}, false},
{"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}, false},
{"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, IGNORE_LOG_BEGIN},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED},
{"xprepare",'P', FA{{"TXNID", "xid", 0}, {"XIDP", "xa_xid", 0}, NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED},
{"xabort", 'q', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED},
//TODO: #2037 Add dname
{"fcreate", 'F', FA{{"TXNID", "xid", 0},
{"FILENUM", "filenum", 0},
......@@ -148,49 +154,49 @@ const struct logtype logtypes[] = {
{"u_int32_t", "nodesize", 0},
{"u_int32_t", "basementnodesize", 0},
{"u_int32_t", "compression_method", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
//TODO: #2037 Add dname
{"fopen", 'O', FA{{"BYTESTRING", "iname", 0},
{"FILENUM", "filenum", 0},
{"u_int32_t", "treeflags", 0},
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
//TODO: #2037 Add dname
{"fclose", 'e', FA{{"BYTESTRING", "iname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
//TODO: #2037 Add dname
{"fdelete", 'U', FA{{"TXNID", "xid", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_insert_no_overwrite", 'i', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_delete_any", 'E', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_insert_multiple", 'm', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_delete_multiple", 'M', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
// Note: Shutdown log entry is NOT ALLOWED TO BE CHANGED.
// Do not change the letter ('Q'), do not add fields,
// do not remove fields.
......@@ -198,32 +204,32 @@ const struct logtype logtypes[] = {
// This log entry must always be readable for future versions.
// If you DO change it, you need to write a separate log upgrade mechanism.
{"shutdown", 'Q', FA{{"u_int64_t", "timestamp", 0},
NULLFIELD}, false},
NULLFIELD}, IGNORE_LOG_BEGIN},
{"load", 'l', FA{{"TXNID", "xid", 0},
{"FILENUM", "old_filenum", 0},
{"BYTESTRING", "new_iname", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
// #2954
{"hot_index", 'h', FA{{"TXNID", "xid", 0},
{"FILENUMS", "hot_index_filenums", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_update", 'u', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "extra", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"enq_updatebroadcast", 'B', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "extra", 0},
{"BOOL", "is_resetting_op", 0},
NULLFIELD}, true},
NULLFIELD}, SHOULD_LOG_BEGIN},
{"change_fdescriptor", 'D', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "old_descriptor", 0},
{"BYTESTRING", "new_descriptor", 0},
{"BOOL", "update_cmp_descriptor", 0},
NULLFIELD}, true},
{0,0,FA{NULLFIELD}, false}
NULLFIELD}, SHOULD_LOG_BEGIN},
{0,0,FA{NULLFIELD}, (enum log_begin_action) 0}
};
......@@ -381,18 +387,31 @@ generate_log_writer (void) {
DO_FIELDS(field_type, lt, fprintf(hf, "+sizeof(%s)", field_type->type));
fprintf(hf, "+8);\n");
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
if (lt->needs_to_maybe_log_begin_txn) {
switch (lt->log_begin_action) {
case SHOULD_LOG_BEGIN:
case ASSERT_BEGIN_WAS_LOGGED: {
fprintf2(cf, hf, ", TOKUTXN txn");
break;
}
case IGNORE_LOG_BEGIN: break;
}
DO_FIELDS(field_type, lt, fprintf2(cf, hf, ", %s %s", field_type->type, field_type->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " int r = 0;\n");
fprintf(cf, " if (logger==0) return 0;\n");
if (lt->needs_to_maybe_log_begin_txn) {
switch (lt->log_begin_action) {
case SHOULD_LOG_BEGIN: {
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
fprintf(cf, " }\n");
break;
}
case ASSERT_BEGIN_WAS_LOGGED: {
fprintf(cf, " assert(txn->begin_was_logged);\n");
break;
}
case IGNORE_LOG_BEGIN: break;
}
fprintf(cf, " if (!logger->write_log_files) {\n");
fprintf(cf, " ml_lock(&logger->input_lock);\n");
......
......@@ -188,13 +188,13 @@ int create_logfiles() {
//fcreate 'F': lsn=2 txnid=1 filenum=0 fname={len=4 data="a.db"} mode=0777 treeflags=0 crc=18a3d525 len=49
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, NULL, txnid, fn_aname, bs_aname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
//commit 'C': lsn=3 txnid=1 crc=00001f1e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
r = toku_log_xcommit(logger, &lsn, FSYNC, NULL, txnid); assert(r==0);
//xbegin 'b': lsn=4 parenttxnid=0 crc=00000a1f len=29
r = toku_log_xbegin(logger, &lsn, 2, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
//fcreate 'F': lsn=5 txnid=4 filenum=1 fname={len=4 data="b.db"} mode=0777 treeflags=0 crc=14a47925 len=49
r = toku_log_fcreate(logger, &lsn, NO_FSYNC, NULL, txnid, fn_bname, bs_bname, 0x0777, 0, 0, TOKU_DEFAULT_COMPRESSION_METHOD, 0); assert(r==0);
//commit 'C': lsn=6 txnid=4 crc=0000c11e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
r = toku_log_xcommit(logger, &lsn, FSYNC, NULL, txnid); assert(r==0);
//xbegin 'b': lsn=7 parenttxnid=0 crc=0000f91f len=29
r = toku_log_xbegin(logger, &lsn, 3, NO_FSYNC, 0); assert(r==0); txnid = lsn.lsn;
//enq_insert 'I': lsn=8 filenum=0 xid=7 key={len=2 data="a\000"} value={len=2 data="b\000"} crc=40b863e4 len=45
......@@ -221,7 +221,7 @@ int create_logfiles() {
//enq_insert 'I': lsn=14 filenum=1 xid=7 key={len=2 data="b\000"} value={len=2 data="a\000"} crc=40388be4 len=45
r = toku_log_enq_insert(logger, &lsn, NO_FSYNC, NULL, fn_bname, txnid, bs_b, bs_a); assert(r==0);
//commit 'C': lsn=15 txnid=7 crc=00016d1e len=29
r = toku_log_xcommit(logger, &lsn, FSYNC, txnid); assert(r==0);
r = toku_log_xcommit(logger, &lsn, FSYNC, NULL, txnid); assert(r==0);
// close logger
r = toku_logger_close(&logger); assert(r==0);
......
......@@ -283,7 +283,7 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
txn->progress_poll_fun_extra = poll_extra;
if (txn->begin_was_logged) {
r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
r = toku_log_xcommit(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64);
if (r != 0) {
goto cleanup;
}
......@@ -324,7 +324,7 @@ int toku_txn_abort_with_lsn(TOKUTXN txn, LSN oplsn,
txn->do_fsync = FALSE;
if (txn->begin_was_logged) {
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64);
r = toku_log_xabort(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64);
if (r != 0) {
goto cleanup;
}
......@@ -354,13 +354,19 @@ static void copy_xid (TOKU_XA_XID *dest, TOKU_XA_XID *source) {
}
int toku_txn_prepare_txn (TOKUTXN txn, TOKU_XA_XID *xa_xid) {
if (txn->parent) return 0; // nothing to do if there's a parent.
int r = 0;
if (txn->parent || !txn->begin_was_logged) {
// nothing to do if there's a parent, or if it's read-only
goto cleanup;
}
toku_txn_manager_add_prepared_txn(txn->logger->txn_manager, txn);
// Do we need to do an fsync?
txn->do_fsync = (txn->force_fsync_on_commit || txn->roll_info.num_rollentries>0);
copy_xid(&txn->xa_xid, xa_xid);
// This list will go away with #4683, so we wn't need the ydb lock for this anymore.
return toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn->txnid64, xa_xid);
r = toku_log_xprepare(txn->logger, &txn->do_fsync_lsn, 0, txn, txn->txnid64, xa_xid);
cleanup:
return r;
}
void toku_txn_get_prepared_xa_xid (TOKUTXN txn, TOKU_XA_XID *xid) {
......
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#include "test.h"
#include <sys/wait.h>
#define ENVDIR2 ENVDIR "2"
static void clean_env (const char *envdir) {
const int len = strlen(envdir)+100;
char cmd[len];
snprintf(cmd, len, "rm -rf %s", envdir);
int r = system(cmd);
CKERR(r);
CKERR(toku_os_mkdir(envdir, S_IRWXU+S_IRWXG+S_IRWXO));
}
static void setup_env (DB_ENV **envp, const char *envdir) {
{ int chk_r = db_env_create(envp, 0); CKERR(chk_r); }
(*envp)->set_errfile(*envp, stderr);
#ifdef TOKUDB
{ int chk_r = (*envp)->set_redzone(*envp, 0); CKERR(chk_r); }
#endif
{ int chk_r = (*envp)->open(*envp, envdir, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(chk_r); }
}
static void setup_env_and_prepare (DB_ENV **envp, const char *envdir, bool commit) {
DB *db;
DB_TXN *txn;
clean_env(envdir);
setup_env(envp, envdir);
CKERR(db_create(&db, *envp, 0));
CKERR(db->open(db, NULL, "foo.db", 0, DB_BTREE, DB_CREATE | DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO));
CKERR((*envp)->txn_begin(*envp, 0, &txn, 0));
u_int8_t gid[DB_GID_SIZE];
memset(gid, 0, DB_GID_SIZE);
gid[0]=42;
CKERR(txn->prepare(txn, gid));
{ int chk_r = db->close(db, 0); CKERR(chk_r); }
if (commit)
CKERR(txn->commit(txn, 0));
}
int test_main (int argc, char *const argv[]) {
default_parse_args(argc, argv);
DB_ENV *env;
setup_env_and_prepare(&env, ENVDIR, true);
{ int chk_r = env ->close(env, 0); CKERR(chk_r); }
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment