Commit 660a2928 authored by Kristian Nielsen

Fix optimistic parallel replication for TokuDB.

Make TokuDB report row lock waits with thd_rpl_deadlock_check(). This allows
parallel replication to properly detect conflicts and to kill and retry the
offending transaction.
parent d145d1b6
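
For orientation before the diff: the patch plumbs a single notification path from PerconaFT's lock tree up to the server. Below is a minimal, self-contained C++ sketch of that path; TxnId, deadlock_check and on_lock_wait are illustrative stand-ins, not code from the patch (the real pieces are thd_rpl_deadlock_check, the new DB_ENV set_lock_wait_callback method, and tokudb_lock_wait_needed_callback, all shown in the hunks that follow).

#include <cstdint>
#include <cstdio>

typedef uint64_t TxnId;

// Stand-in for the server hook thd_rpl_deadlock_check(): informs the
// parallel replication scheduler that `waiter` is blocked on `holder`,
// so it can kill and retry `holder` if the wait would break commit order.
static void deadlock_check(TxnId waiter, TxnId holder) {
    std::printf("txn %llu waits on txn %llu\n",
                (unsigned long long)waiter, (unsigned long long)holder);
}

// Shape of the new lock_wait_callback typedef: the lock tree calls it
// once per (requesting, blocking) transaction pair whenever a lock
// request has to wait.
typedef void (*lock_wait_callback)(void *arg, TxnId requesting, TxnId blocking);

static void on_lock_wait(void *, TxnId requesting, TxnId blocking) {
    deadlock_check(requesting, blocking);
}

int main() {
    lock_wait_callback cb = on_lock_wait; // what set_lock_wait_callback() registers
    cb(nullptr, 42, 17);                  // what the lock tree does on a wait
}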
@@ -405,6 +405,7 @@ static void print_db_env_struct (void) {
     "int (*set_lock_timeout) (DB_ENV *env, uint64_t default_lock_wait_time_msec, uint64_t (*get_lock_wait_time_cb)(uint64_t default_lock_wait_time))",
     "int (*get_lock_timeout) (DB_ENV *env, uint64_t *lock_wait_time_msec)",
     "int (*set_lock_timeout_callback) (DB_ENV *env, lock_timeout_callback callback)",
+    "int (*set_lock_wait_callback) (DB_ENV *env, lock_wait_callback callback)",
     "int (*txn_xa_recover) (DB_ENV*, TOKU_XA_XID list[/*count*/], long count, /*out*/ long *retp, uint32_t flags)",
     "int (*get_txn_from_xid) (DB_ENV*, /*in*/ TOKU_XA_XID *, /*out*/ DB_TXN **)",
     "DB* (*get_db_for_directory) (DB_ENV*)",
@@ -751,6 +752,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
     printf("void toku_dbt_array_resize(DBT_ARRAY *dbts, uint32_t size) %s;\n", VISIBLE);
     printf("typedef void (*lock_timeout_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid);\n");
+    printf("typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid, uint64_t blocking_txnid);\n");
    printf("typedef int (*iterate_row_locks_callback)(DB **db, DBT *left_key, DBT *right_key, void *extra);\n");
    printf("typedef int (*iterate_transactions_callback)(DB_TXN *dbtxn, iterate_row_locks_callback cb, void *locks_extra, void *extra);\n");
    printf("typedef int (*iterate_requests_callback)(DB *db, uint64_t requesting_txnid, const DBT *left_key, const DBT *right_key, uint64_t blocking_txnid, uint64_t start_time, void *extra);\n");
...
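This generator prints the public db.h header, so the two strings added above surface in the generated API roughly as follows. This is a sketch of the output's likely shape (the exact field order and surrounding members are elided, and the typedef name __toku_db_env is assumed from the internal struct naming seen later in the diff):

#include <stdint.h>

typedef struct __toku_db_env DB_ENV;

/* Emitted by the new printf above. */
typedef void (*lock_wait_callback)(void *arg, uint64_t requesting_txnid,
                                   uint64_t blocking_txnid);

struct __toku_db_env {
    /* ...existing fields and methods... */
    int (*set_lock_wait_callback)(DB_ENV *env, lock_wait_callback callback);
};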
@@ -202,6 +202,7 @@ namespace ftcxx {
         typedef uint64_t (*get_lock_wait_time_cb_func)(uint64_t);
         get_lock_wait_time_cb_func _get_lock_wait_time_cb;
         lock_timeout_callback _lock_timeout_callback;
+        lock_wait_callback _lock_wait_needed_callback;
         uint64_t (*_loader_memory_size_callback)(void);
         uint32_t _cachesize_gbytes;
@@ -231,6 +232,7 @@ namespace ftcxx {
             _lock_wait_time_msec(0),
             _get_lock_wait_time_cb(nullptr),
             _lock_timeout_callback(nullptr),
+            _lock_wait_needed_callback(nullptr),
             _loader_memory_size_callback(nullptr),
             _cachesize_gbytes(0),
             _cachesize_bytes(0),
@@ -296,6 +298,11 @@ namespace ftcxx {
                 handle_ft_retval(r);
             }
+
+            if (_lock_wait_needed_callback) {
+                r = env->set_lock_wait_callback(env, _lock_wait_needed_callback);
+                handle_ft_retval(r);
+            }
             if (_loader_memory_size_callback) {
                 env->set_loader_memory_size(env, _loader_memory_size_callback);
             }
@@ -419,6 +426,11 @@ namespace ftcxx {
             return *this;
         }
+
+        DBEnvBuilder& set_lock_wait_callback(lock_wait_callback callback) {
+            _lock_wait_needed_callback = callback;
+            return *this;
+        }
         DBEnvBuilder& set_loader_memory_size(uint64_t (*callback)(void)) {
             _loader_memory_size_callback = callback;
             return *this;
...
@@ -199,7 +199,8 @@ int lock_request::wait(uint64_t wait_time_ms) {
     return wait(wait_time_ms, 0, nullptr);
 }
-int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void)) {
+int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
+                       void (*lock_wait_callback)(void *, TXNID, TXNID)) {
     uint64_t t_now = toku_current_time_microsec();
     uint64_t t_start = t_now;
     uint64_t t_end = t_start + wait_time_ms * 1000;
@@ -208,7 +209,13 @@ int lock_request::wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*kil
     // check again, this time locking out other retry calls
     if (m_state == state::PENDING) {
-        retry();
+        GrowableArray<TXNID> conflicts_collector;
+        conflicts_collector.init();
+        retry(&conflicts_collector);
+        if (m_state == state::PENDING) {
+            report_waits(&conflicts_collector, lock_wait_callback);
+        }
+        conflicts_collector.deinit();
     }
     while (m_state == state::PENDING) {
@@ -287,7 +294,7 @@ TXNID lock_request::get_conflicting_txnid(void) const {
     return m_conflicting_txnid;
 }
-int lock_request::retry(void) {
+int lock_request::retry(GrowableArray<TXNID> *conflicts_collector) {
     invariant(m_state == state::PENDING);
     int r;
@@ -308,13 +315,14 @@ int lock_request::retry(void) {
         toku_cond_broadcast(&m_wait_cond);
     } else {
         m_conflicting_txnid = conflicts.get(0);
+        add_conflicts_to_waits(&conflicts, conflicts_collector);
     }
     conflicts.destroy();
     return r;
 }
-void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_test_callback)(void)) {
+void lock_request::retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID), void (*after_retry_all_test_callback)(void)) {
     lt_lock_request_info *info = lt->get_lock_request_info();
     info->retry_want++;
@@ -327,6 +335,9 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
     toku_mutex_lock(&info->mutex);
+
+    GrowableArray<TXNID> conflicts_collector;
+    conflicts_collector.init();
     // here is the group retry algorithm.
     // get the latest retry_want count and use it as the generation number of this retry operation.
     // if this retry generation is > the last retry generation, then do the lock retries. otherwise,
@@ -344,7 +355,7 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
             // move on to the next lock request. otherwise
             // the request is gone from the list so we may
             // read the i'th entry for the next one.
-            r = request->retry();
+            r = request->retry(&conflicts_collector);
             if (r != 0) {
                 i++;
             }
@@ -354,6 +365,30 @@ void lock_request::retry_all_lock_requests(locktree *lt, void (*after_retry_all_
     }
     toku_mutex_unlock(&info->mutex);
+
+    report_waits(&conflicts_collector, lock_wait_callback);
+    conflicts_collector.deinit();
+}
+
+void lock_request::add_conflicts_to_waits(txnid_set *conflicts,
+                                          GrowableArray<TXNID> *wait_conflicts) {
+    size_t num_conflicts = conflicts->size();
+    for (size_t i = 0; i < num_conflicts; i++) {
+        wait_conflicts->push(m_txnid);
+        wait_conflicts->push(conflicts->get(i));
+    }
+}
+
+void lock_request::report_waits(GrowableArray<TXNID> *wait_conflicts,
+                                void (*lock_wait_callback)(void *, TXNID, TXNID)) {
+    if (!lock_wait_callback)
+        return;
+    size_t num_conflicts = wait_conflicts->get_size();
+    for (size_t i = 0; i < num_conflicts; i += 2) {
+        TXNID blocked_txnid = wait_conflicts->fetch_unchecked(i);
+        TXNID blocking_txnid = wait_conflicts->fetch_unchecked(i+1);
+        (*lock_wait_callback)(nullptr, blocked_txnid, blocking_txnid);
+    }
 }
 void *lock_request::get_extra(void) const {
...
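A detail worth noting in retry_all_lock_requests() above: conflicts are accumulated into a flat GrowableArray of (blocked, blocking) TXNID pairs while info->mutex is held, and report_waits() replays them to the callback only after the mutex is dropped, presumably so the callback can re-enter the server without holding lock-tree locks. A rough, self-contained sketch of that collect-then-report shape, with std::vector and std::mutex standing in for GrowableArray and toku_mutex:

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <vector>

typedef uint64_t TxnId;

static std::mutex info_mutex;  // stands in for lt_lock_request_info::mutex

static void retry_all(void (*lock_wait_callback)(void *, TxnId, TxnId)) {
    // Pairs are flattened: [blocked0, blocking0, blocked1, blocking1, ...]
    std::vector<TxnId> collector;
    {
        std::lock_guard<std::mutex> lk(info_mutex);
        // ...retry each pending request; one that still conflicts appends
        // (its own txnid, each conflicting txnid) to the collector...
        collector.push_back(42);  // blocked (example values)
        collector.push_back(17);  // blocking
    }
    // Replay after unlocking, mirroring report_waits() in the patch.
    if (lock_wait_callback)
        for (std::size_t i = 0; i + 1 < collector.size(); i += 2)
            lock_wait_callback(nullptr, collector[i], collector[i + 1]);
}

int main() {
    retry_all([](void *, TxnId blocked, TxnId blocking) {
        std::printf("txn %llu waits on txn %llu\n",
                    (unsigned long long)blocked, (unsigned long long)blocking);
    });
}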
@@ -89,7 +89,8 @@ class lock_request {
     // returns: The return code of locktree::acquire_[write,read]_lock()
     // or simply DB_LOCK_NOTGRANTED if the wait time expired.
     int wait(uint64_t wait_time_ms);
-    int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void));
+    int wait(uint64_t wait_time_ms, uint64_t killed_time_ms, int (*killed_callback)(void),
+             void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr);
     // return: left end-point of the lock range
     const DBT *get_left_key(void) const;
@@ -109,7 +110,7 @@ class lock_request {
     // effect: Retries all of the lock requests for the given locktree.
     // Any lock requests successfully restarted is completed and woken up.
     // The rest remain pending.
-    static void retry_all_lock_requests(locktree *lt, void (*after_retry_test_callback)(void) = nullptr);
+    static void retry_all_lock_requests(locktree *lt, void (*lock_wait_callback)(void *, TXNID, TXNID) = nullptr, void (*after_retry_test_callback)(void) = nullptr);
     void set_start_test_callback(void (*f)(void));
     void set_start_before_pending_test_callback(void (*f)(void));
@@ -162,7 +163,7 @@ class lock_request {
     // effect: tries again to acquire the lock described by this lock request
     // returns: 0 if retrying the request succeeded and is now complete
-    int retry(void);
+    int retry(GrowableArray<TXNID> *conflict_collector);
     void complete(int complete_r);
@@ -194,6 +195,11 @@ class lock_request {
     static int find_by_txnid(lock_request * const &request, const TXNID &txnid);
+
+    // Report list of conflicts to lock wait callback.
+    static void report_waits(GrowableArray<TXNID> *wait_conflicts,
+                             void (*lock_wait_callback)(void *, TXNID, TXNID));
+    void add_conflicts_to_waits(txnid_set *conflicts, GrowableArray<TXNID> *wait_conflicts);
     void (*m_start_test_callback)(void);
     void (*m_start_before_pending_test_callback)(void);
     void (*m_retry_test_callback)(void);
...
@@ -87,7 +87,7 @@ static void run_locker(locktree *lt, TXNID txnid, const DBT *key, pthread_barrie
         buffer.destroy();
         // retry pending lock requests
-        lock_request::retry_all_lock_requests(lt, after_retry_all);
+        lock_request::retry_all_lock_requests(lt, nullptr, after_retry_all);
     }
     request.destroy();
...
@@ -105,6 +105,7 @@ struct __toku_db_env_internal {
     TOKULOGGER logger;
     toku::locktree_manager ltm;
     lock_timeout_callback lock_wait_timeout_callback; // Called when a lock request times out waiting for a lock.
+    lock_wait_callback lock_wait_needed_callback; // Called when a lock request requires a wait.
     DB *directory; // Maps dnames to inames
     DB *persistent_environment; // Stores environment settings, can be used for upgrade
...
@@ -1804,6 +1804,12 @@ env_set_lock_timeout_callback(DB_ENV *env, lock_timeout_callback callback) {
     return 0;
 }
+
+static int
+env_set_lock_wait_callback(DB_ENV *env, lock_wait_callback callback) {
+    env->i->lock_wait_needed_callback = callback;
+    return 0;
+}
 static void
 format_time(const time_t *timer, char *buf) {
     ctime_r(timer, buf);
@@ -2704,6 +2710,7 @@ toku_env_create(DB_ENV ** envp, uint32_t flags) {
     USENV(get_lock_timeout);
     USENV(set_lock_timeout);
     USENV(set_lock_timeout_callback);
+    USENV(set_lock_wait_callback);
     USENV(set_redzone);
     USENV(log_flush);
     USENV(log_archive);
...
@@ -193,7 +193,10 @@ int toku_db_start_range_lock(DB *db, DB_TXN *txn, const DBT *left_key, const DBT
                              toku::lock_request::type lock_type, toku::lock_request *request) {
     DB_TXN *txn_anc = txn_oldest_ancester(txn);
     TXNID txn_anc_id = txn_anc->id64(txn_anc);
-    request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc));
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    request->set(db->i->lt, txn_anc_id, left_key, right_key, lock_type, toku_is_big_txn(txn_anc), client_extra);
     const int r = request->start();
     if (r == 0) {
@@ -221,7 +224,8 @@ int toku_db_wait_range_lock(DB *db, DB_TXN *txn, toku::lock_request *request) {
     uint64_t killed_time_msec = env->i->default_killed_time_msec;
     if (env->i->get_killed_time_callback)
         killed_time_msec = env->i->get_killed_time_callback(killed_time_msec);
-    const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback);
+    const int r = request->wait(wait_time_msec, killed_time_msec, env->i->killed_callback,
+                                env->i->lock_wait_needed_callback);
     if (r == 0) {
         db_txn_note_row_lock(db, txn_anc, left_key, right_key);
     } else if (r == DB_LOCK_NOTGRANTED) {
@@ -248,7 +252,10 @@ void toku_db_grab_write_lock (DB *db, DBT *key, TOKUTXN tokutxn) {
     // This lock request must succeed, so we do not want to wait
     toku::lock_request request;
     request.create();
-    request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc));
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    request.set(db->i->lt, txn_anc_id, key, key, toku::lock_request::type::WRITE, toku_is_big_txn(txn_anc), client_extra);
     int r = request.start();
     invariant_zero(r);
     db_txn_note_row_lock(db, txn_anc, key, key);
@@ -268,7 +275,7 @@ void toku_db_release_lt_key_ranges(DB_TXN *txn, txn_lt_key_ranges *ranges) {
     // all of our locks have been released, so first try to wake up
     // pending lock requests, then release our reference on the lt
-    toku::lock_request::retry_all_lock_requests(lt);
+    toku::lock_request::retry_all_lock_requests(lt, txn->mgrp->i->lock_wait_needed_callback);
     // Release our reference on this locktree
     toku::locktree_manager *ltm = &txn->mgrp->i->ltm;
...
@@ -55,6 +55,7 @@ static bool tokudb_show_status(
 static void tokudb_handle_fatal_signal(handlerton* hton, THD* thd, int sig);
 #endif
 static int tokudb_close_connection(handlerton* hton, THD* thd);
+static void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level);
 static int tokudb_commit(handlerton* hton, THD* thd, bool all);
 static int tokudb_rollback(handlerton* hton, THD* thd, bool all);
 #if TOKU_INCLUDE_XA
@@ -147,6 +148,11 @@ static void tokudb_lock_timeout_callback(
     const DBT* right_key,
     uint64_t blocking_txnid);
+
+static void tokudb_lock_wait_needed_callback(
+    void* arg,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid);
 #define ASSERT_MSGLEN 1024
 void toku_hton_assert_fail(
@@ -331,6 +337,7 @@ static int tokudb_init_func(void *p) {
     tokudb_hton->create = tokudb_create_handler;
     tokudb_hton->close_connection = tokudb_close_connection;
+    tokudb_hton->kill_query = tokudb_kill_query;
     tokudb_hton->savepoint_offset = sizeof(SP_INFO_T);
     tokudb_hton->savepoint_set = tokudb_savepoint;
@@ -532,6 +539,7 @@ static int tokudb_init_func(void *p) {
     db_env->set_lock_timeout_callback(db_env, tokudb_lock_timeout_callback);
     db_env->set_dir_per_db(db_env, tokudb::sysvars::dir_per_db);
+    db_env->set_lock_wait_callback(db_env, tokudb_lock_wait_needed_callback);
     db_env->set_loader_memory_size(
         db_env,
@@ -754,6 +762,12 @@ static int tokudb_close_connection(handlerton* hton, THD* thd) {
     return error;
 }
+
+void tokudb_kill_query(handlerton *hton, THD *thd, enum thd_kill_levels level) {
+    TOKUDB_DBUG_ENTER("");
+    db_env->kill_waiter(db_env, thd);
+    DBUG_VOID_RETURN;
+}
 bool tokudb_flush_logs(handlerton * hton) {
     TOKUDB_DBUG_ENTER("");
     int error;
@@ -873,9 +887,9 @@ static int tokudb_commit(handlerton * hton, THD * thd, bool all) {
         tokudb_sync_on_commit(thd, trx, this_txn) ? 0 : DB_TXN_NOSYNC;
     TOKUDB_TRACE_FOR_FLAGS(
         TOKUDB_DEBUG_TXN,
-        "commit trx %u txn %p syncflag %u",
+        "commit trx %u txn %p %" PRIu64 " syncflag %u",
         all,
-        this_txn,
+        this_txn, this_txn->id64(this_txn),
         syncflag);
     // test hook to induce a crash on a debug build
     DBUG_EXECUTE_IF("tokudb_crash_commit_before", DBUG_SUICIDE(););
@@ -904,9 +918,9 @@ static int tokudb_rollback(handlerton * hton, THD * thd, bool all) {
     if (this_txn) {
         TOKUDB_TRACE_FOR_FLAGS(
             TOKUDB_DEBUG_TXN,
-            "rollback %u txn %p",
+            "rollback %u txn %p %" PRIu64,
            all,
-            this_txn);
+            this_txn, this_txn->id64(this_txn));
         tokudb_cleanup_handlers(trx, this_txn);
         abort_txn_with_progress(this_txn, thd);
         *txn = NULL;
@@ -952,9 +966,9 @@ static int tokudb_xa_prepare(handlerton* hton, THD* thd, bool all) {
     uint32_t syncflag = tokudb_sync_on_prepare() ? 0 : DB_TXN_NOSYNC;
     TOKUDB_TRACE_FOR_FLAGS(
         TOKUDB_DEBUG_XA,
-        "doing txn prepare:%d:%p",
+        "doing txn prepare:%d:%p %" PRIu64,
         all,
-        txn);
+        txn, txn->id64(txn));
     // a TOKU_XA_XID is identical to a MYSQL_XID
     TOKU_XA_XID thd_xid;
     thd_get_xid(thd, (MYSQL_XID*) &thd_xid);
@@ -1570,7 +1584,9 @@ static int tokudb_search_txn_callback(
     void* extra) {
     uint64_t txn_id = txn->id64(txn);
-    uint64_t client_id = txn->get_client_id(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
     struct tokudb_search_txn_extra* e =
         reinterpret_cast<struct tokudb_search_txn_extra*>(extra);
     if (e->match_txn_id == txn_id) {
@@ -1748,6 +1764,63 @@ static void tokudb_lock_timeout_callback(
     }
 }
+
+extern "C" int thd_rpl_deadlock_check(MYSQL_THD thd, MYSQL_THD other_thd);
+
+struct tokudb_search_txn_thd {
+    bool match_found;
+    uint64_t match_txn_id;
+    THD *match_client_thd;
+};
+
+static int tokudb_search_txn_thd_callback(
+    DB_TXN* txn,
+    iterate_row_locks_callback iterate_locks,
+    void* locks_extra,
+    void* extra) {
+    uint64_t txn_id = txn->id64(txn);
+    uint64_t client_id;
+    void *client_extra;
+    txn->get_client_id(txn, &client_id, &client_extra);
+    struct tokudb_search_txn_thd* e =
+        reinterpret_cast<struct tokudb_search_txn_thd*>(extra);
+    if (e->match_txn_id == txn_id) {
+        e->match_found = true;
+        e->match_client_thd = reinterpret_cast<THD *>(client_extra);
+        return 1;
+    }
+    return 0;
+}
+
+static bool tokudb_txn_id_to_thd(
+    uint64_t txnid,
+    THD **out_thd) {
+    struct tokudb_search_txn_thd e = {
+        false,
+        txnid,
+        0
+    };
+    db_env->iterate_live_transactions(db_env, tokudb_search_txn_thd_callback, &e);
+    if (e.match_found) {
+        *out_thd = e.match_client_thd;
+    }
+    return e.match_found;
+}
+
+static void tokudb_lock_wait_needed_callback(
+    void *arg,
+    uint64_t requesting_txnid,
+    uint64_t blocking_txnid) {
+    THD *requesting_thd;
+    THD *blocking_thd;
+    if (tokudb_txn_id_to_thd(requesting_txnid, &requesting_thd) &&
+        tokudb_txn_id_to_thd(blocking_txnid, &blocking_thd)) {
+        thd_rpl_deadlock_check(requesting_thd, blocking_thd);
+    }
+}
+
 // Retrieves variables for information_schema.global_status.
 // Names (columnname) are automatically converted to upper case,
 // and prefixed with "TOKUDB_"
...
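The resolution in tokudb_lock_wait_needed_callback() above only works because the engine now stores the session pointer in each transaction's client_extra (the get_client_id() plumbing in the ydb row-lock hunks earlier). A simplified, self-contained sketch of that id-to-session lookup, with Session and Txn as stand-ins for THD and DB_TXN:

#include <cstdint>
#include <vector>

struct Session { int id; };  // stand-in for THD

struct Txn {
    uint64_t txnid;
    void *client_extra;      // set to the owning Session when the txn begins
};

static std::vector<Txn> live_txns;  // stands in for iterate_live_transactions()

// Mirror of tokudb_txn_id_to_thd(): scan the live transactions for the id
// and recover the session pointer stashed in client_extra. Returns false
// if the transaction has already ended; the caller then skips the report.
static bool txn_id_to_session(uint64_t txnid, Session **out) {
    for (const Txn &t : live_txns) {
        if (t.txnid == txnid) {
            *out = static_cast<Session *>(t.client_extra);
            return true;
        }
    }
    return false;
}

int main() {
    Session s1{1}, s2{2};
    live_txns.push_back(Txn{42, &s1});
    live_txns.push_back(Txn{17, &s2});
    Session *waiter = nullptr, *holder = nullptr;
    if (txn_id_to_session(42, &waiter) && txn_id_to_session(17, &holder)) {
        // the real callback would now call thd_rpl_deadlock_check(waiter, holder)
    }
}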