Commit 040b840d authored by Teemu Ollakka's avatar Teemu Ollakka

MDEV-15740 Backport wsrep recovery fixes from 10.4.

Clear wsrep XID in innobase_rollback_by_xid() for recovered wsrep
transaction in order to avoid resetting XID storage when rolling back
wsrep transaction during recovery.

Sort wsrep XIDs read from storage engine in ascending order and
erify that the range is continuous during crash recovery. If binlog is off,
commit all recovered transactions for continuous seqno range. This is safe
because all transactions with wsrep XID have been certified and must be
committed in the cluster. On the other hand if binlog is on, respect binlog
as a transaction coordinator in order to avoid missing transactions in binlog
that have been committed into storage engine .
parent ce28fa53
......@@ -1827,6 +1827,35 @@ static char* xid_to_str(char *buf, XID *xid)
}
#endif
#ifdef WITH_WSREP
static my_xid wsrep_order_and_check_continuity(XID *list, int len)
{
wsrep_sort_xid_array(list, len);
wsrep_uuid_t uuid;
wsrep_seqno_t seqno;
if (wsrep_get_SE_checkpoint(uuid, seqno))
{
WSREP_ERROR("Could not read wsrep SE checkpoint for recovery");
return 0;
}
long long cur_seqno= seqno;
for (int i= 0; i < len; ++i)
{
if (!wsrep_is_wsrep_xid(list + i) ||
wsrep_xid_seqno(*(list + i)) != cur_seqno + 1)
{
WSREP_WARN("Discovered discontinuity in recovered wsrep "
"transaction XIDs. Truncating the recovery list to "
"%d entries", i);
break;
}
++cur_seqno;
}
WSREP_INFO("Last wsrep seqno to be recovered %lld", cur_seqno);
return (cur_seqno < 0 ? 0 : cur_seqno);
}
#endif /* WITH_WSREP */
/**
recover() step of xa.
......@@ -1864,6 +1893,19 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
{
sql_print_information("Found %d prepared transaction(s) in %s",
got, hton_name(hton)->str);
#ifdef WITH_WSREP
/* If wsrep_on=ON, XIDs are first ordered and then the range of
recovered XIDs is checked for continuity. All the XIDs which
are in continuous range can be safely committed if binlog
is off since they have already ordered and certified in the
cluster. */
my_xid wsrep_limit= 0;
if (WSREP_ON)
{
wsrep_limit= wsrep_order_and_check_continuity(info->list, got);
}
#endif /* WITH_WSREP */
for (int i=0; i < got; i ++)
{
my_xid x= WSREP_ON && wsrep_is_wsrep_xid(&info->list[i]) ?
......@@ -1885,9 +1927,12 @@ static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
continue;
}
// recovery mode
if (info->commit_list ?
my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
if (IF_WSREP((wsrep_emulate_bin_log &&
wsrep_is_wsrep_xid(info->list + i) &&
x <= wsrep_limit), false) ||
(info->commit_list ?
my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT))
{
#ifndef DBUG_OFF
int rc=
......
......@@ -20,6 +20,8 @@
#include "sql_class.h"
#include "wsrep_mysqld.h" // for logging macros
#include <algorithm> /* std::sort() */
/*
* WSREPXid
*/
......@@ -154,3 +156,35 @@ bool wsrep_get_SE_checkpoint(wsrep_uuid_t& uuid, wsrep_seqno_t& seqno)
return false;
}
/*
Sort order for XIDs. Wsrep XIDs are sorted according to
seqno in ascending order. Non-wsrep XIDs are considered
equal among themselves and greater than with respect
to wsrep XIDs.
*/
struct Wsrep_xid_cmp
{
bool operator()(const XID& left, const XID& right) const
{
const bool left_is_wsrep= wsrep_is_wsrep_xid(&left);
const bool right_is_wsrep= wsrep_is_wsrep_xid(&right);
if (left_is_wsrep && right_is_wsrep)
{
return (wsrep_xid_seqno(left) < wsrep_xid_seqno(right));
}
else if (left_is_wsrep)
{
return true;
}
else
{
return false;
}
}
};
void wsrep_sort_xid_array(XID *array, int len)
{
std::sort(array, array + len, Wsrep_xid_cmp());
}
......@@ -32,5 +32,7 @@ bool wsrep_get_SE_checkpoint(wsrep_uuid_t&, wsrep_seqno_t&);
//void wsrep_set_SE_checkpoint(XID&); /* uncomment if needed */
bool wsrep_set_SE_checkpoint(const wsrep_uuid_t&, wsrep_seqno_t);
void wsrep_sort_xid_array(XID *array, int len);
#endif /* WITH_WSREP */
#endif /* WSREP_UTILS_H */
......@@ -17260,6 +17260,14 @@ innobase_rollback_by_xid(
}
if (trx_t* trx = trx_get_trx_by_xid(xid)) {
#ifdef WITH_WSREP
/* If a wsrep transaction is being rolled back during
the recovery, we must clear the xid in order to avoid
writing serialisation history for rolled back transaction. */
if (wsrep_is_wsrep_xid(trx->xid)) {
trx->xid->null();
}
#endif /* WITH_WSREP */
int ret = innobase_rollback_trx(trx);
trx_deregister_from_2pc(trx);
ut_ad(!trx->will_lock);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment