Commit 0d32c57b authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

close[t:4574] merging 4574 to main. fixes the hcad deadlock found by Tim's...

close[t:4574] merging 4574 to main. fixes the hcad deadlock found by Tim's stress test, which adds and drops indexes concurrent with queries and insertions. transactions no longer keep rollback nodes pinned after an operation, but instead always unpin them.

this merge also introduces a lot of improvements to our rollback code, in terms of clarity and consistency. to that end, variable names and function names were improved, as well as more documentation of the rollback logic in rollback.h and log-internal.h

roll.h is removed because it is a dead file.


git-svn-id: file:///svn/toku/tokudb@41576 c7de825b-a66e-492c-adef-691d508d4ae1
parent 118561e5
......@@ -2046,7 +2046,7 @@ toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF
static void
deserialize_descriptor_from_rbuf(struct rbuf *rb, DESCRIPTOR desc, int layout_version) {
if (layout_version == BRT_LAYOUT_VERSION_13) {
// in older versions of TokuDB the Descriptor had a 4 byte version, which we must skip over
// in previous versions of TokuDB the Descriptor had a 4 byte version, which we must skip over
u_int32_t dummy_version __attribute__((__unused__)) = rbuf_int(rb);
}
u_int32_t size;
......@@ -2501,8 +2501,8 @@ serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
+8 //TXNID
+8 //sequence
+8 //thislogname
+8 //older (blocknum)
+8 //blocknum
+8 //previous (blocknum)
+8 //resident_bytecount
+8 //memarena_size_needed_to_load
+log->rollentry_resident_bytecount;
......@@ -2521,8 +2521,8 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc
wbuf_nocrc_uint(&wb, BUILD_ID);
wbuf_nocrc_TXNID(&wb, log->txnid);
wbuf_nocrc_ulonglong(&wb, log->sequence);
wbuf_nocrc_BLOCKNUM(&wb, log->thislogname);
wbuf_nocrc_BLOCKNUM(&wb, log->older);
wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
wbuf_nocrc_BLOCKNUM(&wb, log->previous);
wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
//Write down memarena size needed to restore
wbuf_nocrc_ulonglong(&wb, memarena_total_size_in_use(log->rollentry_arena));
......@@ -2677,18 +2677,18 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB
//TODO: This is hard.. everything is shared in a single dictionary.
rbuf_TXNID(rb, &result->txnid);
result->sequence = rbuf_ulonglong(rb);
result->thislogname = rbuf_blocknum(rb);
if (result->thislogname.b != blocknum.b) {
result->blocknum = rbuf_blocknum(rb);
if (result->blocknum.b != blocknum.b) {
r = toku_db_badformat();
goto died0;
}
result->thishash = toku_cachetable_hash(h->cf, result->thislogname);
if (result->thishash != fullhash) {
result->hash = toku_cachetable_hash(h->cf, result->blocknum);
if (result->hash != fullhash) {
r = toku_db_badformat();
goto died0;
}
result->older = rbuf_blocknum(rb);
result->older_hash = toku_cachetable_hash(h->cf, result->older);
result->previous = rbuf_blocknum(rb);
result->previous_hash = toku_cachetable_hash(h->cf, result->previous);
result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
size_t arena_initial_size = rbuf_ulonglong(rb);
......
......@@ -120,7 +120,6 @@ basement nodes, bulk fetch, and partial fetch:
// Access to nested transaction logic
#include "ule.h"
#include "xids.h"
#include "roll.h"
#include "sub_block.h"
#include "sort.h"
#include <brt-cachetable-wrappers.h>
......
......@@ -3571,17 +3571,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
assert(0);
}
// OMT iterator callback for toku_omt_iterate over the logger's live txns:
// if this transaction currently holds its in-progress rollback log node
// pinned, unpin it (the caller does this before beginning a checkpoint).
static int
unpin_rollback_log_for_checkpoint (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
    TOKUTXN txn = txnv;
    ROLLBACK_LOG_NODE pinned = txn->pinned_inprogress_rollback_log;
    int ret = 0;
    if (pinned != NULL) {
        ret = toku_rollback_log_unpin(txn, pinned);
        assert(ret == 0);
    }
    return ret;
}
// TODO: #1510 locking of cachetable is suspect
// verify correct algorithm overall
......@@ -3596,12 +3585,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
brt_begin_checkpoint();
unsigned i;
if (logger) { // Unpin all 'inprogress rollback log nodes' pinned by transactions
int r = toku_omt_iterate(logger->live_txns,
unpin_rollback_log_for_checkpoint,
NULL);
assert(r==0);
}
cachetable_lock(ct);
//Initialize accountability counters
ct->checkpoint_num_files = 0;
......
......@@ -145,17 +145,37 @@ struct tokutxn {
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
void * progress_poll_fun_extra;
// these are number of rollback nodes and rollback entries for this txn.
//
// the current rollback node below has sequence number num_rollback_nodes - 1
// (because they are numbered 0...num-1). often, the current rollback is
// already set to this block num, which means it exists and is available to
// log some entries. if the current rollback is NONE and the number of
// rollback nodes for this transaction is non-zero, then we will use
// the number of rollback nodes to know which sequence number to assign
// to a new one we create
uint64_t num_rollback_nodes;
uint64_t num_rollentries;
uint64_t num_rollentries_processed;
// spilled rollback nodes are rollback nodes that were gorged by this
// transaction, retired, and saved in a list.
// the spilled rollback head is the block number of the first rollback node
// that makes up the rollback log chain
BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash;
// the spilled rollback is the block number of the last rollback node that
// makes up the rollback log chain.
BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash;
// the current rollback node block number we may use. if this is ROLLBACK_NONE,
// then we need to create one and set it here before using it.
BLOCKNUM current_rollback;
uint32_t current_rollback_hash;
BOOL recovered_from_checkpoint;
ROLLBACK_LOG_NODE pinned_inprogress_rollback_log;
struct toku_list checkpoint_before_commit;
TXN_IGNORE_S ignore_errors; // 2954
TOKUTXN_STATE state;
......
......@@ -580,10 +580,8 @@ generate_rollbacks (void) {
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
fprintf(cf, " int r;\n");
fprintf(cf, " ROLLBACK_LOG_NODE log;\n");
fprintf(cf, " r = toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n");
fprintf(cf, " assert(r==0);\n");
fprintf(cf, " toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n");
// 'memdup' all BYTESTRINGS here
DO_FIELDS(ft, lt, {
if ( strcmp(ft->type, "BYTESTRING") == 0 ) {
......@@ -620,7 +618,10 @@ generate_rollbacks (void) {
fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n");
fprintf(cf, " txn->num_rollentries++;\n");
fprintf(cf, " log->dirty = TRUE;\n");
fprintf(cf, " return toku_maybe_spill_rollbacks(txn, log);\n}\n");
fprintf(cf, " // spill and unpin assert success internally\n");
fprintf(cf, " toku_maybe_spill_rollbacks(txn, log);\n");
fprintf(cf, " toku_rollback_log_unpin(txn, log);\n");
fprintf(cf, " return 0;\n}\n");
});
DO_ROLLBACKS(lt, {
......
......@@ -6,9 +6,23 @@
/* rollback and rollforward routines. */
#include "includes.h"
#include "checkpoint.h"
#include "xids.h"
#include "roll.h"
// functionality provided by roll.c is exposed by an autogenerated
// header file, logheader.h
//
// this (poorly) explains the absence of "roll.h"
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
int
toku_commit_fdelete (u_int8_t file_was_open,
......@@ -305,7 +319,7 @@ toku_apply_rollinclude (TXNID xid,
void * yieldv,
LSN oplsn,
apply_rollback_item func) {
int r;
int r = 0;
struct roll_entry *item;
int count=0;
......@@ -316,14 +330,13 @@ toku_apply_rollinclude (TXNID xid,
BOOL found_head = FALSE;
assert(next_log.b != ROLLBACK_NONE.b);
while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log
r = toku_get_and_pin_rollback_log(txn, xid, last_sequence - 1, next_log, next_log_hash, &log);
assert(r==0);
ROLLBACK_LOG_NODE log;
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_rollback_verify_contents(log, xid, last_sequence - 1);
last_sequence = log->sequence;
r = toku_maybe_prefetch_older_rollback_log(txn, log);
assert(r==0);
toku_maybe_prefetch_previous_rollback_log(txn, log);
while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev;
......@@ -337,8 +350,8 @@ toku_apply_rollinclude (TXNID xid,
found_head = TRUE;
assert(log->sequence == 0);
}
next_log = log->older;
next_log_hash = log->older_hash;
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
......@@ -350,9 +363,7 @@ toku_apply_rollinclude (TXNID xid,
spilled_head_hash = next_log_hash;
}
}
//Unpins log
r = toku_delete_rollback_log(txn, log);
assert(r==0);
toku_rollback_log_unpin_and_remove(txn, log);
}
return r;
}
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef TOKUDB_ROLL_H
#define TOKUDB_ROLL_H
// Compile-time switches controlling whether committing a transaction sends
// a BRT_COMMIT message for each message class the transaction sent earlier.
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_BOTH message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
......@@ -5,8 +5,6 @@
#include "includes.h"
static void note_txn_closing (TOKUTXN txn);
void
toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
if (txn->progress_poll_fun) {
......@@ -38,7 +36,7 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield
}
static inline int
txn_has_inprogress_rollback_log(TOKUTXN txn) {
txn_has_current_rollback_log(TOKUTXN txn) {
return txn->current_rollback.b != ROLLBACK_NONE.b;
}
......@@ -58,21 +56,16 @@ static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoi
}
int
toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r;
CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf);
if (txn->pinned_inprogress_rollback_log == log) {
txn->pinned_inprogress_rollback_log = NULL;
}
r = toku_cachetable_unpin_and_remove (cf, log->thislogname, rollback_unpin_remove_callback, h);
assert(r==0);
return r;
r = toku_cachetable_unpin_and_remove (cf, log->blocknum, rollback_unpin_remove_callback, h);
assert(r == 0);
}
static int
toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
apply_rollback_item func) {
int r = 0;
// do the commit/abort calls and free everything
......@@ -85,7 +78,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
uint32_t next_log_hash = 0;
BOOL is_current = FALSE;
if (txn_has_inprogress_rollback_log(txn)) {
if (txn_has_current_rollback_log(txn)) {
next_log = txn->current_rollback;
next_log_hash = txn->current_rollback_hash;
is_current = TRUE;
......@@ -100,11 +93,10 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, last_sequence-1, next_log, next_log_hash, &log);
assert(r==0);
toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
r = toku_maybe_prefetch_older_rollback_log(txn, log);
assert(r==0);
toku_maybe_prefetch_previous_rollback_log(txn, log);
last_sequence = log->sequence;
if (func) {
......@@ -129,8 +121,8 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
found_head = TRUE;
assert(log->sequence == 0);
}
next_log = log->older;
next_log_hash = log->older_hash;
next_log = log->previous;
next_log_hash = log->previous_hash;
{
//Clean up transaction structure to prevent
//toku_txn_close from double-freeing
......@@ -149,9 +141,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
txn->spilled_rollback_head_hash = next_log_hash;
}
}
//Unpins log
r = toku_delete_rollback_log(txn, log);
assert(r==0);
toku_rollback_log_unpin_and_remove(txn, log);
}
return r;
}
......@@ -211,7 +201,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
olderxid = (TXNID) olderv;
invariant(olderxid < xid);
if (olderxid >= *live_xid) {
//Older txn is new enough, we need to update.
//older txn is new enough, we need to update.
pair->xid2 = olderxid;
should_delete = FALSE;
}
......@@ -243,6 +233,48 @@ live_list_reverse_note_txn_end(TOKUTXN txn) {
return r;
}
// Heaviside comparator: orders TOKUTXNs by their 64-bit transaction id.
// Passed to toku_omt_find_zero to locate a transaction's index in an OMT.
static int find_xid (OMTVALUE v, void *txnv) {
    TOKUTXN txn = v;
    TOKUTXN target = txnv;
    if (txn->txnid64 < target->txnid64) {
        return -1;
    }
    return (txn->txnid64 > target->txnid64) ? +1 : 0;
}
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
BRT brt = brtv;
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
// locate this txn's slot in the BRT's txn OMT; it must be present
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert(txnv_again == txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
// if this txn is recorded as the one that created/locked the dictionary
// while it was empty, clear that association now that the txn is closing
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
brt->h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
// if the user already closed this BRT and it was only kept alive as a
// zombie for open txns, finish the close now that none need it
if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately.
assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags, false, ZERO_LSN);
}
return r;
}
// for every BRT in txn, remove it.
// Called when the txn closes; remove_txn detaches the txn from each BRT
// and may finish a pending zombie close of that BRT.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
}
void toku_rollback_txn_close (TOKUTXN txn) {
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
......@@ -329,7 +361,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
note_txn_closing(txn);
}
void* toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
return malloc_in_memarena(log->rollentry_arena, size);
}
......@@ -366,7 +398,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
if (txn_has_spilled_rollback_logs(txn)) {
uint64_t num_nodes = txn->num_rollback_nodes;
if (txn_has_inprogress_rollback_log(txn)) {
if (txn_has_current_rollback_log(txn)) {
num_nodes--; //Don't count the in-progress rollback log.
}
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
......@@ -379,18 +411,19 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->spilled_rollback_tail = ROLLBACK_NONE;
txn->spilled_rollback_tail_hash = 0;
}
if (txn_has_inprogress_rollback_log(txn)) {
ROLLBACK_LOG_NODE parent_log;
// if we're committing a child rollback, put its entries into the parent
// by pinning both child and parent and then linking the child log entry
// list to the end of the parent log entry list.
if (txn_has_current_rollback_log(txn)) {
//Pin parent log
r = toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
assert(r==0);
ROLLBACK_LOG_NODE parent_log;
toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
ROLLBACK_LOG_NODE child_log;
//Pin child log
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes - 1,
txn->current_rollback, txn->current_rollback_hash,
&child_log);
assert(r==0);
ROLLBACK_LOG_NODE child_log;
toku_get_and_pin_rollback_log(txn, txn->current_rollback,
txn->current_rollback_hash, &child_log);
toku_rollback_verify_contents(child_log, txn->txnid64, txn->num_rollback_nodes - 1);
// Append the list to the front of the parent.
if (child_log->oldest_logentry) {
......@@ -413,14 +446,13 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
}
//Delete child log (unpins child_log)
r = toku_delete_rollback_log(txn, child_log);
assert(r==0);
toku_rollback_log_unpin_and_remove(txn, child_log);
txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0;
r = toku_maybe_spill_rollbacks(txn->parent, parent_log); //unpins parent_log
assert(r==0);
toku_maybe_spill_rollbacks(txn->parent, parent_log);
toku_rollback_log_unpin(txn->parent, parent_log);
assert(r == 0);
}
// Note the open brts, the omts must be merged
......@@ -438,7 +470,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->num_rollentries += txn->num_rollentries;
} else {
r = toku_apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item);
r = apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item);
assert(r==0);
}
......@@ -452,7 +484,7 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
toku_list_pop(&txn->checkpoint_before_commit);
}
r = toku_apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item);
r = apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item);
assert(r==0);
return r;
}
......@@ -481,17 +513,14 @@ rollback_memory_size(ROLLBACK_LOG_NODE log) {
return make_rollback_pair_attr(size);
}
// Cleanup the rollback memory
static void
toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) {
ROLLBACK_LOG_NODE log = *log_p;
*log_p = NULL; //Sanitize
// Cleanup the rollback memory
rollback_log_destroy(ROLLBACK_LOG_NODE log) {
memarena_close(&log->rollentry_arena);
toku_free(log);
}
static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
static void rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
int r;
......@@ -499,13 +528,13 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
struct brt_header *h = extraargs;
assert(h->cf == cachefile);
assert(log->thislogname.b==logname.b);
assert(log->blocknum.b==logname.b);
if (write_me && !h->panic) {
int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
r = toku_serialize_rollback_log_to(fd, log->thislogname, log, h, n_workitems, n_threads, for_checkpoint);
r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
if (r) {
if (h->panic==0) {
char *e = strerror(r);
......@@ -519,11 +548,11 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
}
*new_size = size;
if (!keep_me) {
toku_rollback_log_free(&log);
rollback_log_destroy(log);
}
}
static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
static int rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r;
struct brt_header *h = extraargs;
......@@ -537,7 +566,7 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l
return r;
}
static void toku_rollback_pe_est_callback(
static void rollback_pe_est_callback(
void* rollback_v,
void* UU(disk_data),
long* bytes_freed_estimate,
......@@ -551,7 +580,7 @@ static void toku_rollback_pe_est_callback(
}
// callback for partially evicting a cachetable entry
static int toku_rollback_pe_callback (
static int rollback_pe_callback (
void *rollback_v,
PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr,
......@@ -562,17 +591,22 @@ static int toku_rollback_pe_callback (
*new_attr = old_attr;
return 0;
}
static BOOL toku_rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
// partial fetch is never required for a rollback log node
static BOOL rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE;
}
static int toku_rollback_pf_callback(void* UU(brtnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
// should never be called, given that toku_rollback_pf_req_callback always returns false
// a rollback node should never be partial fetched,
// because we always say it is not required.
// (pf req callback always returns false)
static int rollback_pf_callback(void* UU(brtnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
assert(FALSE);
return 0;
}
static int toku_rollback_cleaner_callback (
// the cleaner thread should never choose a rollback node for cleaning
static int rollback_cleaner_callback (
void* UU(brtnode_pv),
BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash),
......@@ -585,16 +619,19 @@ static int toku_rollback_cleaner_callback (
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(struct brt_header* h) {
CACHETABLE_WRITE_CALLBACK wc;
wc.flush_callback = toku_rollback_flush_callback;
wc.pe_est_callback = toku_rollback_pe_est_callback;
wc.pe_callback = toku_rollback_pe_callback;
wc.cleaner_callback = toku_rollback_cleaner_callback;
wc.flush_callback = rollback_flush_callback;
wc.pe_est_callback = rollback_pe_est_callback;
wc.pe_callback = rollback_pe_callback;
wc.cleaner_callback = rollback_cleaner_callback;
wc.clone_callback = NULL;
wc.write_extraargs = h;
return wc;
}
static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t older_hash, ROLLBACK_LOG_NODE *result) {
// create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// node the next sequence number
static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previous_hash, ROLLBACK_LOG_NODE *result) {
ROLLBACK_LOG_NODE MALLOC(log);
assert(log);
......@@ -608,58 +645,39 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
log->dirty = TRUE;
log->txnid = txn->txnid64;
log->sequence = txn->num_rollback_nodes++;
toku_allocate_blocknum(h->blocktable, &log->thislogname, h);
log->thishash = toku_cachetable_hash(cf, log->thislogname);
log->older = older;
log->older_hash = older_hash;
toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
log->hash = toku_cachetable_hash(cf, log->blocknum);
log->previous = previous;
log->previous_hash = previous_hash;
log->oldest_logentry = NULL;
log->newest_logentry = NULL;
log->rollentry_arena = memarena_create();
log->rollentry_resident_bytecount = 0;
*result = log;
r=toku_cachetable_put(cf, log->thislogname, log->thishash,
r = toku_cachetable_put(cf, log->blocknum, log->hash,
log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(h));
assert(r==0);
txn->current_rollback = log->thislogname;
txn->current_rollback_hash = log->thishash;
txn->pinned_inprogress_rollback_log = log;
return 0;
}
int
toku_unpin_inprogress_rollback_log(TOKUTXN txn) {
if (txn->pinned_inprogress_rollback_log) {
return toku_rollback_log_unpin(txn, txn->pinned_inprogress_rollback_log);
}
else {
return 0;
}
assert(r == 0);
txn->current_rollback = log->blocknum;
txn->current_rollback_hash = log->hash;
}
int
toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r;
CACHEFILE cf = txn->logger->rollback_cachefile;
if (txn->pinned_inprogress_rollback_log == log) {
txn->pinned_inprogress_rollback_log = NULL;
}
r = toku_cachetable_unpin(cf, log->thislogname, log->thishash,
r = toku_cachetable_unpin(cf, log->blocknum, log->hash,
(enum cachetable_dirty)log->dirty, rollback_memory_size(log));
assert(r==0);
return r;
assert(r == 0);
}
//Requires: log is pinned
// log is current
//After:
// log is unpinned if a spill happened
// Maybe there is no current after (if it spilled)
int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r = 0;
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
assert(log->thislogname.b == txn->current_rollback.b);
assert(log->blocknum.b == txn->current_rollback.b);
//spill
if (!txn_has_spilled_rollback_logs(txn)) {
//First spilled. Copy to head.
......@@ -672,20 +690,7 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) {
txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0;
//Unpin
r = toku_rollback_log_unpin(txn, log);
assert(r==0);
}
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
// Compares two transactions by txnid64; suitable as a toku_omt_find_zero
// comparator. Returns -1/0/+1 for less-than/equal/greater-than.
int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
static int find_filenum (OMTVALUE v, void *brtv) {
......@@ -785,39 +790,6 @@ int toku_txn_note_close_brt (BRT brt) {
return 0;
}
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
BRT brt = brtv;
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
// locate this txn's slot in the BRT's txn OMT; it must be present
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
// clear the dictionary's record of this txn having created/locked it empty
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
brt->h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
// if the BRT was user-closed and only lingered as a zombie for open txns,
// finish the close now
if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately.
assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags, false, ZERO_LSN);
}
return r;
}
// for every BRT in txn, remove it.
// Invoked during txn close; remove_txn handles detaching and any deferred
// zombie close of the BRT.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
{
......@@ -825,98 +797,66 @@ int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
return 0;
}
// Look up a live transaction by id in the BRT's txn OMT.
// On success (return 0) *txnptr is set to the found TOKUTXN; otherwise the
// nonzero result of toku_omt_find_zero is returned and *txnptr is untouched.
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) {
    // stack-allocated txn carrying only the id, used as the search key
    struct tokutxn key;
    key.txnid64 = xid;
    OMTVALUE found;
    u_int32_t idx;
    int r = toku_omt_find_zero(brt->txns, find_xid, &key, &found, &idx);
    if (r == 0) {
        *txnptr = found;
    }
    return r;
}
int
toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
//Currently processing 'log'. Prefetch the next (older) log node.
void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
//Currently processing 'log'. Prefetch the next (previous) log node.
BLOCKNUM name = log->older;
BLOCKNUM name = log->previous;
int r = 0;
if (name.b != ROLLBACK_NONE.b) {
uint32_t hash = log->older_hash;
uint32_t hash = log->previous_hash;
CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf);
BOOL doing_prefetch = FALSE;
r = toku_cachefile_prefetch(cf, name, hash,
get_write_callbacks_for_rollback_log(h),
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
rollback_fetch_callback,
rollback_pf_req_callback,
rollback_pf_callback,
h,
&doing_prefetch);
assert(r==0);
assert(r == 0);
}
return r;
}
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result) {
BOOL save_inprogress_node = FALSE;
assert(name.b != ROLLBACK_NONE.b);
int r = 0;
ROLLBACK_LOG_NODE log = NULL;
// Sanity check on a freshly pinned rollback node: it must carry the txnid
// and chain sequence number the caller expected to find at that blocknum.
void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
                                   TXNID txnid, uint64_t sequence)
{
    assert(log->txnid == txnid);
    assert(log->sequence == sequence);
}
if (name.b == txn->current_rollback.b) {
assert(hash == txn->current_rollback_hash);
log = txn->pinned_inprogress_rollback_log;
save_inprogress_node = TRUE;
}
if (!log) {
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log) {
void * value;
CACHEFILE cf = txn->logger->rollback_cachefile;
void * log_v;
struct brt_header *h = toku_cachefile_get_userdata(cf);
r = toku_cachetable_get_and_pin(cf, name, hash,
&log_v, NULL,
int r = toku_cachetable_get_and_pin(cf, blocknum, hash,
&value, NULL,
get_write_callbacks_for_rollback_log(h),
toku_rollback_fetch_callback,
toku_rollback_pf_req_callback,
toku_rollback_pf_callback,
rollback_fetch_callback,
rollback_pf_req_callback,
rollback_pf_callback,
TRUE, // may_modify_value
h
);
assert(r==0);
log = (ROLLBACK_LOG_NODE)log_v;
}
if (r==0) {
assert(log->thislogname.b == name.b);
assert(log->txnid == xid);
assert(log->sequence == sequence);
if (save_inprogress_node) {
txn->pinned_inprogress_rollback_log = log;
}
*result = log;
}
return r;
assert(r == 0);
ROLLBACK_LOG_NODE pinned_log = value;
assert(pinned_log->blocknum.b == blocknum.b);
*log = pinned_log;
}
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result) {
void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
ROLLBACK_LOG_NODE pinned_log;
invariant(txn->state == TOKUTXN_LIVE); // #3258
int r;
ROLLBACK_LOG_NODE log;
if (txn_has_inprogress_rollback_log(txn)) {
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes-1,
txn->current_rollback, txn->current_rollback_hash, &log);
assert(r==0);
}
else {
//Generate new one.
//tail will be ROLLBACK_NONE if this is the very first
r = toku_create_new_rollback_log(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &log);
assert(r==0);
}
if (r==0) {
assert(log->txnid == txn->txnid64);
assert(log->thislogname.b != ROLLBACK_NONE.b);
*result = log;
if (txn_has_current_rollback_log(txn)) {
toku_get_and_pin_rollback_log(txn, txn->current_rollback, txn->current_rollback_hash, &pinned_log);
toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->num_rollback_nodes - 1);
} else {
// create a new log for this transaction to use.
// this call asserts success internally
rollback_log_create(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &pinned_log);
}
return r;
assert(pinned_log->txnid == txn->txnid64);
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
*log = pinned_log;
}
......@@ -11,56 +11,79 @@
extern "C" {
#endif
// these routines in rollback.c
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
void toku_rollback_txn_close (TOKUTXN txn);
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result);
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result);
int toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_unpin_inprogress_rollback_log(TOKUTXN txn);
int toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
// these functions assert internally that they succeed
// get a rollback node this txn may use for a new entry. if there
// is a current rollback node to use, pin it, otherwise create one.
void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log);
// get a specific rollback by blocknum and hash
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log);
// unpin a rollback node from the cachetable
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
// assert that the given log's txnid and sequence match the ones given
void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, TXNID txnid, uint64_t sequence);
// if there is a previous rollback log for the given log node, prefetch it
void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
// unpin and remove a rollback log from the cachetable
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log);
// given a transaction and a log node, and if the log is too full,
// set the current rollback log to ROLLBACK_NONE and move the current
// node onto the tail of the rollback node chain. further insertions
// into the rollback log for this transaction will force the creation
// of a new rollback log.
//
// this never unpins the rollback log if a spill occurs. the caller
// is responsible for ensuring the given rollback node is unpinned
// if necessary.
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_swap_brt (BRT live, BRT zombie);
int toku_txn_note_close_brt (BRT brt);
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count);
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
int toku_find_pair_by_xid (OMTVALUE v, void *txnv);
int toku_find_xid_by_xid (OMTVALUE v, void *xidv);
// these routines in roll.c
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn);
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv, LSN lsn);
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv);
// A high-level rollback log is made up of a chain of rollback log nodes.
// Each rollback log node is represented (separately) in the cachetable by
// this structure. Each portion of the rollback log chain has a block num
// and a hash to identify it.
struct rollback_log_node {
int layout_version;
int layout_version_original;
int layout_version_read_from_disk;
uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk
int dirty;
TXNID txnid; // Which transaction made this?
uint64_t sequence; // Which rollback log in the sequence is this?
BLOCKNUM thislogname; // Which block number is this chunk of the log?
uint32_t thishash;
BLOCKNUM older; // Which block number is the next oldest chunk of the log?
uint32_t older_hash;
// to which transaction does this node belong?
TXNID txnid;
// sequentially, where in the rollback log chain is this node?
// the sequence is between 0 and totalnodes-1
uint64_t sequence;
BLOCKNUM blocknum; // on which block does this node live?
uint32_t hash;
// which block number is the previous in the chain of rollback nodes
// that make up this rollback log?
BLOCKNUM previous;
uint32_t previous_hash;
struct roll_entry *oldest_logentry;
struct roll_entry *newest_logentry;
MEMARENA rollentry_arena;
......
......@@ -223,7 +223,6 @@ toku_txn_create_txn (
result->current_rollback = ROLLBACK_NONE;
result->current_rollback_hash = 0;
result->num_rollback_nodes = 0;
result->pinned_inprogress_rollback_log = NULL;
result->snapshot_type = snapshot_type;
result->snapshot_txnid64 = TXNID_NONE;
result->container_db_txn = container_db_txn;
......
......@@ -285,26 +285,12 @@ static int
locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
//
// We must unpin rollback log, otherwise, another thread that tries to checkpoint during commit
// will grab the multi operation lock, and then not be able to complete the checkpoint because
// this thread has its rollback log pinned and is trying to grab the multi operation lock.
//
// We grab the ydb lock because the checkpoint thread also unpins inprogress rollback logs,
// so the ydb lock protects a race of both this thread and the checkpoint thread unpinning the
// inprogress rollback log. If we want, we can probably have the checkpoint thread to not
// unpin inprogress rollback logs, making this ydb lock grab unnecessary.
//
toku_ydb_lock();
int r = toku_unpin_inprogress_rollback_log(ttxn);
toku_ydb_unlock();
assert_zero(r);
if (toku_txn_requires_checkpoint(ttxn)) {
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
}
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
toku_ydb_lock();
r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
int r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
toku_ydb_unlock();
toku_txn_destroy(txn);
return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment