Commit 681c5dd3 authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1866 List of live transactions (in logger) is now an omt instead of a linked list.

refs [t:1866]

git-svn-id: file:///svn/toku/tokudb@13604 c7de825b-a66e-492c-adef-691d508d4ae1
parent 13aa2ed3
......@@ -14,7 +14,6 @@
#include "list.h"
#include "mempool.h"
#include "kv-pair.h"
typedef void *OMTVALUE;
#include "omt.h"
#include "leafentry.h"
#include "block_table.h"
......
......@@ -18,6 +18,7 @@
#include "log_header.h"
#include "checkpoint.h"
#include "minicron.h"
#include "log-internal.h"
#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
#error
......@@ -1520,8 +1521,9 @@ int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
}
static int
log_open_txn (TOKULOGGER logger, TOKUTXN txn, void *UU(v))
{
log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *loggerv) {
TOKUTXN txn = txnv;
TOKULOGGER logger = loggerv;
if (toku_logger_txn_parent(txn)==NULL) { // only have to log the open root transactions
int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
......@@ -1575,7 +1577,7 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
}
// Log all the open transactions
{
int r = toku_logger_iterate_over_live_txns (logger, log_open_txn, NULL);
int r = toku_omt_iterate(logger->live_txns, log_open_txn, logger);
assert(r==0);
}
// Log all the open files
......
......@@ -65,7 +65,7 @@ struct tokulogger {
// To access these, you must have the input lock
struct logbytes *head,*tail;
LSN lsn; // the next available lsn
struct list live_txns; // just a linked list. Should be a hashtable.
OMT live_txns; // a sorted tree. Old comment said should be a hashtable. Do we still want that?
int n_in_buf;
// To access these, you must have the output lock
......@@ -95,7 +95,6 @@ struct tokutxn {
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
MEMARENA rollentry_arena;
......
......@@ -24,19 +24,21 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->lg_max = 100<<20; // 100MB default
result->head = result->tail = 0;
result->lsn = result->written_lsn = result->fsynced_lsn = (LSN){0};
list_init(&result->live_txns);
r = toku_omt_create(&result->live_txns); if (r!=0) goto died0;
result->n_in_buf=0;
result->n_in_file=0;
result->directory=0;
result->checkpoint_lsn=(LSN){0};
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died0;
r = ml_init(&result->output_lock); if (r!=0) goto died1;
r = ml_init(&result->input_lock); if (r!=0) goto died1;
r = ml_init(&result->output_lock); if (r!=0) goto died2;
return 0;
died1:
died2:
ml_destroy(&result->input_lock);
died1:
toku_omt_destroy(&result->live_txns);
died0:
toku_free(result);
return r;
......@@ -131,6 +133,7 @@ int toku_logger_close(TOKULOGGER *loggerp) {
r = ml_destroy(&logger->input_lock); if (r!=0) goto panic;
logger->is_panicked=1; // Just in case this might help.
if (logger->directory) toku_free(logger->directory);
toku_omt_destroy(&logger->live_txns);
toku_free(logger);
*loggerp=0;
if (locked_logger) {
......@@ -728,20 +731,36 @@ TOKULOGGER toku_txn_logger (TOKUTXN txn) {
return txn ? txn->logger : 0;
}
//Heaviside function to search through an OMT by a TXNID
static int
find_by_xid (OMTVALUE v, void *txnidv) {
TOKUTXN txn = v;
TXNID txnidfind = *(TXNID*)txnidv;
if (txn->txnid64<txnidfind) return -1;
if (txn->txnid64>txnidfind) return +1;
return 0;
}
int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result) {
if (logger==0) return -1;
struct list *l;
for (l = list_head(&logger->live_txns); l != &logger->live_txns; l = l->next) {
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link);
assert(txn->tag==TYP_TOKUTXN);
if (txn->txnid64==txnid) {
*result = txn;
return 0;
}
if (logger==NULL) return -1;
OMTVALUE txnfound;
int rval;
int r = toku_omt_find_zero(logger->live_txns, find_by_xid, &txnid, &txnfound, NULL, NULL);
if (r==0) {
TOKUTXN txn = txnfound;
assert(txn->tag==TYP_TOKUTXN);
assert(txn->txnid64==txnid);
*result = txn;
rval = 0;
}
// If there is no txn, then we treat it as the null txn.
*result = 0;
return 0;
else {
assert(r==DB_NOTFOUND);
// If there is no txn, then we treat it as the null txn.
*result = NULL;
rval = 0;
}
return rval;
}
int toku_set_func_fsync (int (*fsync_function)(int)) {
......@@ -793,16 +812,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
// get them into increasing order
qsort(all_logs, all_n_logs, sizeof(all_logs[0]), logfilenamecompare);
LSN oldest_live_txn_lsn={1LL<<63};
{
struct list *l;
for (l=list_head(&logger->live_txns); l!=&logger->live_txns; l=l->next) {
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link);
if (oldest_live_txn_lsn.lsn>txn->txnid64) {
oldest_live_txn_lsn.lsn=txn->txnid64;
}
}
}
LSN oldest_live_txn_lsn={.lsn = oldest_living_xid};
//printf("%s:%d Oldest txn is %lld\n", __FILE__, __LINE__, (long long)oldest_live_txn_lsn.lsn);
// Now starting at the last one, look for archivable ones.
......@@ -854,16 +864,6 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
}
int toku_logger_iterate_over_live_txns (TOKULOGGER logger, int (*f)(TOKULOGGER, TOKUTXN, void*), void *v) {
struct list *l;
for (l = list_head(&logger->live_txns); l != &logger->live_txns; l = l->next) {
TOKUTXN txn = list_struct(l, struct tokutxn, live_txns_link);
int r = f(logger, txn, v);
if (r!=0) return r;
}
return 0;
}
TOKUTXN toku_logger_txn_parent (TOKUTXN txn) {
return txn->parent;
}
......
......@@ -65,7 +65,6 @@ int toku_txnid2txn (TOKULOGGER logger, TXNID txnid, TOKUTXN *result);
//int toku_set_func_fsync (int (*fsync_function)(int));
int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags);
int toku_logger_iterate_over_live_txns (TOKULOGGER logger, int (*f)(TOKULOGGER, TOKUTXN, void*), void *v);
TOKUTXN toku_logger_txn_parent (TOKUTXN txn);
void toku_logger_note_checkpoint(TOKULOGGER logger, LSN lsn);
......
......@@ -11,7 +11,6 @@
#include <stdlib.h>
#include <string.h>
typedef void *OMTVALUE;
#include "toku_assert.h"
#include "memory.h"
#include "omt.h"
......
......@@ -147,6 +147,7 @@
// The programming API:
//typedef struct value *OMTVALUE; // A slight improvement over using void*.
typedef void *OMTVALUE;
typedef struct omt *OMT;
typedef struct omt_cursor *OMTCURSOR;
......
......@@ -30,22 +30,36 @@ void toku_rollback_txn_close (TOKUTXN txn) {
toku_free(txn->rollentry_filename);
}
list_remove(&txn->live_txns_link);
{
//Remove txn from list (omt) of live transactions
OMTVALUE txnagain;
u_int32_t idx;
int r;
r = toku_omt_find_zero(txn->logger->live_txns, find_xid, txn, &txnagain, &idx, NULL);
assert(r==0);
assert(txn==txnagain);
r = toku_omt_delete_at(txn->logger->live_txns, idx);
assert(r==0);
}
assert(oldest_living_xid <= txn->txnid64);
assert(oldest_living_xid < MAX_TXNID);
if (txn->txnid64 == oldest_living_xid) {
TOKULOGGER logger = txn->logger;
LSN oldest_live_txn_lsn = {MAX_TXNID};
{
struct list *l;
for (l=list_head(&logger->live_txns); l!=&logger->live_txns; l=l->next) {
TOKUTXN live_txn = list_struct(l, struct tokutxn, live_txns_link);
if (oldest_live_txn_lsn.lsn>live_txn->txnid64) {
oldest_live_txn_lsn.lsn=live_txn->txnid64;
}
}
OMTVALUE oldest_txnv;
int r = toku_omt_fetch(logger->live_txns, 0, &oldest_txnv, NULL);
if (r==0) {
TOKUTXN oldest_txn = oldest_txnv;
assert(oldest_txn != txn); // We just removed it
assert(oldest_txn->txnid64 > oldest_living_xid); //Must be newer than the previous oldest
oldest_living_xid = oldest_txn->txnid64;
}
else {
//No living transactions
assert(r==EINVAL);
oldest_living_xid = MAX_TXNID;
}
oldest_living_xid = oldest_live_txn_lsn.lsn;
}
note_txn_closing(txn);
......@@ -183,7 +197,6 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv) {
count++;
if (count%2 == 0) yield(NULL, yieldv);
}
list_remove(&txn->live_txns_link);
// Read stuff out of the file and roll it back.
if (txn->rollentry_filename) {
r = toku_rollback_fileentries(txn->rollentry_fd, txn, yield, yieldv);
......@@ -288,8 +301,8 @@ int toku_read_rollback_backwards(BREAD br, struct roll_entry **item, MEMARENA ma
return 0;
}
static int find_xid (OMTVALUE v, void *txnv) {
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv) {
TOKUTXN txn = v;
TOKUTXN txnfind = txnv;
if (txn->txnid64<txnfind->txnid64) return -1;
......
......@@ -5,6 +5,8 @@
#ident "Copyright (c) 2007, 2008, 2009 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "omt.h"
// these routines in rollback.c
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv);
......@@ -30,4 +32,7 @@ int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
int toku_rollback_fileentries (int fd, TOKUTXN txn, YIELDF yield, void *yieldv);
int toku_commit_fileentries (int fd, TOKUTXN txn, YIELDF yield,void *yieldv);
//Heaviside function to find a TOKUTXN by TOKUTXN (used to find the index)
int find_xid (OMTVALUE v, void *txnv);
#endif
......@@ -4,7 +4,6 @@
#include "test.h"
#include <stdio.h>
typedef void *OMTVALUE;
#include "omt.h"
#include "memory.h"
#include "toku_assert.h"
......
......@@ -6,7 +6,6 @@
#include <errno.h>
#include <stdio.h>
typedef void *OMTVALUE;
#include "omt.h"
#include "memory.h"
#include "toku_assert.h"
......
......@@ -14,18 +14,20 @@ int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER log
if (result==0)
return errno;
int r;
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) {
if (0) {
died0:
toku_logger_panic(logger, r);
toku_free(result);
return r;
}
r = toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) goto died0;
r = toku_omt_create(&result->open_brts);
if (r!=0) {
if (r!=0) goto died0;
if (0) {
died1:
toku_logger_panic(logger, r);
toku_free(result);
return r;
toku_omt_destroy(&result->open_brts);
goto died0;
}
result->txnid64 = result->first_lsn.lsn;
XIDS parent_xids;
......@@ -35,19 +37,36 @@ died1:
parent_xids = parent_tokutxn->xids;
if ((r=xids_create_child(parent_xids, &result->xids, result->txnid64)))
goto died1;
if (0) {
died2:
xids_destroy(&result->xids);
goto died1;
}
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create();
if (list_empty(&logger->live_txns)) {
if (toku_omt_size(logger->live_txns) == 0) {
assert(oldest_living_xid == MAX_TXNID);
oldest_living_xid = result->txnid64;
}
assert(oldest_living_xid < MAX_TXNID);
assert(oldest_living_xid <= result->txnid64);
list_push(&logger->live_txns, &result->live_txns_link);
{
//Add txn to list (omt) of live transactions
u_int32_t idx;
r = toku_omt_insert(logger->live_txns, result, find_xid, result, &idx);
if (r!=0) goto died2;
if (oldest_living_xid == result->txnid64)
assert(idx == 0);
else
assert(idx > 0);
}
result->rollentry_resident_bytecount=0;
result->rollentry_raw_count = 0;
result->rollentry_filename = 0;
......
......@@ -18,7 +18,6 @@
#include <stdlib.h>
#include <toku_stdint.h>
#include <string.h>
typedef toku_range *OMTVALUE;
#include "../../newbrt/omt.h"
struct __toku_range_tree_local {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment