Commit f06a0b53 authored by Teemu Ollakka's avatar Teemu Ollakka

Implement wsrep_load_data_splitting with streaming replication

If wsrep_load_data_splitting is configured, change streaming replication
parameters internally to match the original behavior, i.e. replicate
on every 10000 rows. After load data is over, restore original
streaming replication settings.

Removed redundant wsrep_tc_log_commit().
parent ce6505f8
......@@ -100,41 +100,39 @@ class Term_string
#define PUSH(A) *(stack_pos++)=(A)
#ifdef WITH_WSREP
/** If requested by wsrep_load_data_splitting, commit and restart
the transaction after every 10,000 inserted rows. */
static bool wsrep_load_data_split(THD *thd, const TABLE *table,
const COPY_INFO &info)
/** If requested by wsrep_load_data_splitting and streaming replication is
not enabled, replicate a streaming fragment every 10,000 rows.*/
class Wsrep_load_data_split
{
DBUG_ENTER("wsrep_load_data_split");
if (!wsrep_load_data_splitting || !WSREP(thd)
|| !info.records || (info.records % 10000)
|| !thd->transaction.stmt.ha_list
|| thd->transaction.stmt.ha_list->ht() != binlog_hton
|| !thd->transaction.stmt.ha_list->next()
|| thd->transaction.stmt.ha_list->next()->next())
DBUG_RETURN(false);
if (handlerton* hton= thd->transaction.stmt.ha_list->next()->ht())
public:
Wsrep_load_data_split(THD *thd)
: m_thd(thd)
, m_load_data_splitting(wsrep_load_data_splitting)
, m_fragment_unit(thd->wsrep_trx().streaming_context().fragment_unit())
, m_fragment_size(thd->wsrep_trx().streaming_context().fragment_size())
{
if (!(hton->flags & HTON_WSREP_REPLICATION))
DBUG_RETURN(false);
WSREP_DEBUG("intermediate transaction commit in LOAD DATA");
wsrep_tc_log_commit(thd);
table->file->extra(HA_EXTRA_FAKE_START_STMT);
if (WSREP(m_thd) && m_load_data_splitting)
{
/* Override streaming settings with backward compatible values for
load data splitting */
m_thd->wsrep_cs().streaming_params(wsrep::streaming_context::row, 10000);
}
}
DBUG_RETURN(false);
}
# define WSREP_LOAD_DATA_SPLIT(thd,table,info) \
if (wsrep_load_data_split(thd,table,info)) \
{ \
table->auto_increment_field_not_null= FALSE; \
DBUG_RETURN(1); \
}
#else /* WITH_WSREP */
#define WSREP_LOAD_DATA_SPLIT(thd,table,info) /* empty */
~Wsrep_load_data_split()
{
if (WSREP(m_thd) && m_load_data_splitting)
{
/* Restore original settings */
m_thd->wsrep_cs().streaming_params(m_fragment_unit, m_fragment_size);
}
}
private:
THD *m_thd;
my_bool m_load_data_splitting;
enum wsrep::streaming_context::fragment_unit m_fragment_unit;
size_t m_fragment_size;
};
#endif /* WITH_WSREP */
class READ_INFO: public Load_data_param
......@@ -354,6 +352,9 @@ int mysql_load(THD *thd, const sql_exchange *ex, TABLE_LIST *table_list,
bool transactional_table __attribute__((unused));
DBUG_ENTER("mysql_load");
#ifdef WITH_WSREP
Wsrep_load_data_split wsrep_load_data_split(thd);
#endif /* WITH_WSREP */
/*
Bug #34283
mysqlbinlog leaves tmpfile after termination if binlog contains
......@@ -1005,7 +1006,6 @@ read_fixed_length(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
......@@ -1148,7 +1148,6 @@ read_sep_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= FALSE;
if (err)
......@@ -1271,7 +1270,6 @@ read_xml_field(THD *thd, COPY_INFO &info, TABLE_LIST *table_list,
DBUG_RETURN(-1);
}
WSREP_LOAD_DATA_SPLIT(thd, table, info);
err= write_record(thd, table, &info);
table->auto_increment_field_not_null= false;
if (err)
......
......@@ -2499,57 +2499,6 @@ int wsrep_ordered_commit_if_no_binlog(THD* thd, bool all)
return 0;
}
wsrep_status_t wsrep_tc_log_commit(THD* thd)
{
int cookie;
my_xid xid= thd->transaction.xid_state.xid.get_my_xid();
DBUG_ASSERT(thd->lex->sql_command == SQLCOM_LOAD);
if (wsrep_before_commit(thd, true))
{
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_before_commit failed %llu",
thd->thread_id);
return WSREP_TRX_FAIL;
}
cookie= tc_log->log_and_order(thd, xid, 1, false, true);
if (wsrep_after_commit(thd, true))
{
WSREP_DEBUG("wsrep_tc_log_commit: wsrep_after_commit failed %llu",
thd->thread_id);
return WSREP_TRX_FAIL;
}
if (!cookie)
{
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
return WSREP_TRX_FAIL;
}
if (tc_log->unlog(cookie, xid))
{
WSREP_DEBUG("log_and_order has failed %llu %d", thd->thread_id, cookie);
return WSREP_TRX_FAIL;
}
if (wsrep_after_statement(thd))
{
return WSREP_TRX_FAIL;
}
/* Set wsrep transaction id if not set. */
if (thd->wsrep_trx_id() == WSREP_UNDEFINED_TRX_ID)
{
if (thd->wsrep_next_trx_id() == WSREP_UNDEFINED_TRX_ID)
{
thd->set_wsrep_next_trx_id(thd->query_id);
}
DBUG_ASSERT(thd->wsrep_next_trx_id() != WSREP_UNDEFINED_TRX_ID);
}
if (wsrep_start_transaction(thd, thd->wsrep_next_trx_id()))
{
return WSREP_TRX_FAIL;
}
DBUG_ASSERT(thd->wsrep_trx_id() != WSREP_UNDEFINED_TRX_ID);
return WSREP_OK;
}
int wsrep_thd_retry_counter(const THD *thd)
{
return thd->wsrep_retry_counter;
......
......@@ -444,15 +444,6 @@ bool wsrep_provider_is_SR_capable();
*/
int wsrep_ordered_commit_if_no_binlog(THD*, bool);
/**
* Commit the current transaction with the
* MySQL "Transaction Coordinator Log" (see `class TC_LOG` in sql/log.h).
* Calling this function will generate and assign a new wsrep transaction id
* for `thd`.
* @return WSREP_OK on success or other WSREP_* error code on failure
*/
wsrep_status_t wsrep_tc_log_commit(THD* thd);
/**
* Initialize WSREP server instance.
*
......
Subproject commit e7d72ae7f6a6995a21d743389426a963429a1fff
Subproject commit 20b52ff1ddc3b2f547b7081471f46dcfa5efabc7
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment