Commit 8b1f9f8e authored by Sergei Petrunia

MDEV-11934: MariaRocks: Group Commit with binlog

Implement MariaDB's Group Commit API. This is a first attempt which lacks
the expected performance.
parent c52ffbeb
...@@ -1682,6 +1682,12 @@ class Rdb_transaction { ...@@ -1682,6 +1682,12 @@ class Rdb_transaction {
*/ */
int64_t m_n_mysql_tables_in_use = 0; int64_t m_n_mysql_tables_in_use = 0;
/*
MariaDB's group commit:
*/
bool commit_ordered_done;
bool commit_ordered_res;
/* /*
for distinction between rdb_transaction_impl and rdb_writebatch_impl for distinction between rdb_transaction_impl and rdb_writebatch_impl
when using walk tx list when using walk tx list
...@@ -2299,6 +2305,8 @@ class Rdb_transaction_impl : public Rdb_transaction { ...@@ -2299,6 +2305,8 @@ class Rdb_transaction_impl : public Rdb_transaction {
THDVAR(m_thd, write_ignore_missing_column_families); THDVAR(m_thd, write_ignore_missing_column_families);
m_is_two_phase = rocksdb_enable_2pc; m_is_two_phase = rocksdb_enable_2pc;
commit_ordered_done= false;
/* /*
If m_rocksdb_reuse_tx is null this will create a new transaction object. If m_rocksdb_reuse_tx is null this will create a new transaction object.
Otherwise it will reuse the existing one. Otherwise it will reuse the existing one.
...@@ -2511,6 +2519,7 @@ class Rdb_writebatch_impl : public Rdb_transaction { ...@@ -2511,6 +2519,7 @@ class Rdb_writebatch_impl : public Rdb_transaction {
bool is_tx_started() const override { return (m_batch != nullptr); } bool is_tx_started() const override { return (m_batch != nullptr); }
void start_tx() override { void start_tx() override {
commit_ordered_done= false; // Do we need this here?
reset(); reset();
write_opts.sync = (rocksdb_flush_log_at_trx_commit == 1); write_opts.sync = (rocksdb_flush_log_at_trx_commit == 1);
write_opts.disableWAL = THDVAR(m_thd, write_disable_wal); write_opts.disableWAL = THDVAR(m_thd, write_disable_wal);
...@@ -2682,8 +2691,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__))) ...@@ -2682,8 +2691,7 @@ static bool rocksdb_flush_wal(handlerton* hton __attribute__((__unused__)))
*/ */
static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
{ {
// This is "ASYNC_COMMIT" feature which is only in webscalesql bool async=false; // This is "ASYNC_COMMIT" feature which is only present in webscalesql
bool async=false;
Rdb_transaction *&tx = get_tx_from_thd(thd); Rdb_transaction *&tx = get_tx_from_thd(thd);
if (!tx->can_prepare()) { if (!tx->can_prepare()) {
...@@ -2693,7 +2701,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) ...@@ -2693,7 +2701,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
(!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) { (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))) {
/* We were instructed to prepare the whole transaction, or /* We were instructed to prepare the whole transaction, or
this is an SQL statement end and autocommit is on */ this is an SQL statement end and autocommit is on */
#ifdef MARIAROCKS_NOT_YET // disable prepare/commit
#ifdef MARIAROCKS_NOT_YET // Crash-safe slave does not work yet
std::vector<st_slave_gtid_info> slave_gtid_info; std::vector<st_slave_gtid_info> slave_gtid_info;
my_core::thd_slave_gtid_info(thd, &slave_gtid_info); my_core::thd_slave_gtid_info(thd, &slave_gtid_info);
for (const auto &it : slave_gtid_info) { for (const auto &it : slave_gtid_info) {
...@@ -2703,21 +2712,38 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) ...@@ -2703,21 +2712,38 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
#endif #endif
if (tx->is_two_phase()) { if (tx->is_two_phase()) {
/*
MariaDB: the following branch is never taken.
We always flush at Prepare and rely on RocksDB's internal Group Commit
to do some grouping.
*/
if (thd->durability_property == HA_IGNORE_DURABILITY || async) { if (thd->durability_property == HA_IGNORE_DURABILITY || async) {
tx->set_sync(false); tx->set_sync(false);
} }
/*
MariaDB: do not flush logs if we are running in a non-crash-safe mode.
*/
if (!rocksdb_flush_log_at_trx_commit)
tx->set_sync(false);
XID xid; XID xid;
thd_get_xid(thd, reinterpret_cast<MYSQL_XID *>(&xid)); thd_get_xid(thd, reinterpret_cast<MYSQL_XID *>(&xid));
if (!tx->prepare(rdb_xid_to_string(xid))) { if (!tx->prepare(rdb_xid_to_string(xid))) {
return HA_EXIT_FAILURE; return HA_EXIT_FAILURE;
} }
if (thd->durability_property == HA_IGNORE_DURABILITY )
/*
MariaDB: our Group Commit implementation does not use the
hton->flush_logs call (at least currently) so the following is not
needed (TODO: will we need this for binlog rotation?)
*/
#ifdef MARIAROCKS_NOT_YET #ifdef MARIAROCKS_NOT_YET
if (thd->durability_property == HA_IGNORE_DURABILITY
&& &&
THDVAR(thd, flush_log_at_trx_commit)) rocksdb_flush_log_at_trx_commit)
#endif
{ {
#ifdef MARIAROCKS_NOT_YET
// MariaRocks: disable the // MariaRocks: disable the
// "write/sync redo log before flushing binlog cache to file" // "write/sync redo log before flushing binlog cache to file"
// feature. See a869c56d361bb44f46c0efeb11a8f03561676247 // feature. See a869c56d361bb44f46c0efeb11a8f03561676247
...@@ -2725,8 +2751,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx) ...@@ -2725,8 +2751,8 @@ static int rocksdb_prepare(handlerton* hton, THD* thd, bool prepare_tx)
we set the log sequence as '1' just to trigger hton->flush_logs we set the log sequence as '1' just to trigger hton->flush_logs
*/ */
thd_store_lsn(thd, 1, DB_TYPE_ROCKSDB); thd_store_lsn(thd, 1, DB_TYPE_ROCKSDB);
#endif
} }
#endif
} }
DEBUG_SYNC(thd, "rocksdb.prepared"); DEBUG_SYNC(thd, "rocksdb.prepared");
...@@ -2876,6 +2902,50 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len) ...@@ -2876,6 +2902,50 @@ static int rocksdb_recover(handlerton* hton, XID* xid_list, uint len)
return count; return count;
} }
/*
  Server-layer callback: a commit checkpoint has been requested.

  For comparison, InnoDB queues the request so that it can notify the upper
  layer about checkpoint completion once the redo log has been flushed; if
  all relevant redo log is already flushed, it notifies immediately.

  MariaRocks does no queueing at the moment: the WAL is synced right here,
  and the server is notified only when that sync reports success.
*/
static void rocksdb_checkpoint_request(handlerton *hton,
                                       void *cookie)
{
  const rocksdb::Status sync_status= rdb->SyncWAL();

  //TODO: what to do on error?
  // For now a failed sync is dropped silently: no counter bump and no
  // notification to the server.
  if (!sync_status.ok())
    return;

  rocksdb_wal_group_syncs++;
  commit_checkpoint_notify_ha(hton, cookie);
}
/*
  Ordered-commit hook (installed as hton->commit_ordered) for MariaDB's
  group commit.

  Commits the transaction without requesting a sync and stores the outcome
  on the transaction object (commit_ordered_res / commit_ordered_done).

  @param all  TRUE  - commit the transaction
              FALSE - SQL statement ended
*/
static void rocksdb_commit_ordered(handlerton *hton, THD* thd, bool all)
{
  // InnoDB has the same assert: we are either committing the whole
  // transaction, or neither OPTION_NOT_AUTOCOMMIT nor OPTION_BEGIN is set.
  DBUG_ASSERT(all || (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
                                             OPTION_BEGIN)));

  Rdb_transaction *&tx = get_tx_from_thd(thd);

  tx->set_sync(false);

  /* commit() also notes the master position */
  const bool commit_result = tx->commit();

  // Record the result first, then flag that the ordered commit has run.
  tx->commit_ordered_res = commit_result;
  tx->commit_ordered_done = true;
}
static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
{ {
DBUG_ENTER_FUNC(); DBUG_ENTER_FUNC();
...@@ -2895,6 +2965,16 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) ...@@ -2895,6 +2965,16 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (tx != nullptr) { if (tx != nullptr) {
if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | if (commit_tx || (!my_core::thd_test_options(thd, OPTION_NOT_AUTOCOMMIT |
OPTION_BEGIN))) { OPTION_BEGIN))) {
/*
This will not add anything to commit_latency_stats, and this is correct
right?
*/
if (tx->commit_ordered_done)
{
thd_wakeup_subsequent_commits(thd, 0);
DBUG_RETURN((tx->commit_ordered_res? HA_ERR_INTERNAL_ERROR: 0));
}
/* /*
We get here We get here
- For a COMMIT statement that finishes a multi-statement transaction - For a COMMIT statement that finishes a multi-statement transaction
...@@ -2903,6 +2983,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) ...@@ -2903,6 +2983,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
if (tx->commit()) { if (tx->commit()) {
DBUG_RETURN(HA_ERR_ROCKSDB_COMMIT_FAILED); DBUG_RETURN(HA_ERR_ROCKSDB_COMMIT_FAILED);
} }
thd_wakeup_subsequent_commits(thd, 0);
} else { } else {
/* /*
We get here when committing a statement within a transaction. We get here when committing a statement within a transaction.
...@@ -2926,6 +3007,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx) ...@@ -2926,6 +3007,7 @@ static int rocksdb_commit(handlerton* hton, THD* thd, bool commit_tx)
DBUG_RETURN(HA_EXIT_SUCCESS); DBUG_RETURN(HA_EXIT_SUCCESS);
} }
static int rocksdb_rollback(handlerton *const hton, THD *const thd, static int rocksdb_rollback(handlerton *const hton, THD *const thd,
bool rollback_tx) { bool rollback_tx) {
Rdb_perf_context_guard guard(thd); Rdb_perf_context_guard guard(thd);
...@@ -3642,11 +3724,19 @@ static int rocksdb_init_func(void *const p) { ...@@ -3642,11 +3724,19 @@ static int rocksdb_init_func(void *const p) {
rocksdb_hton->state = SHOW_OPTION_YES; rocksdb_hton->state = SHOW_OPTION_YES;
rocksdb_hton->create = rocksdb_create_handler; rocksdb_hton->create = rocksdb_create_handler;
rocksdb_hton->close_connection = rocksdb_close_connection; rocksdb_hton->close_connection = rocksdb_close_connection;
rocksdb_hton->prepare = rocksdb_prepare; rocksdb_hton->prepare = rocksdb_prepare;
rocksdb_hton->prepare_ordered = NULL; // Do not need it
rocksdb_hton->commit_by_xid = rocksdb_commit_by_xid; rocksdb_hton->commit_by_xid = rocksdb_commit_by_xid;
rocksdb_hton->rollback_by_xid = rocksdb_rollback_by_xid; rocksdb_hton->rollback_by_xid = rocksdb_rollback_by_xid;
rocksdb_hton->recover = rocksdb_recover; rocksdb_hton->recover = rocksdb_recover;
rocksdb_hton->commit_ordered= rocksdb_commit_ordered;
rocksdb_hton->commit = rocksdb_commit; rocksdb_hton->commit = rocksdb_commit;
rocksdb_hton->commit_checkpoint_request= rocksdb_checkpoint_request;
rocksdb_hton->rollback = rocksdb_rollback; rocksdb_hton->rollback = rocksdb_rollback;
rocksdb_hton->show_status = rocksdb_show_status; rocksdb_hton->show_status = rocksdb_show_status;
rocksdb_hton->start_consistent_snapshot = rocksdb_hton->start_consistent_snapshot =
...@@ -3974,7 +4064,6 @@ static int rocksdb_init_func(void *const p) { ...@@ -3974,7 +4064,6 @@ static int rocksdb_init_func(void *const p) {
if (myrocks::rocksdb_wal_dir && *myrocks::rocksdb_wal_dir) { if (myrocks::rocksdb_wal_dir && *myrocks::rocksdb_wal_dir) {
directories.push_back(myrocks::rocksdb_wal_dir); directories.push_back(myrocks::rocksdb_wal_dir);
} }
#ifndef _WIN32 #ifndef _WIN32
io_watchdog = new Rdb_io_watchdog(directories); io_watchdog = new Rdb_io_watchdog(directories);
io_watchdog->reset_timeout(rocksdb_io_write_timeout_secs); io_watchdog->reset_timeout(rocksdb_io_write_timeout_secs);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment