Commit f4ba7759 authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Jan Lindström

MDEV-17099 Preliminary changes for Galera XA support (#1404)

Redo changes reverted in commit
8f46e383, this time without build
issues in wsrep-lib.
parent 576c96a9
......@@ -10768,13 +10768,13 @@ bool wsrep_stmt_rollback_is_safe(THD* thd)
binlog_cache_data * trx_cache = &cache_mngr->trx_cache;
if (thd->wsrep_sr().fragments_certified() > 0 &&
(trx_cache->get_prev_position() == MY_OFF_T_UNDEF ||
trx_cache->get_prev_position() < thd->wsrep_sr().bytes_certified()))
trx_cache->get_prev_position() < thd->wsrep_sr().log_position()))
{
WSREP_DEBUG("statement rollback is not safe for streaming replication"
" pre-stmt_pos: %llu, frag repl pos: %zu\n"
"Thread: %llu, SQL: %s",
trx_cache->get_prev_position(),
thd->wsrep_sr().bytes_certified(),
thd->wsrep_sr().log_position(),
thd->thread_id, thd->query());
ret = false;
}
......
......@@ -123,7 +123,7 @@ static int wsrep_write_cache_inc(THD* const thd,
DBUG_ENTER("wsrep_write_cache_inc");
my_off_t const saved_pos(my_b_tell(cache));
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().bytes_certified(), 0, 0))
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
{
WSREP_ERROR("failed to initialize io-cache");
DBUG_RETURN(1);;
......@@ -158,7 +158,7 @@ static int wsrep_write_cache_inc(THD* const thd,
}
if (ret == 0)
{
assert(total_length + thd->wsrep_sr().bytes_certified() == saved_pos);
assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
}
cleanup:
......
......@@ -138,7 +138,8 @@ void Wsrep_client_service::cleanup_transaction()
}
int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer& buffer)
int Wsrep_client_service::prepare_fragment_for_replication(
wsrep::mutable_buffer& buffer, size_t& log_position)
{
DBUG_ASSERT(m_thd == current_thd);
THD* thd= m_thd;
......@@ -152,7 +153,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer
}
const my_off_t saved_pos(my_b_tell(cache));
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().bytes_certified(), 0, 0))
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
{
DBUG_RETURN(1);
}
......@@ -186,6 +187,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer
while (cache->file >= 0 && (length= my_b_fill(cache)));
}
DBUG_ASSERT(total_length == buffer.size());
log_position= saved_pos;
cleanup:
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
{
......
......@@ -43,7 +43,7 @@ class Wsrep_client_service : public wsrep::client_service
void cleanup_transaction();
bool statement_allowed_for_streaming() const;
size_t bytes_generated() const;
int prepare_fragment_for_replication(wsrep::mutable_buffer&);
int prepare_fragment_for_replication(wsrep::mutable_buffer&, size_t&);
int remove_fragments();
void emergency_shutdown()
{ throw wsrep::not_implemented_error(); }
......
......@@ -205,6 +205,12 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const
DBUG_RETURN(m_thd->wsrep_trx());
}
int Wsrep_high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
{
DBUG_ENTER(" Wsrep_high_priority_service::next_fragment");
DBUG_RETURN(m_thd->wsrep_cs().next_fragment(ws_meta));
}
int Wsrep_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction)
{
......@@ -225,7 +231,8 @@ int Wsrep_high_priority_service::adopt_transaction(
int Wsrep_high_priority_service::append_fragment_and_commit(
const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data)
const wsrep::const_buffer& data,
const wsrep::xid& xid WSREP_UNUSED)
{
DBUG_ENTER("Wsrep_high_priority_service::append_fragment_and_commit");
int ret= start_transaction(ws_handle, ws_meta);
......
......@@ -35,13 +35,15 @@ class Wsrep_high_priority_service :
~Wsrep_high_priority_service();
int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&);
int next_fragment(const wsrep::ws_meta&);
const wsrep::transaction& transaction() const;
int adopt_transaction(const wsrep::transaction&);
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) = 0;
int append_fragment_and_commit(const wsrep::ws_handle&,
const wsrep::ws_meta&,
const wsrep::const_buffer&);
const wsrep::const_buffer&,
const wsrep::xid&);
int remove_fragments(const wsrep::ws_meta&);
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&);
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&);
......
......@@ -100,7 +100,8 @@ void Wsrep_storage_service::adopt_transaction(const wsrep::transaction& transact
int Wsrep_storage_service::append_fragment(const wsrep::id& server_id,
wsrep::transaction_id transaction_id,
int flags,
const wsrep::const_buffer& data)
const wsrep::const_buffer& data,
const wsrep::xid& xid WSREP_UNUSED)
{
DBUG_ENTER("Wsrep_storage_service::append_fragment");
DBUG_ASSERT(m_thd == current_thd);
......
......@@ -33,7 +33,8 @@ class Wsrep_storage_service :
int append_fragment(const wsrep::id&,
wsrep::transaction_id,
int flags,
const wsrep::const_buffer&);
const wsrep::const_buffer&,
const wsrep::xid&);
int update_fragment_meta(const wsrep::ws_meta&);
int remove_fragments();
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&);
......
Subproject commit 58aa3e821f575532870c5f76f6f1cf833458eed4
Subproject commit c9513bd2e49abbc0941689af1f2f2636a25e253e
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment