Commit ca9e8dc6 authored by John Esmet's avatar John Esmet Committed by Yoni Fogel

close[t:4574] merging 4574 to main. fixes the hcad deadlock found by Tim's...

close[t:4574] merging 4574 to main. fixes the hcad deadlock found by Tim's stress test, which adds and drops indexes concurrent with queries and insertions. transactions no longer keep trollback nodes pinned after an operation, but instead always unpin them.

this merge also introduces a lot of improvements to our rollback code, in terms of clarity and consistency. to that end, variable names and function names were improved, as well as more documentation of the rollback logic in rollback.h and log-internal.h

roll.h is removed because it is a dead file.


git-svn-id: file:///svn/toku/tokudb@41576 c7de825b-a66e-492c-adef-691d508d4ae1
parent ac7abc7f
...@@ -2046,7 +2046,7 @@ toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF ...@@ -2046,7 +2046,7 @@ toku_serialize_descriptor_contents_to_fd(int fd, const DESCRIPTOR desc, DISKOFF
static void static void
deserialize_descriptor_from_rbuf(struct rbuf *rb, DESCRIPTOR desc, int layout_version) { deserialize_descriptor_from_rbuf(struct rbuf *rb, DESCRIPTOR desc, int layout_version) {
if (layout_version == BRT_LAYOUT_VERSION_13) { if (layout_version == BRT_LAYOUT_VERSION_13) {
// in older versions of TokuDB the Descriptor had a 4 byte version, which we must skip over // in previous versions of TokuDB the Descriptor had a 4 byte version, which we must skip over
u_int32_t dummy_version __attribute__((__unused__)) = rbuf_int(rb); u_int32_t dummy_version __attribute__((__unused__)) = rbuf_int(rb);
} }
u_int32_t size; u_int32_t size;
...@@ -2501,8 +2501,8 @@ serialize_rollback_log_size(ROLLBACK_LOG_NODE log) { ...@@ -2501,8 +2501,8 @@ serialize_rollback_log_size(ROLLBACK_LOG_NODE log) {
size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id size_t size = node_header_overhead //8 "tokuroll", 4 version, 4 version_original, 4 build_id
+8 //TXNID +8 //TXNID
+8 //sequence +8 //sequence
+8 //thislogname +8 //blocknum
+8 //older (blocknum) +8 //previous (blocknum)
+8 //resident_bytecount +8 //resident_bytecount
+8 //memarena_size_needed_to_load +8 //memarena_size_needed_to_load
+log->rollentry_resident_bytecount; +log->rollentry_resident_bytecount;
...@@ -2521,8 +2521,8 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc ...@@ -2521,8 +2521,8 @@ serialize_rollback_log_node_to_buf(ROLLBACK_LOG_NODE log, char *buf, size_t calc
wbuf_nocrc_uint(&wb, BUILD_ID); wbuf_nocrc_uint(&wb, BUILD_ID);
wbuf_nocrc_TXNID(&wb, log->txnid); wbuf_nocrc_TXNID(&wb, log->txnid);
wbuf_nocrc_ulonglong(&wb, log->sequence); wbuf_nocrc_ulonglong(&wb, log->sequence);
wbuf_nocrc_BLOCKNUM(&wb, log->thislogname); wbuf_nocrc_BLOCKNUM(&wb, log->blocknum);
wbuf_nocrc_BLOCKNUM(&wb, log->older); wbuf_nocrc_BLOCKNUM(&wb, log->previous);
wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount); wbuf_nocrc_ulonglong(&wb, log->rollentry_resident_bytecount);
//Write down memarena size needed to restore //Write down memarena size needed to restore
wbuf_nocrc_ulonglong(&wb, memarena_total_size_in_use(log->rollentry_arena)); wbuf_nocrc_ulonglong(&wb, memarena_total_size_in_use(log->rollentry_arena));
...@@ -2677,18 +2677,18 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB ...@@ -2677,18 +2677,18 @@ deserialize_rollback_log_from_rbuf (BLOCKNUM blocknum, u_int32_t fullhash, ROLLB
//TODO: This is hard.. everything is shared in a single dictionary. //TODO: This is hard.. everything is shared in a single dictionary.
rbuf_TXNID(rb, &result->txnid); rbuf_TXNID(rb, &result->txnid);
result->sequence = rbuf_ulonglong(rb); result->sequence = rbuf_ulonglong(rb);
result->thislogname = rbuf_blocknum(rb); result->blocknum = rbuf_blocknum(rb);
if (result->thislogname.b != blocknum.b) { if (result->blocknum.b != blocknum.b) {
r = toku_db_badformat(); r = toku_db_badformat();
goto died0; goto died0;
} }
result->thishash = toku_cachetable_hash(h->cf, result->thislogname); result->hash = toku_cachetable_hash(h->cf, result->blocknum);
if (result->thishash != fullhash) { if (result->hash != fullhash) {
r = toku_db_badformat(); r = toku_db_badformat();
goto died0; goto died0;
} }
result->older = rbuf_blocknum(rb); result->previous = rbuf_blocknum(rb);
result->older_hash = toku_cachetable_hash(h->cf, result->older); result->previous_hash = toku_cachetable_hash(h->cf, result->previous);
result->rollentry_resident_bytecount = rbuf_ulonglong(rb); result->rollentry_resident_bytecount = rbuf_ulonglong(rb);
size_t arena_initial_size = rbuf_ulonglong(rb); size_t arena_initial_size = rbuf_ulonglong(rb);
......
...@@ -120,7 +120,6 @@ basement nodes, bulk fetch, and partial fetch: ...@@ -120,7 +120,6 @@ basement nodes, bulk fetch, and partial fetch:
// Access to nested transaction logic // Access to nested transaction logic
#include "ule.h" #include "ule.h"
#include "xids.h" #include "xids.h"
#include "roll.h"
#include "sub_block.h" #include "sub_block.h"
#include "sort.h" #include "sort.h"
#include <brt-cachetable-wrappers.h> #include <brt-cachetable-wrappers.h>
......
...@@ -3571,17 +3571,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) { ...@@ -3571,17 +3571,6 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
assert(0); assert(0);
} }
static int
unpin_rollback_log_for_checkpoint (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
int r = 0;
TOKUTXN txn = txnv;
if (txn->pinned_inprogress_rollback_log) {
r = toku_rollback_log_unpin(txn, txn->pinned_inprogress_rollback_log);
assert(r==0);
}
return r;
}
// TODO: #1510 locking of cachetable is suspect // TODO: #1510 locking of cachetable is suspect
// verify correct algorithm overall // verify correct algorithm overall
...@@ -3596,12 +3585,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) { ...@@ -3596,12 +3585,6 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{ {
brt_begin_checkpoint(); brt_begin_checkpoint();
unsigned i; unsigned i;
if (logger) { // Unpin all 'inprogress rollback log nodes' pinned by transactions
int r = toku_omt_iterate(logger->live_txns,
unpin_rollback_log_for_checkpoint,
NULL);
assert(r==0);
}
cachetable_lock(ct); cachetable_lock(ct);
//Initialize accountability counters //Initialize accountability counters
ct->checkpoint_num_files = 0; ct->checkpoint_num_files = 0;
......
...@@ -145,17 +145,37 @@ struct tokutxn { ...@@ -145,17 +145,37 @@ struct tokutxn {
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn) BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun; TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
void * progress_poll_fun_extra; void * progress_poll_fun_extra;
// these are number of rollback nodes and rollback entries for this txn.
//
// the current rollback node below has sequence number num_rollback_nodes - 1
// (because they are numbered 0...num-1). often, the current rollback is
// already set to this block num, which means it exists and is available to
// log some entries. if the current rollback is NONE and the number of
// rollback nodes for this transaction is non-zero, then we will use
// the number of rollback nodes to know which sequence number to assign
// to a new one we create
uint64_t num_rollback_nodes; uint64_t num_rollback_nodes;
uint64_t num_rollentries; uint64_t num_rollentries;
uint64_t num_rollentries_processed; uint64_t num_rollentries_processed;
// spilled rollback nodes are rollback nodes that were gorged by this
// transaction, retired, and saved in a list.
// the spilled rollback head is the block number of the first rollback node
// that makes up the rollback log chain
BLOCKNUM spilled_rollback_head; BLOCKNUM spilled_rollback_head;
uint32_t spilled_rollback_head_hash; uint32_t spilled_rollback_head_hash;
// the spilled rollback is the block number of the last rollback node that
// makes up the rollback log chain.
BLOCKNUM spilled_rollback_tail; BLOCKNUM spilled_rollback_tail;
uint32_t spilled_rollback_tail_hash; uint32_t spilled_rollback_tail_hash;
// the current rollback node block number we may use. if this is ROLLBACK_NONE,
// then we need to create one and set it here before using it.
BLOCKNUM current_rollback; BLOCKNUM current_rollback;
uint32_t current_rollback_hash; uint32_t current_rollback_hash;
BOOL recovered_from_checkpoint; BOOL recovered_from_checkpoint;
ROLLBACK_LOG_NODE pinned_inprogress_rollback_log;
struct toku_list checkpoint_before_commit; struct toku_list checkpoint_before_commit;
TXN_IGNORE_S ignore_errors; // 2954 TXN_IGNORE_S ignore_errors; // 2954
TOKUTXN_STATE state; TOKUTXN_STATE state;
......
...@@ -580,10 +580,8 @@ generate_rollbacks (void) { ...@@ -580,10 +580,8 @@ generate_rollbacks (void) {
fprintf(hf, ");\n"); fprintf(hf, ");\n");
fprintf(cf, ") {\n"); fprintf(cf, ") {\n");
fprintf(cf, " int r;\n");
fprintf(cf, " ROLLBACK_LOG_NODE log;\n"); fprintf(cf, " ROLLBACK_LOG_NODE log;\n");
fprintf(cf, " r = toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n"); fprintf(cf, " toku_get_and_pin_rollback_log_for_new_entry(txn, &log);\n");
fprintf(cf, " assert(r==0);\n");
// 'memdup' all BYTESTRINGS here // 'memdup' all BYTESTRINGS here
DO_FIELDS(ft, lt, { DO_FIELDS(ft, lt, {
if ( strcmp(ft->type, "BYTESTRING") == 0 ) { if ( strcmp(ft->type, "BYTESTRING") == 0 ) {
...@@ -620,7 +618,10 @@ generate_rollbacks (void) { ...@@ -620,7 +618,10 @@ generate_rollbacks (void) {
fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n"); fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n");
fprintf(cf, " txn->num_rollentries++;\n"); fprintf(cf, " txn->num_rollentries++;\n");
fprintf(cf, " log->dirty = TRUE;\n"); fprintf(cf, " log->dirty = TRUE;\n");
fprintf(cf, " return toku_maybe_spill_rollbacks(txn, log);\n}\n"); fprintf(cf, " // spill and unpin assert success internally\n");
fprintf(cf, " toku_maybe_spill_rollbacks(txn, log);\n");
fprintf(cf, " toku_rollback_log_unpin(txn, log);\n");
fprintf(cf, " return 0;\n}\n");
}); });
DO_ROLLBACKS(lt, { DO_ROLLBACKS(lt, {
......
...@@ -6,9 +6,23 @@ ...@@ -6,9 +6,23 @@
/* rollback and rollforward routines. */ /* rollback and rollforward routines. */
#include "includes.h" #include "includes.h"
#include "checkpoint.h"
#include "xids.h" #include "xids.h"
#include "roll.h"
// functionality provided by roll.c is exposed by an autogenerated
// header file, logheader.h
//
// this (poorly) explains the absense of "roll.h"
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
int int
toku_commit_fdelete (u_int8_t file_was_open, toku_commit_fdelete (u_int8_t file_was_open,
...@@ -305,7 +319,7 @@ toku_apply_rollinclude (TXNID xid, ...@@ -305,7 +319,7 @@ toku_apply_rollinclude (TXNID xid,
void * yieldv, void * yieldv,
LSN oplsn, LSN oplsn,
apply_rollback_item func) { apply_rollback_item func) {
int r; int r = 0;
struct roll_entry *item; struct roll_entry *item;
int count=0; int count=0;
...@@ -316,14 +330,13 @@ toku_apply_rollinclude (TXNID xid, ...@@ -316,14 +330,13 @@ toku_apply_rollinclude (TXNID xid,
BOOL found_head = FALSE; BOOL found_head = FALSE;
assert(next_log.b != ROLLBACK_NONE.b); assert(next_log.b != ROLLBACK_NONE.b);
while (next_log.b != ROLLBACK_NONE.b) { while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log;
//pin log //pin log
r = toku_get_and_pin_rollback_log(txn, xid, last_sequence - 1, next_log, next_log_hash, &log); ROLLBACK_LOG_NODE log;
assert(r==0); toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
toku_rollback_verify_contents(log, xid, last_sequence - 1);
last_sequence = log->sequence; last_sequence = log->sequence;
r = toku_maybe_prefetch_older_rollback_log(txn, log); toku_maybe_prefetch_previous_rollback_log(txn, log);
assert(r==0);
while ((item=log->newest_logentry)) { while ((item=log->newest_logentry)) {
log->newest_logentry = item->prev; log->newest_logentry = item->prev;
...@@ -337,8 +350,8 @@ toku_apply_rollinclude (TXNID xid, ...@@ -337,8 +350,8 @@ toku_apply_rollinclude (TXNID xid,
found_head = TRUE; found_head = TRUE;
assert(log->sequence == 0); assert(log->sequence == 0);
} }
next_log = log->older; next_log = log->previous;
next_log_hash = log->older_hash; next_log_hash = log->previous_hash;
{ {
//Clean up transaction structure to prevent //Clean up transaction structure to prevent
//toku_txn_close from double-freeing //toku_txn_close from double-freeing
...@@ -350,9 +363,7 @@ toku_apply_rollinclude (TXNID xid, ...@@ -350,9 +363,7 @@ toku_apply_rollinclude (TXNID xid,
spilled_head_hash = next_log_hash; spilled_head_hash = next_log_hash;
} }
} }
//Unpins log toku_rollback_log_unpin_and_remove(txn, log);
r = toku_delete_rollback_log(txn, log);
assert(r==0);
} }
return r; return r;
} }
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#ifndef TOKUDB_ROLL_H
#define TOKUDB_ROLL_H
#if defined(__cplusplus) || defined(__cilkplusplus)
extern "C" {
#endif
// these flags control whether or not we send commit messages for
// various operations
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_INSERT message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_INSERT 0
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_ANY message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE 1
// When a transaction is committed, should we send a BRT_COMMIT message
// for each BRT_DELETE_BOTH message sent earlier by the transaction?
#define TOKU_DO_COMMIT_CMD_DELETE_BOTH 1
#if defined(__cplusplus) || defined(__cilkplusplus)
};
#endif
#endif
...@@ -5,8 +5,6 @@ ...@@ -5,8 +5,6 @@
#include "includes.h" #include "includes.h"
static void note_txn_closing (TOKUTXN txn);
void void
toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) { toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
if (txn->progress_poll_fun) { if (txn->progress_poll_fun) {
...@@ -38,7 +36,7 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield ...@@ -38,7 +36,7 @@ int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield
} }
static inline int static inline int
txn_has_inprogress_rollback_log(TOKUTXN txn) { txn_has_current_rollback_log(TOKUTXN txn) {
return txn->current_rollback.b != ROLLBACK_NONE.b; return txn->current_rollback.b != ROLLBACK_NONE.b;
} }
...@@ -58,21 +56,16 @@ static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoi ...@@ -58,21 +56,16 @@ static void rollback_unpin_remove_callback(CACHEKEY* cachekey, BOOL for_checkpoi
} }
int void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r; int r;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf); struct brt_header *h = toku_cachefile_get_userdata(cf);
if (txn->pinned_inprogress_rollback_log == log) { r = toku_cachetable_unpin_and_remove (cf, log->blocknum, rollback_unpin_remove_callback, h);
txn->pinned_inprogress_rollback_log = NULL; assert(r == 0);
}
r = toku_cachetable_unpin_and_remove (cf, log->thislogname, rollback_unpin_remove_callback, h);
assert(r==0);
return r;
} }
static int static int
toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
apply_rollback_item func) { apply_rollback_item func) {
int r = 0; int r = 0;
// do the commit/abort calls and free everything // do the commit/abort calls and free everything
...@@ -85,7 +78,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -85,7 +78,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
uint32_t next_log_hash = 0; uint32_t next_log_hash = 0;
BOOL is_current = FALSE; BOOL is_current = FALSE;
if (txn_has_inprogress_rollback_log(txn)) { if (txn_has_current_rollback_log(txn)) {
next_log = txn->current_rollback; next_log = txn->current_rollback;
next_log_hash = txn->current_rollback_hash; next_log_hash = txn->current_rollback_hash;
is_current = TRUE; is_current = TRUE;
...@@ -100,11 +93,10 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -100,11 +93,10 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
while (next_log.b != ROLLBACK_NONE.b) { while (next_log.b != ROLLBACK_NONE.b) {
ROLLBACK_LOG_NODE log; ROLLBACK_LOG_NODE log;
//pin log //pin log
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, last_sequence-1, next_log, next_log_hash, &log); toku_get_and_pin_rollback_log(txn, next_log, next_log_hash, &log);
assert(r==0); toku_rollback_verify_contents(log, txn->txnid64, last_sequence - 1);
r = toku_maybe_prefetch_older_rollback_log(txn, log); toku_maybe_prefetch_previous_rollback_log(txn, log);
assert(r==0);
last_sequence = log->sequence; last_sequence = log->sequence;
if (func) { if (func) {
...@@ -129,8 +121,8 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -129,8 +121,8 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
found_head = TRUE; found_head = TRUE;
assert(log->sequence == 0); assert(log->sequence == 0);
} }
next_log = log->older; next_log = log->previous;
next_log_hash = log->older_hash; next_log_hash = log->previous_hash;
{ {
//Clean up transaction structure to prevent //Clean up transaction structure to prevent
//toku_txn_close from double-freeing //toku_txn_close from double-freeing
...@@ -149,9 +141,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn, ...@@ -149,9 +141,7 @@ toku_apply_txn (TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn,
txn->spilled_rollback_head_hash = next_log_hash; txn->spilled_rollback_head_hash = next_log_hash;
} }
} }
//Unpins log toku_rollback_log_unpin_and_remove(txn, log);
r = toku_delete_rollback_log(txn, log);
assert(r==0);
} }
return r; return r;
} }
...@@ -211,7 +201,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi ...@@ -211,7 +201,7 @@ live_list_reverse_note_txn_end_iter(OMTVALUE live_xidv, u_int32_t UU(index), voi
olderxid = (TXNID) olderv; olderxid = (TXNID) olderv;
invariant(olderxid < xid); invariant(olderxid < xid);
if (olderxid >= *live_xid) { if (olderxid >= *live_xid) {
//Older txn is new enough, we need to update. //older txn is new enough, we need to update.
pair->xid2 = olderxid; pair->xid2 = olderxid;
should_delete = FALSE; should_delete = FALSE;
} }
...@@ -243,6 +233,48 @@ live_list_reverse_note_txn_end(TOKUTXN txn) { ...@@ -243,6 +233,48 @@ live_list_reverse_note_txn_end(TOKUTXN txn) {
return r; return r;
} }
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
static int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
}
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
BRT brt = brtv;
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert(txnv_again == txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
brt->h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately.
assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags, false, ZERO_LSN);
}
return r;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
}
void toku_rollback_txn_close (TOKUTXN txn) { void toku_rollback_txn_close (TOKUTXN txn) {
assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b); assert(txn->spilled_rollback_head.b == ROLLBACK_NONE.b);
...@@ -329,7 +361,7 @@ void toku_rollback_txn_close (TOKUTXN txn) { ...@@ -329,7 +361,7 @@ void toku_rollback_txn_close (TOKUTXN txn) {
note_txn_closing(txn); note_txn_closing(txn);
} }
void* toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) { void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size) {
return malloc_in_memarena(log->rollentry_arena, size); return malloc_in_memarena(log->rollentry_arena, size);
} }
...@@ -366,7 +398,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -366,7 +398,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
if (txn_has_spilled_rollback_logs(txn)) { if (txn_has_spilled_rollback_logs(txn)) {
uint64_t num_nodes = txn->num_rollback_nodes; uint64_t num_nodes = txn->num_rollback_nodes;
if (txn_has_inprogress_rollback_log(txn)) { if (txn_has_current_rollback_log(txn)) {
num_nodes--; //Don't count the in-progress rollback log. num_nodes--; //Don't count the in-progress rollback log.
} }
r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes, r = toku_logger_save_rollback_rollinclude(txn->parent, txn->txnid64, num_nodes,
...@@ -379,18 +411,19 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -379,18 +411,19 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->spilled_rollback_tail = ROLLBACK_NONE; txn->spilled_rollback_tail = ROLLBACK_NONE;
txn->spilled_rollback_tail_hash = 0; txn->spilled_rollback_tail_hash = 0;
} }
if (txn_has_inprogress_rollback_log(txn)) { // if we're commiting a child rollback, put its entries into the parent
ROLLBACK_LOG_NODE parent_log; // by pinning both child and parent and then linking the child log entry
// list to the end of the parent log entry list.
if (txn_has_current_rollback_log(txn)) {
//Pin parent log //Pin parent log
r = toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log); ROLLBACK_LOG_NODE parent_log;
assert(r==0); toku_get_and_pin_rollback_log_for_new_entry(txn->parent, &parent_log);
ROLLBACK_LOG_NODE child_log;
//Pin child log //Pin child log
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes - 1, ROLLBACK_LOG_NODE child_log;
txn->current_rollback, txn->current_rollback_hash, toku_get_and_pin_rollback_log(txn, txn->current_rollback,
&child_log); txn->current_rollback_hash, &child_log);
assert(r==0); toku_rollback_verify_contents(child_log, txn->txnid64, txn->num_rollback_nodes - 1);
// Append the list to the front of the parent. // Append the list to the front of the parent.
if (child_log->oldest_logentry) { if (child_log->oldest_logentry) {
...@@ -413,14 +446,13 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -413,14 +446,13 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
// If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed. // If there are no bytes to move, then just leave things alone, and let the memory be reclaimed on txn is closed.
memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena); memarena_move_buffers(parent_log->rollentry_arena, child_log->rollentry_arena);
} }
//Delete child log (unpins child_log) toku_rollback_log_unpin_and_remove(txn, child_log);
r = toku_delete_rollback_log(txn, child_log);
assert(r==0);
txn->current_rollback = ROLLBACK_NONE; txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0; txn->current_rollback_hash = 0;
r = toku_maybe_spill_rollbacks(txn->parent, parent_log); //unpins parent_log toku_maybe_spill_rollbacks(txn->parent, parent_log);
assert(r==0); toku_rollback_log_unpin(txn->parent, parent_log);
assert(r == 0);
} }
// Note the open brts, the omts must be merged // Note the open brts, the omts must be merged
...@@ -438,7 +470,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -438,7 +470,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->num_rollentries += txn->num_rollentries; txn->parent->num_rollentries += txn->num_rollentries;
} else { } else {
r = toku_apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item); r = apply_txn(txn, yield, yieldv, lsn, toku_commit_rollback_item);
assert(r==0); assert(r==0);
} }
...@@ -452,7 +484,7 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -452,7 +484,7 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
toku_list_pop(&txn->checkpoint_before_commit); toku_list_pop(&txn->checkpoint_before_commit);
} }
r = toku_apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item); r = apply_txn(txn, yield, yieldv, lsn, toku_abort_rollback_item);
assert(r==0); assert(r==0);
return r; return r;
} }
...@@ -481,17 +513,14 @@ rollback_memory_size(ROLLBACK_LOG_NODE log) { ...@@ -481,17 +513,14 @@ rollback_memory_size(ROLLBACK_LOG_NODE log) {
return make_rollback_pair_attr(size); return make_rollback_pair_attr(size);
} }
// Cleanup the rollback memory
static void static void
toku_rollback_log_free(ROLLBACK_LOG_NODE *log_p) { rollback_log_destroy(ROLLBACK_LOG_NODE log) {
ROLLBACK_LOG_NODE log = *log_p;
*log_p = NULL; //Sanitize
// Cleanup the rollback memory
memarena_close(&log->rollentry_arena); memarena_close(&log->rollentry_arena);
toku_free(log); toku_free(log);
} }
static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, static void rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname,
void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size, void *rollback_v, void** UU(disk_data), void *extraargs, PAIR_ATTR size, PAIR_ATTR* new_size,
BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) { BOOL write_me, BOOL keep_me, BOOL for_checkpoint, BOOL UU(is_clone)) {
int r; int r;
...@@ -499,13 +528,13 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM ...@@ -499,13 +528,13 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
struct brt_header *h = extraargs; struct brt_header *h = extraargs;
assert(h->cf == cachefile); assert(h->cf == cachefile);
assert(log->thislogname.b==logname.b); assert(log->blocknum.b==logname.b);
if (write_me && !h->panic) { if (write_me && !h->panic) {
int n_workitems, n_threads; int n_workitems, n_threads;
toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads); toku_cachefile_get_workqueue_load(cachefile, &n_workitems, &n_threads);
r = toku_serialize_rollback_log_to(fd, log->thislogname, log, h, n_workitems, n_threads, for_checkpoint); r = toku_serialize_rollback_log_to(fd, log->blocknum, log, h, n_workitems, n_threads, for_checkpoint);
if (r) { if (r) {
if (h->panic==0) { if (h->panic==0) {
char *e = strerror(r); char *e = strerror(r);
...@@ -519,11 +548,11 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM ...@@ -519,11 +548,11 @@ static void toku_rollback_flush_callback (CACHEFILE cachefile, int fd, BLOCKNUM
} }
*new_size = size; *new_size = size;
if (!keep_me) { if (!keep_me) {
toku_rollback_log_free(&log); rollback_log_destroy(log);
} }
} }
static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash, static int rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM logname, u_int32_t fullhash,
void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) { void **rollback_pv, void** UU(disk_data), PAIR_ATTR *sizep, int * UU(dirtyp), void *extraargs) {
int r; int r;
struct brt_header *h = extraargs; struct brt_header *h = extraargs;
...@@ -537,7 +566,7 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l ...@@ -537,7 +566,7 @@ static int toku_rollback_fetch_callback (CACHEFILE cachefile, int fd, BLOCKNUM l
return r; return r;
} }
static void toku_rollback_pe_est_callback( static void rollback_pe_est_callback(
void* rollback_v, void* rollback_v,
void* UU(disk_data), void* UU(disk_data),
long* bytes_freed_estimate, long* bytes_freed_estimate,
...@@ -551,7 +580,7 @@ static void toku_rollback_pe_est_callback( ...@@ -551,7 +580,7 @@ static void toku_rollback_pe_est_callback(
} }
// callback for partially evicting a cachetable entry // callback for partially evicting a cachetable entry
static int toku_rollback_pe_callback ( static int rollback_pe_callback (
void *rollback_v, void *rollback_v,
PAIR_ATTR UU(old_attr), PAIR_ATTR UU(old_attr),
PAIR_ATTR* new_attr, PAIR_ATTR* new_attr,
...@@ -562,17 +591,22 @@ static int toku_rollback_pe_callback ( ...@@ -562,17 +591,22 @@ static int toku_rollback_pe_callback (
*new_attr = old_attr; *new_attr = old_attr;
return 0; return 0;
} }
static BOOL toku_rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
// partial fetch is never required for a rollback log node
static BOOL rollback_pf_req_callback(void* UU(brtnode_pv), void* UU(read_extraargs)) {
return FALSE; return FALSE;
} }
static int toku_rollback_pf_callback(void* UU(brtnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) { // a rollback node should never be partial fetched,
// should never be called, given that toku_rollback_pf_req_callback always returns false // because we always say it is not required.
// (pf req callback always returns false)
static int rollback_pf_callback(void* UU(brtnode_pv), void* UU(disk_data), void* UU(read_extraargs), int UU(fd), PAIR_ATTR* UU(sizep)) {
assert(FALSE); assert(FALSE);
return 0; return 0;
} }
static int toku_rollback_cleaner_callback ( // the cleaner thread should never choose a rollback node for cleaning
static int rollback_cleaner_callback (
void* UU(brtnode_pv), void* UU(brtnode_pv),
BLOCKNUM UU(blocknum), BLOCKNUM UU(blocknum),
u_int32_t UU(fullhash), u_int32_t UU(fullhash),
...@@ -585,16 +619,19 @@ static int toku_rollback_cleaner_callback ( ...@@ -585,16 +619,19 @@ static int toku_rollback_cleaner_callback (
static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(struct brt_header* h) { static inline CACHETABLE_WRITE_CALLBACK get_write_callbacks_for_rollback_log(struct brt_header* h) {
CACHETABLE_WRITE_CALLBACK wc; CACHETABLE_WRITE_CALLBACK wc;
wc.flush_callback = toku_rollback_flush_callback; wc.flush_callback = rollback_flush_callback;
wc.pe_est_callback = toku_rollback_pe_est_callback; wc.pe_est_callback = rollback_pe_est_callback;
wc.pe_callback = toku_rollback_pe_callback; wc.pe_callback = rollback_pe_callback;
wc.cleaner_callback = toku_rollback_cleaner_callback; wc.cleaner_callback = rollback_cleaner_callback;
wc.clone_callback = NULL; wc.clone_callback = NULL;
wc.write_extraargs = h; wc.write_extraargs = h;
return wc; return wc;
} }
static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t older_hash, ROLLBACK_LOG_NODE *result) { // create and pin a new rollback log node. chain it to the other rollback nodes
// by providing a previous blocknum/ hash and assigning the new rollback log
// node the next sequence number
static void rollback_log_create (TOKUTXN txn, BLOCKNUM previous, uint32_t previous_hash, ROLLBACK_LOG_NODE *result) {
ROLLBACK_LOG_NODE MALLOC(log); ROLLBACK_LOG_NODE MALLOC(log);
assert(log); assert(log);
...@@ -608,58 +645,39 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o ...@@ -608,58 +645,39 @@ static int toku_create_new_rollback_log (TOKUTXN txn, BLOCKNUM older, uint32_t o
log->dirty = TRUE; log->dirty = TRUE;
log->txnid = txn->txnid64; log->txnid = txn->txnid64;
log->sequence = txn->num_rollback_nodes++; log->sequence = txn->num_rollback_nodes++;
toku_allocate_blocknum(h->blocktable, &log->thislogname, h); toku_allocate_blocknum(h->blocktable, &log->blocknum, h);
log->thishash = toku_cachetable_hash(cf, log->thislogname); log->hash = toku_cachetable_hash(cf, log->blocknum);
log->older = older; log->previous = previous;
log->older_hash = older_hash; log->previous_hash = previous_hash;
log->oldest_logentry = NULL; log->oldest_logentry = NULL;
log->newest_logentry = NULL; log->newest_logentry = NULL;
log->rollentry_arena = memarena_create(); log->rollentry_arena = memarena_create();
log->rollentry_resident_bytecount = 0; log->rollentry_resident_bytecount = 0;
*result = log; *result = log;
r=toku_cachetable_put(cf, log->thislogname, log->thishash, r = toku_cachetable_put(cf, log->blocknum, log->hash,
log, rollback_memory_size(log), log, rollback_memory_size(log),
get_write_callbacks_for_rollback_log(h)); get_write_callbacks_for_rollback_log(h));
assert(r==0); assert(r == 0);
txn->current_rollback = log->thislogname; txn->current_rollback = log->blocknum;
txn->current_rollback_hash = log->thishash; txn->current_rollback_hash = log->hash;
txn->pinned_inprogress_rollback_log = log;
return 0;
}
int
toku_unpin_inprogress_rollback_log(TOKUTXN txn) {
if (txn->pinned_inprogress_rollback_log) {
return toku_rollback_log_unpin(txn, txn->pinned_inprogress_rollback_log);
}
else {
return 0;
}
} }
int void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r; int r;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
if (txn->pinned_inprogress_rollback_log == log) { r = toku_cachetable_unpin(cf, log->blocknum, log->hash,
txn->pinned_inprogress_rollback_log = NULL;
}
r = toku_cachetable_unpin(cf, log->thislogname, log->thishash,
(enum cachetable_dirty)log->dirty, rollback_memory_size(log)); (enum cachetable_dirty)log->dirty, rollback_memory_size(log));
assert(r==0); assert(r == 0);
return r;
} }
//Requires: log is pinned //Requires: log is pinned
// log is current // log is current
//After: //After:
// log is unpinned if a spill happened
// Maybe there is no current after (if it spilled) // Maybe there is no current after (if it spilled)
int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) { void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
int r = 0;
if (log->rollentry_resident_bytecount > txn->logger->write_block_size) { if (log->rollentry_resident_bytecount > txn->logger->write_block_size) {
assert(log->thislogname.b == txn->current_rollback.b); assert(log->blocknum.b == txn->current_rollback.b);
//spill //spill
if (!txn_has_spilled_rollback_logs(txn)) { if (!txn_has_spilled_rollback_logs(txn)) {
//First spilled. Copy to head. //First spilled. Copy to head.
...@@ -672,20 +690,7 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) { ...@@ -672,20 +690,7 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log) {
txn->current_rollback = ROLLBACK_NONE; txn->current_rollback = ROLLBACK_NONE;
txn->current_rollback_hash = 0; txn->current_rollback_hash = 0;
//Unpin
r = toku_rollback_log_unpin(txn, log);
assert(r==0);
} }
return r;
}
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
if (txn->txnid64>txnfind->txnid64) return +1;
return 0;
} }
static int find_filenum (OMTVALUE v, void *brtv) { static int find_filenum (OMTVALUE v, void *brtv) {
...@@ -785,39 +790,6 @@ int toku_txn_note_close_brt (BRT brt) { ...@@ -785,39 +790,6 @@ int toku_txn_note_close_brt (BRT brt) {
return 0; return 0;
} }
static int remove_txn (OMTVALUE brtv, u_int32_t UU(idx), void *txnv)
// Effect: This function is called on every open BRT that a transaction used.
// This function removes the transaction from that BRT.
{
BRT brt = brtv;
TOKUTXN txn = txnv;
OMTVALUE txnv_again=NULL;
u_int32_t index;
int r = toku_omt_find_zero(brt->txns, find_xid, txn, &txnv_again, &index);
assert(r==0);
assert((void*)txnv_again==txnv);
r = toku_omt_delete_at(brt->txns, index);
assert(r==0);
if (txn->txnid64==brt->h->txnid_that_created_or_locked_when_empty) {
brt->h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
brt->h->root_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==brt->h->txnid_that_suppressed_recovery_logs) {
brt->h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
if (!toku_brt_zombie_needed(brt) && brt->was_closed) {
//Close immediately.
assert(brt->close_db);
r = brt->close_db(brt->db, brt->close_flags, false, ZERO_LSN);
}
return r;
}
// for every BRT in txn, remove it.
static void note_txn_closing (TOKUTXN txn) {
toku_omt_iterate(txn->open_brts, remove_txn, txn);
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression) // Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count) int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
{ {
...@@ -825,98 +797,66 @@ int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count) ...@@ -825,98 +797,66 @@ int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count)
return 0; return 0;
} }
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr) { void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
struct tokutxn fake_txn; fake_txn.txnid64 = xid; //Currently processing 'log'. Prefetch the next (previous) log node.
u_int32_t index;
OMTVALUE txnv;
int r = toku_omt_find_zero(brt->txns, find_xid, &fake_txn, &txnv, &index);
if (r == 0) *txnptr = txnv;
return r;
}
int
toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log) {
//Currently processing 'log'. Prefetch the next (older) log node.
BLOCKNUM name = log->older; BLOCKNUM name = log->previous;
int r = 0; int r = 0;
if (name.b != ROLLBACK_NONE.b) { if (name.b != ROLLBACK_NONE.b) {
uint32_t hash = log->older_hash; uint32_t hash = log->previous_hash;
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
struct brt_header *h = toku_cachefile_get_userdata(cf); struct brt_header *h = toku_cachefile_get_userdata(cf);
BOOL doing_prefetch = FALSE; BOOL doing_prefetch = FALSE;
r = toku_cachefile_prefetch(cf, name, hash, r = toku_cachefile_prefetch(cf, name, hash,
get_write_callbacks_for_rollback_log(h), get_write_callbacks_for_rollback_log(h),
toku_rollback_fetch_callback, rollback_fetch_callback,
toku_rollback_pf_req_callback, rollback_pf_req_callback,
toku_rollback_pf_callback, rollback_pf_callback,
h, h,
&doing_prefetch); &doing_prefetch);
assert(r==0); assert(r == 0);
} }
return r;
} }
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result) { void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log,
BOOL save_inprogress_node = FALSE; TXNID txnid, uint64_t sequence)
assert(name.b != ROLLBACK_NONE.b); {
int r = 0; assert(log->txnid == txnid);
ROLLBACK_LOG_NODE log = NULL; assert(log->sequence == sequence);
}
if (name.b == txn->current_rollback.b) { void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log) {
assert(hash == txn->current_rollback_hash); void * value;
log = txn->pinned_inprogress_rollback_log;
save_inprogress_node = TRUE;
}
if (!log) {
CACHEFILE cf = txn->logger->rollback_cachefile; CACHEFILE cf = txn->logger->rollback_cachefile;
void * log_v;
struct brt_header *h = toku_cachefile_get_userdata(cf); struct brt_header *h = toku_cachefile_get_userdata(cf);
r = toku_cachetable_get_and_pin(cf, name, hash, int r = toku_cachetable_get_and_pin(cf, blocknum, hash,
&log_v, NULL, &value, NULL,
get_write_callbacks_for_rollback_log(h), get_write_callbacks_for_rollback_log(h),
toku_rollback_fetch_callback, rollback_fetch_callback,
toku_rollback_pf_req_callback, rollback_pf_req_callback,
toku_rollback_pf_callback, rollback_pf_callback,
TRUE, // may_modify_value TRUE, // may_modify_value
h h
); );
assert(r==0); assert(r == 0);
log = (ROLLBACK_LOG_NODE)log_v; ROLLBACK_LOG_NODE pinned_log = value;
} assert(pinned_log->blocknum.b == blocknum.b);
if (r==0) { *log = pinned_log;
assert(log->thislogname.b == name.b);
assert(log->txnid == xid);
assert(log->sequence == sequence);
if (save_inprogress_node) {
txn->pinned_inprogress_rollback_log = log;
}
*result = log;
}
return r;
} }
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result) { void toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *log) {
ROLLBACK_LOG_NODE pinned_log;
invariant(txn->state == TOKUTXN_LIVE); // #3258 invariant(txn->state == TOKUTXN_LIVE); // #3258
int r; if (txn_has_current_rollback_log(txn)) {
ROLLBACK_LOG_NODE log; toku_get_and_pin_rollback_log(txn, txn->current_rollback, txn->current_rollback_hash, &pinned_log);
if (txn_has_inprogress_rollback_log(txn)) { toku_rollback_verify_contents(pinned_log, txn->txnid64, txn->num_rollback_nodes - 1);
r = toku_get_and_pin_rollback_log(txn, txn->txnid64, txn->num_rollback_nodes-1, } else {
txn->current_rollback, txn->current_rollback_hash, &log); // create a new log for this transaction to use.
assert(r==0); // this call asserts success internally
} rollback_log_create(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &pinned_log);
else {
//Generate new one.
//tail will be ROLLBACK_NONE if this is the very first
r = toku_create_new_rollback_log(txn, txn->spilled_rollback_tail, txn->spilled_rollback_tail_hash, &log);
assert(r==0);
}
if (r==0) {
assert(log->txnid == txn->txnid64);
assert(log->thislogname.b != ROLLBACK_NONE.b);
*result = log;
} }
return r; assert(pinned_log->txnid == txn->txnid64);
assert(pinned_log->blocknum.b != ROLLBACK_NONE.b);
*log = pinned_log;
} }
...@@ -11,56 +11,79 @@ ...@@ -11,56 +11,79 @@
extern "C" { extern "C" {
#endif #endif
// these routines in rollback.c
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint); void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
void toku_rollback_txn_close (TOKUTXN txn); void toku_rollback_txn_close (TOKUTXN txn);
int toku_get_and_pin_rollback_log_for_new_entry (TOKUTXN txn, ROLLBACK_LOG_NODE *result);
int toku_get_and_pin_rollback_log(TOKUTXN txn, TXNID xid, uint64_t sequence, BLOCKNUM name, uint32_t hash, ROLLBACK_LOG_NODE *result);
int toku_maybe_prefetch_older_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_unpin_inprogress_rollback_log(TOKUTXN txn);
int toku_delete_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); // these functions assert internally that they succeed
// get a rollback node this txn may use for a new entry. if there
// is a current rollback node to use, pin it, otherwise create one.
void toku_get_and_pin_rollback_log_for_new_entry(TOKUTXN txn, ROLLBACK_LOG_NODE *log);
// get a specific rollback by blocknum and hash
void toku_get_and_pin_rollback_log(TOKUTXN txn, BLOCKNUM blocknum, uint32_t hash, ROLLBACK_LOG_NODE *log);
// unpin a rollback node from the cachetable
void toku_rollback_log_unpin(TOKUTXN txn, ROLLBACK_LOG_NODE log);
// assert that the given log's txnid and sequence match the ones given
void toku_rollback_verify_contents(ROLLBACK_LOG_NODE log, TXNID txnid, uint64_t sequence);
// if there is a previous rollback log for the given log node, prefetch it
void toku_maybe_prefetch_previous_rollback_log(TOKUTXN txn, ROLLBACK_LOG_NODE log);
// unpin and rmove a rollback log from the cachetable
void toku_rollback_log_unpin_and_remove(TOKUTXN txn, ROLLBACK_LOG_NODE log);
typedef int(*apply_rollback_item)(TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn); int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn);
void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size); void *toku_malloc_in_rollback(ROLLBACK_LOG_NODE log, size_t size);
void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len); void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
int toku_maybe_spill_rollbacks (TOKUTXN txn, ROLLBACK_LOG_NODE log);
// given a transaction and a log node, and if the log is too full,
// set the current rollback log to ROLLBACK_NONE and move the current
// node onto the tail of the rollback node chain. further insertions
// into the rollback log for this transaction will force the creation
// of a new rollback log.
//
// this never unpins the rollback log if a spill occurs. the caller
// is responsible for ensuring the given rollback node is unpinned
// if necessary.
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
int toku_txn_note_brt (TOKUTXN txn, BRT brt); int toku_txn_note_brt (TOKUTXN txn, BRT brt);
int toku_txn_note_swap_brt (BRT live, BRT zombie); int toku_txn_note_swap_brt (BRT live, BRT zombie);
int toku_txn_note_close_brt (BRT brt); int toku_txn_note_close_brt (BRT brt);
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count); int toku_logger_txn_rollback_raw_count(TOKUTXN txn, u_int64_t *raw_count);
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
int toku_find_pair_by_xid (OMTVALUE v, void *txnv); int toku_find_pair_by_xid (OMTVALUE v, void *txnv);
int toku_find_xid_by_xid (OMTVALUE v, void *xidv); int toku_find_xid_by_xid (OMTVALUE v, void *xidv);
// these routines in roll.c // A high-level rollback log is made up of a chain of rollback log nodes.
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv, LSN lsn); // Each rollback log node is represented (separately) in the cachetable by
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv, LSN lsn); // this structure. Each portion of the rollback log chain has a block num
// and a hash to identify it.
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv);
struct rollback_log_node { struct rollback_log_node {
int layout_version; int layout_version;
int layout_version_original; int layout_version_original;
int layout_version_read_from_disk; int layout_version_read_from_disk;
uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk uint32_t build_id; // build_id (svn rev number) of software that wrote this node to disk
int dirty; int dirty;
TXNID txnid; // Which transaction made this? // to which transaction does this node belong?
uint64_t sequence; // Which rollback log in the sequence is this? TXNID txnid;
BLOCKNUM thislogname; // Which block number is this chunk of the log? // sequentially, where in the rollback log chain is this node?
uint32_t thishash; // the sequence is between 0 and totalnodes-1
BLOCKNUM older; // Which block number is the next oldest chunk of the log? uint64_t sequence;
uint32_t older_hash; BLOCKNUM blocknum; // on which block does this node live?
uint32_t hash;
// which block number is the previous in the chain of rollback nodes
// that make up this rollback log?
BLOCKNUM previous;
uint32_t previous_hash;
struct roll_entry *oldest_logentry; struct roll_entry *oldest_logentry;
struct roll_entry *newest_logentry; struct roll_entry *newest_logentry;
MEMARENA rollentry_arena; MEMARENA rollentry_arena;
......
...@@ -223,7 +223,6 @@ toku_txn_create_txn ( ...@@ -223,7 +223,6 @@ toku_txn_create_txn (
result->current_rollback = ROLLBACK_NONE; result->current_rollback = ROLLBACK_NONE;
result->current_rollback_hash = 0; result->current_rollback_hash = 0;
result->num_rollback_nodes = 0; result->num_rollback_nodes = 0;
result->pinned_inprogress_rollback_log = NULL;
result->snapshot_type = snapshot_type; result->snapshot_type = snapshot_type;
result->snapshot_txnid64 = TXNID_NONE; result->snapshot_txnid64 = TXNID_NONE;
result->container_db_txn = container_db_txn; result->container_db_txn = container_db_txn;
......
...@@ -285,26 +285,12 @@ static int ...@@ -285,26 +285,12 @@ static int
locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags, locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) { TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
//
// We must unpin rollback log, otherwise, another thread that tries to checkpoint during commit
// will grab the multi operation lock, and then not be able to complete the checkpoint because
// this thread has its rollback log pinned and is trying to grab the multi operation lock.
//
// We grab the ydb lock because the checkpoint thread also unpins inprogress rollback logs,
// so the ydb lock protects a race of both this thread and the checkpoint thread unpinning the
// inprogress rollback log. If we want, we can probably have the checkpoint thread to not
// unpin inprogress rollback logs, making this ydb lock grab unnecessary.
//
toku_ydb_lock();
int r = toku_unpin_inprogress_rollback_log(ttxn);
toku_ydb_unlock();
assert_zero(r);
if (toku_txn_requires_checkpoint(ttxn)) { if (toku_txn_requires_checkpoint(ttxn)) {
toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT); toku_checkpoint(txn->mgrp->i->cachetable, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
} }
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit. toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
toku_ydb_lock(); toku_ydb_lock();
r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock int r = toku_txn_commit_only(txn, flags, poll, poll_extra, true); // the final 'true' says to release the multi_operation_client_lock
toku_ydb_unlock(); toku_ydb_unlock();
toku_txn_destroy(txn); toku_txn_destroy(txn);
return r; return r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment