Commit 216d99bb authored by Andrei's avatar Andrei

MDEV-26071: rpl.rpl_perfschema_applier_status_by_worker failed in bb …

…with: Test assertion failed

Problem:
=======
Assertion text: 'Value returned by SSS and PS table for Last_Error_Number
should be same.'
Assertion condition: '"1146" = "0"'
Assertion condition, interpolated: '"1146" = "0"'
Assertion result: '0'

Analysis:
========
In parallel replication when slave is started the worker pool gets
activated and it gets cleared when slave stops. Each time the worker pool
gets activated a backup worker pool also gets created to store worker
specific perforance schema information in case of errors. On error, all
relevant information is copied from rpl_parallel_thread to rli and it gets
cleared from thread.  Then server waits for all workers to complete their
work, during this stage performance schema table specific worker info is
stored into the backup pool and finally the actual pool gets cleared. If
users query the performance schema table to know the status of workers the
information from backup pool will be used. The test simulates
ER_NO_SUCH_TABLE error and verifies the worker information in pfs table.
Test works fine if execution occurs in following order.

Step 1. Error occurred 'worker information is copied to backup pool'.
Step 2. handle_slave_sql invokes 'rpl_parallel_resize_pool_if_no_slaves' to
deactivate worker pool, it marks the pool->count=0
Step 3. PFS table is queried, since actual pool is deactivated backup pool
information is read.

If the Step 3 happens prior to Step2 the pool is yet to be deactivated and
the actual pool is read, which doesn't have any error details as they were
cleared. Hence test ocasionally fails.

Fix:
===
Upon error mark the back pool as being active so that if PFS table is
quried since the backup pool is flagged as valid its information will be
read, in case it is not flagged regular pool will be read.

This work is one of the last pieces created by the late Sujatha Sivakumar.
parent e06c6046
...@@ -1781,6 +1781,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool) ...@@ -1781,6 +1781,7 @@ rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool)
} }
else else
bkp->init(pool->count); bkp->init(pool->count);
bkp->is_valid= false; // Mark backup as stale during pool init
} }
} }
...@@ -2050,7 +2051,7 @@ rpl_parallel_thread::rpl_parallel_thread() ...@@ -2050,7 +2051,7 @@ rpl_parallel_thread::rpl_parallel_thread()
rpl_parallel_thread_pool::rpl_parallel_thread_pool() rpl_parallel_thread_pool::rpl_parallel_thread_pool()
: threads(0), free_list(0), count(0), inited(false), busy(false), : threads(0), free_list(0), count(0), inited(false), busy(false),
pfs_bkp{0, false, NULL} pfs_bkp{0, false, false, NULL}
{ {
} }
...@@ -2179,6 +2180,7 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli) ...@@ -2179,6 +2180,7 @@ rpl_parallel_thread_pool::copy_pool_for_pfs(Relay_log_info *rli)
pfs_rpt->worker_idle_time= rpt->get_worker_idle_time(); pfs_rpt->worker_idle_time= rpt->get_worker_idle_time();
pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count; pfs_rpt->last_trans_retry_count= rpt->last_trans_retry_count;
} }
pfs_bkp.is_valid= true;
} }
} }
......
...@@ -260,7 +260,7 @@ struct rpl_parallel_thread { ...@@ -260,7 +260,7 @@ struct rpl_parallel_thread {
struct pool_bkp_for_pfs{ struct pool_bkp_for_pfs{
uint32 count; uint32 count;
bool inited; bool inited, is_valid;
struct rpl_parallel_thread **rpl_thread_arr; struct rpl_parallel_thread **rpl_thread_arr;
void init(uint32 thd_count) void init(uint32 thd_count)
{ {
...@@ -287,6 +287,7 @@ struct pool_bkp_for_pfs{ ...@@ -287,6 +287,7 @@ struct pool_bkp_for_pfs{
my_free(rpl_thread_arr); my_free(rpl_thread_arr);
rpl_thread_arr= NULL; rpl_thread_arr= NULL;
} }
inited= false;
} }
}; };
......
...@@ -100,72 +100,67 @@ ha_rows table_replication_applier_status_by_worker::get_row_count() ...@@ -100,72 +100,67 @@ ha_rows table_replication_applier_status_by_worker::get_row_count()
int table_replication_applier_status_by_worker::rnd_next(void) int table_replication_applier_status_by_worker::rnd_next(void)
{ {
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
if (pool->inited && pool->count) struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
{
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
uint worker_count= pool->count; if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid)
{
for (m_pos.set_at(&m_next_pos); for (m_pos.set_at(&m_next_pos);
m_pos.has_more_workers(worker_count); m_pos.has_more_workers(bkp_pool->count);
m_pos.next_worker()) m_pos.next_worker())
{ {
rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt); make_row(rpt);
m_next_pos.set_after(&m_pos); m_next_pos.set_after(&m_pos);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0; return 0;
} }
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
} }
else else
{ {
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); if (pool->inited && pool->count)
struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
if (bkp_pool->inited && bkp_pool->count)
{ {
uint worker_count= pool->count;
for (m_pos.set_at(&m_next_pos); for (m_pos.set_at(&m_next_pos);
m_pos.has_more_workers(bkp_pool->count); m_pos.has_more_workers(worker_count);
m_pos.next_worker()) m_pos.next_worker())
{ {
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt); make_row(rpt);
m_next_pos.set_after(&m_pos); m_next_pos.set_after(&m_pos);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return 0; return 0;
} }
} }
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
} }
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return HA_ERR_END_OF_FILE; return HA_ERR_END_OF_FILE;
} }
int table_replication_applier_status_by_worker::rnd_pos(const void *pos) int table_replication_applier_status_by_worker::rnd_pos(const void *pos)
{ {
int res= HA_ERR_RECORD_DELETED; int res= HA_ERR_RECORD_DELETED;
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
struct pool_bkp_for_pfs *bkp_pool= &pool->pfs_bkp;
set_position(pos); set_position(pos);
if (global_rpl_thread_pool.inited && global_rpl_thread_pool.count)
{
rpl_parallel_thread_pool *pool= &global_rpl_thread_pool;
mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); mysql_mutex_lock(&pool->LOCK_rpl_thread_pool);
if(m_pos.m_index < pool->count) if (bkp_pool->inited && bkp_pool->count && bkp_pool->is_valid
&& m_pos.m_index < bkp_pool->count)
{ {
rpl_parallel_thread *rpt= pool->threads[m_pos.m_index]; rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index];
make_row(rpt); make_row(rpt);
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
res= 0; res= 0;
} }
}
else else
{ {
struct pool_bkp_for_pfs *bkp_pool= &global_rpl_thread_pool.pfs_bkp; if (pool->inited && pool->count && m_pos.m_index < pool->count)
if (bkp_pool->inited && bkp_pool->count && m_pos.m_index < bkp_pool->count)
{ {
rpl_parallel_thread *rpt= bkp_pool->rpl_thread_arr[m_pos.m_index]; rpl_parallel_thread *rpt= pool->threads[m_pos.m_index];
make_row(rpt); make_row(rpt);
res= 0; res= 0;
} }
} }
mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool);
return res; return res;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment