Commit 2b5f4b3e authored by Daniele Sciascia's avatar Daniele Sciascia Committed by Jan Lindström

MDEV-17099 Preliminary changes for Galera XA support (#1401)

Update wsrep-lib, and adapt to wsrep-lib interface changes.
parent 82f22d2f
...@@ -10768,13 +10768,13 @@ bool wsrep_stmt_rollback_is_safe(THD* thd) ...@@ -10768,13 +10768,13 @@ bool wsrep_stmt_rollback_is_safe(THD* thd)
binlog_cache_data * trx_cache = &cache_mngr->trx_cache; binlog_cache_data * trx_cache = &cache_mngr->trx_cache;
if (thd->wsrep_sr().fragments_certified() > 0 && if (thd->wsrep_sr().fragments_certified() > 0 &&
(trx_cache->get_prev_position() == MY_OFF_T_UNDEF || (trx_cache->get_prev_position() == MY_OFF_T_UNDEF ||
trx_cache->get_prev_position() < thd->wsrep_sr().bytes_certified())) trx_cache->get_prev_position() < thd->wsrep_sr().log_position()))
{ {
WSREP_DEBUG("statement rollback is not safe for streaming replication" WSREP_DEBUG("statement rollback is not safe for streaming replication"
" pre-stmt_pos: %llu, frag repl pos: %zu\n" " pre-stmt_pos: %llu, frag repl pos: %zu\n"
"Thread: %llu, SQL: %s", "Thread: %llu, SQL: %s",
trx_cache->get_prev_position(), trx_cache->get_prev_position(),
thd->wsrep_sr().bytes_certified(), thd->wsrep_sr().log_position(),
thd->thread_id, thd->query()); thd->thread_id, thd->query());
ret = false; ret = false;
} }
......
...@@ -123,7 +123,7 @@ static int wsrep_write_cache_inc(THD* const thd, ...@@ -123,7 +123,7 @@ static int wsrep_write_cache_inc(THD* const thd,
DBUG_ENTER("wsrep_write_cache_inc"); DBUG_ENTER("wsrep_write_cache_inc");
my_off_t const saved_pos(my_b_tell(cache)); my_off_t const saved_pos(my_b_tell(cache));
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().bytes_certified(), 0, 0)) if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
{ {
WSREP_ERROR("failed to initialize io-cache"); WSREP_ERROR("failed to initialize io-cache");
DBUG_RETURN(1);; DBUG_RETURN(1);;
...@@ -158,7 +158,7 @@ static int wsrep_write_cache_inc(THD* const thd, ...@@ -158,7 +158,7 @@ static int wsrep_write_cache_inc(THD* const thd,
} }
if (ret == 0) if (ret == 0)
{ {
assert(total_length + thd->wsrep_sr().bytes_certified() == saved_pos); assert(total_length + thd->wsrep_sr().log_position() == saved_pos);
} }
cleanup: cleanup:
......
...@@ -138,7 +138,8 @@ void Wsrep_client_service::cleanup_transaction() ...@@ -138,7 +138,8 @@ void Wsrep_client_service::cleanup_transaction()
} }
int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer& buffer) int Wsrep_client_service::prepare_fragment_for_replication(
wsrep::mutable_buffer& buffer, size_t& log_position)
{ {
DBUG_ASSERT(m_thd == current_thd); DBUG_ASSERT(m_thd == current_thd);
THD* thd= m_thd; THD* thd= m_thd;
...@@ -152,7 +153,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer ...@@ -152,7 +153,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer
} }
const my_off_t saved_pos(my_b_tell(cache)); const my_off_t saved_pos(my_b_tell(cache));
if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().bytes_certified(), 0, 0)) if (reinit_io_cache(cache, READ_CACHE, thd->wsrep_sr().log_position(), 0, 0))
{ {
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -186,6 +187,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer ...@@ -186,6 +187,7 @@ int Wsrep_client_service::prepare_fragment_for_replication(wsrep::mutable_buffer
while (cache->file >= 0 && (length= my_b_fill(cache))); while (cache->file >= 0 && (length= my_b_fill(cache)));
} }
DBUG_ASSERT(total_length == buffer.size()); DBUG_ASSERT(total_length == buffer.size());
log_position= saved_pos;
cleanup: cleanup:
if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0)) if (reinit_io_cache(cache, WRITE_CACHE, saved_pos, 0, 0))
{ {
......
...@@ -43,7 +43,7 @@ class Wsrep_client_service : public wsrep::client_service ...@@ -43,7 +43,7 @@ class Wsrep_client_service : public wsrep::client_service
void cleanup_transaction(); void cleanup_transaction();
bool statement_allowed_for_streaming() const; bool statement_allowed_for_streaming() const;
size_t bytes_generated() const; size_t bytes_generated() const;
int prepare_fragment_for_replication(wsrep::mutable_buffer&); int prepare_fragment_for_replication(wsrep::mutable_buffer&, size_t&);
int remove_fragments(); int remove_fragments();
void emergency_shutdown() void emergency_shutdown()
{ throw wsrep::not_implemented_error(); } { throw wsrep::not_implemented_error(); }
......
...@@ -205,6 +205,12 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const ...@@ -205,6 +205,12 @@ const wsrep::transaction& Wsrep_high_priority_service::transaction() const
DBUG_RETURN(m_thd->wsrep_trx()); DBUG_RETURN(m_thd->wsrep_trx());
} }
int Wsrep_high_priority_service::next_fragment(const wsrep::ws_meta& ws_meta)
{
DBUG_ENTER(" Wsrep_high_priority_service::next_fragment");
DBUG_RETURN(m_thd->wsrep_cs().next_fragment(ws_meta));
}
int Wsrep_high_priority_service::adopt_transaction( int Wsrep_high_priority_service::adopt_transaction(
const wsrep::transaction& transaction) const wsrep::transaction& transaction)
{ {
...@@ -225,7 +231,8 @@ int Wsrep_high_priority_service::adopt_transaction( ...@@ -225,7 +231,8 @@ int Wsrep_high_priority_service::adopt_transaction(
int Wsrep_high_priority_service::append_fragment_and_commit( int Wsrep_high_priority_service::append_fragment_and_commit(
const wsrep::ws_handle& ws_handle, const wsrep::ws_handle& ws_handle,
const wsrep::ws_meta& ws_meta, const wsrep::ws_meta& ws_meta,
const wsrep::const_buffer& data) const wsrep::const_buffer& data,
const wsrep::xid& xid WSREP_UNUSED)
{ {
DBUG_ENTER("Wsrep_high_priority_service::append_fragment_and_commit"); DBUG_ENTER("Wsrep_high_priority_service::append_fragment_and_commit");
int ret= start_transaction(ws_handle, ws_meta); int ret= start_transaction(ws_handle, ws_meta);
......
...@@ -35,13 +35,15 @@ class Wsrep_high_priority_service : ...@@ -35,13 +35,15 @@ class Wsrep_high_priority_service :
~Wsrep_high_priority_service(); ~Wsrep_high_priority_service();
int start_transaction(const wsrep::ws_handle&, int start_transaction(const wsrep::ws_handle&,
const wsrep::ws_meta&); const wsrep::ws_meta&);
int next_fragment(const wsrep::ws_meta&);
const wsrep::transaction& transaction() const; const wsrep::transaction& transaction() const;
int adopt_transaction(const wsrep::transaction&); int adopt_transaction(const wsrep::transaction&);
int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&, int apply_write_set(const wsrep::ws_meta&, const wsrep::const_buffer&,
wsrep::mutable_buffer&) = 0; wsrep::mutable_buffer&) = 0;
int append_fragment_and_commit(const wsrep::ws_handle&, int append_fragment_and_commit(const wsrep::ws_handle&,
const wsrep::ws_meta&, const wsrep::ws_meta&,
const wsrep::const_buffer&); const wsrep::const_buffer&,
const wsrep::xid&);
int remove_fragments(const wsrep::ws_meta&); int remove_fragments(const wsrep::ws_meta&);
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&); int commit(const wsrep::ws_handle&, const wsrep::ws_meta&);
int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&); int rollback(const wsrep::ws_handle&, const wsrep::ws_meta&);
......
...@@ -100,7 +100,8 @@ void Wsrep_storage_service::adopt_transaction(const wsrep::transaction& transact ...@@ -100,7 +100,8 @@ void Wsrep_storage_service::adopt_transaction(const wsrep::transaction& transact
int Wsrep_storage_service::append_fragment(const wsrep::id& server_id, int Wsrep_storage_service::append_fragment(const wsrep::id& server_id,
wsrep::transaction_id transaction_id, wsrep::transaction_id transaction_id,
int flags, int flags,
const wsrep::const_buffer& data) const wsrep::const_buffer& data,
const wsrep::xid& xid WSREP_UNUSED)
{ {
DBUG_ENTER("Wsrep_storage_service::append_fragment"); DBUG_ENTER("Wsrep_storage_service::append_fragment");
DBUG_ASSERT(m_thd == current_thd); DBUG_ASSERT(m_thd == current_thd);
......
...@@ -33,7 +33,8 @@ class Wsrep_storage_service : ...@@ -33,7 +33,8 @@ class Wsrep_storage_service :
int append_fragment(const wsrep::id&, int append_fragment(const wsrep::id&,
wsrep::transaction_id, wsrep::transaction_id,
int flags, int flags,
const wsrep::const_buffer&); const wsrep::const_buffer&,
const wsrep::xid&);
int update_fragment_meta(const wsrep::ws_meta&); int update_fragment_meta(const wsrep::ws_meta&);
int remove_fragments(); int remove_fragments();
int commit(const wsrep::ws_handle&, const wsrep::ws_meta&); int commit(const wsrep::ws_handle&, const wsrep::ws_meta&);
......
Subproject commit 58aa3e821f575532870c5f76f6f1cf833458eed4 Subproject commit 66ee7bed1b7fa7b6d3f584517be681aa9ea15df9
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment