Commit 16ea692e authored by Monty's avatar Monty

MDEV-23586 Mariabackup: GTID saved for replication in 10.4.14 is wrong

MDEV-21953 deadlock between BACKUP STAGE BLOCK_COMMIT and parallel
replication

Fixed by partly reverting MDEV-21953 to put back MDL_BACKUP_COMMIT locking
before log_and_order.

The original problem for MDEV-21953 was that while a thread was waiting in
for another threads to commit in 'log_and_order', it had the
MDL_BACKUP_COMMIT lock. The backup thread was waiting to get the
MDL_BACKUP_WAIT_COMMIT lock, which blocks all new MDL_BACKUP_COMMIT locks.
This causes a deadlock as the waited-for thread can never get past the
MDL_BACKUP_COMMIT lock in ha_commit_trans.

The main part of the bug fix is to release the MDL_BACKUP_COMMIT lock while
a thread is waiting for other 'previous' threads to commit. This ensures
that no transactional thread keeps MDL_BACKUP_COMMIT while waiting, which
ensures that there are no deadlocks anymore.
parent 3cdbaa04
...@@ -114,7 +114,7 @@ static TYPELIB known_extensions= {0,"known_exts", NULL, NULL}; ...@@ -114,7 +114,7 @@ static TYPELIB known_extensions= {0,"known_exts", NULL, NULL};
uint known_extensions_id= 0; uint known_extensions_id= 0;
static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, static int commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans,
bool is_real_trans, bool rw_trans); bool is_real_trans);
static plugin_ref ha_default_plugin(THD *thd) static plugin_ref ha_default_plugin(THD *thd)
...@@ -1490,9 +1490,40 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1490,9 +1490,40 @@ int ha_commit_trans(THD *thd, bool all)
/* rw_trans is TRUE when we in a transaction changing data */ /* rw_trans is TRUE when we in a transaction changing data */
bool rw_trans= is_real_trans && bool rw_trans= is_real_trans &&
(rw_ha_count > (thd->is_current_stmt_binlog_disabled()?0U:1U)); (rw_ha_count > (thd->is_current_stmt_binlog_disabled()?0U:1U));
MDL_request mdl_backup;
DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d", DBUG_PRINT("info", ("is_real_trans: %d rw_trans: %d rw_ha_count: %d",
is_real_trans, rw_trans, rw_ha_count)); is_real_trans, rw_trans, rw_ha_count));
if (rw_trans)
{
/*
Acquire a metadata lock which will ensure that COMMIT is blocked
by an active FLUSH TABLES WITH READ LOCK (and vice versa:
COMMIT in progress blocks FTWRL).
We allow the owner of FTWRL to COMMIT; we assume that it knows
what it does.
*/
mdl_backup.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
if (!WSREP(thd))
{
if (thd->mdl_context.acquire_lock(&mdl_backup,
thd->variables.lock_wait_timeout))
{
ha_rollback_trans(thd, all);
DBUG_RETURN(1);
}
thd->backup_commit_lock= &mdl_backup;
}
DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock");
/* Use shortcut as we already have the MDL_BACKUP_COMMIT lock */
ha_maria::implicit_commit(thd, TRUE);
}
else
ha_maria_implicit_commit(thd, TRUE);
if (rw_trans && if (rw_trans &&
opt_readonly && opt_readonly &&
!(thd->security_ctx->master_access & SUPER_ACL) && !(thd->security_ctx->master_access & SUPER_ACL) &&
...@@ -1532,7 +1563,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1532,7 +1563,7 @@ int ha_commit_trans(THD *thd, bool all)
// Here, the call will not commit inside InnoDB. It is only working // Here, the call will not commit inside InnoDB. It is only working
// around closing thd->transaction.stmt open by TR_table::open(). // around closing thd->transaction.stmt open by TR_table::open().
if (all) if (all)
commit_one_phase_2(thd, false, &thd->transaction.stmt, false, false); commit_one_phase_2(thd, false, &thd->transaction.stmt, false);
} }
} }
#endif #endif
...@@ -1552,7 +1583,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1552,7 +1583,7 @@ int ha_commit_trans(THD *thd, bool all)
goto wsrep_err; goto wsrep_err;
} }
#endif /* WITH_WSREP */ #endif /* WITH_WSREP */
error= ha_commit_one_phase(thd, all, rw_trans); error= ha_commit_one_phase(thd, all);
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (run_wsrep_hooks) if (run_wsrep_hooks)
error= error || wsrep_after_commit(thd, all); error= error || wsrep_after_commit(thd, all);
...@@ -1604,7 +1635,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1604,7 +1635,7 @@ int ha_commit_trans(THD *thd, bool all)
if (!is_real_trans) if (!is_real_trans)
{ {
error= commit_one_phase_2(thd, all, trans, is_real_trans, rw_trans); error= commit_one_phase_2(thd, all, trans, is_real_trans);
goto done; goto done;
} }
#ifdef WITH_WSREP #ifdef WITH_WSREP
...@@ -1622,7 +1653,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1622,7 +1653,7 @@ int ha_commit_trans(THD *thd, bool all)
DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order"); DEBUG_SYNC(thd, "ha_commit_trans_after_log_and_order");
DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE();); DBUG_EXECUTE_IF("crash_commit_after_log", DBUG_SUICIDE(););
error= commit_one_phase_2(thd, all, trans, is_real_trans, rw_trans) ? 2 : 0; error= commit_one_phase_2(thd, all, trans, is_real_trans) ? 2 : 0;
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all)))) if (run_wsrep_hooks && (error || (error = wsrep_after_commit(thd, all))))
{ {
...@@ -1685,6 +1716,17 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1685,6 +1716,17 @@ int ha_commit_trans(THD *thd, bool all)
thd->rgi_slave->is_parallel_exec); thd->rgi_slave->is_parallel_exec);
} }
end: end:
if (mdl_backup.ticket)
{
/*
We do not always immediately release transactional locks
after ha_commit_trans() (see uses of ha_enable_transaction()),
thus we release the commit blocker lock as soon as it's
not needed.
*/
thd->mdl_context.release_lock(mdl_backup.ticket);
}
thd->backup_commit_lock= 0;
#ifdef WITH_WSREP #ifdef WITH_WSREP
if (wsrep_is_active(thd) && is_real_trans && !error && if (wsrep_is_active(thd) && is_real_trans && !error &&
(rw_ha_count == 0 || all) && (rw_ha_count == 0 || all) &&
...@@ -1699,8 +1741,8 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1699,8 +1741,8 @@ int ha_commit_trans(THD *thd, bool all)
/** /**
@note @note
This function does not care about global read lock. A caller should. This function does not care about global read lock or backup locks,
However backup locks are handled in commit_one_phase_2. the caller should.
@param[in] all Is set in case of explicit commit @param[in] all Is set in case of explicit commit
(COMMIT statement), or implicit commit (COMMIT statement), or implicit commit
...@@ -1709,7 +1751,7 @@ int ha_commit_trans(THD *thd, bool all) ...@@ -1709,7 +1751,7 @@ int ha_commit_trans(THD *thd, bool all)
autocommit=1. autocommit=1.
*/ */
int ha_commit_one_phase(THD *thd, bool all, bool rw_trans) int ha_commit_one_phase(THD *thd, bool all)
{ {
THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt; THD_TRANS *trans=all ? &thd->transaction.all : &thd->transaction.stmt;
/* /*
...@@ -1735,50 +1777,21 @@ int ha_commit_one_phase(THD *thd, bool all, bool rw_trans) ...@@ -1735,50 +1777,21 @@ int ha_commit_one_phase(THD *thd, bool all, bool rw_trans)
if ((res= thd->wait_for_prior_commit())) if ((res= thd->wait_for_prior_commit()))
DBUG_RETURN(res); DBUG_RETURN(res);
} }
res= commit_one_phase_2(thd, all, trans, is_real_trans, rw_trans); res= commit_one_phase_2(thd, all, trans, is_real_trans);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
static int static int
commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans, commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans)
bool rw_trans)
{ {
int error= 0; int error= 0;
uint count= 0; uint count= 0;
Ha_trx_info *ha_info= trans->ha_list, *ha_info_next; Ha_trx_info *ha_info= trans->ha_list, *ha_info_next;
MDL_request mdl_request;
DBUG_ENTER("commit_one_phase_2"); DBUG_ENTER("commit_one_phase_2");
if (is_real_trans) if (is_real_trans)
DEBUG_SYNC(thd, "commit_one_phase_2"); DEBUG_SYNC(thd, "commit_one_phase_2");
if (rw_trans)
{
/*
Acquire a metadata lock which will ensure that COMMIT is blocked
by an active FLUSH TABLES WITH READ LOCK (and vice versa:
COMMIT in progress blocks FTWRL).
We allow the owner of FTWRL to COMMIT; we assume that it knows
what it does.
*/
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
if (!WSREP(thd) &&
thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout))
{
my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
ha_rollback_trans(thd, all);
DBUG_RETURN(1);
}
DEBUG_SYNC(thd, "ha_commit_trans_after_acquire_commit_lock");
}
#if defined(WITH_ARIA_STORAGE_ENGINE) && MYSQL_VERSION_ID < 100500
ha_maria::implicit_commit(thd, TRUE);
#endif
if (ha_info) if (ha_info)
{ {
for (; ha_info; ha_info= ha_info_next) for (; ha_info; ha_info= ha_info_next)
...@@ -1807,16 +1820,6 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans, ...@@ -1807,16 +1820,6 @@ commit_one_phase_2(THD *thd, bool all, THD_TRANS *trans, bool is_real_trans,
#endif #endif
} }
} }
if (mdl_request.ticket)
{
/*
We do not always immediately release transactional locks
after ha_commit_trans() (see uses of ha_enable_transaction()),
thus we release the commit blocker lock as soon as it's
not needed.
*/
thd->mdl_context.release_lock(mdl_request.ticket);
}
/* Free resources and perform other cleanup even for 'empty' transactions. */ /* Free resources and perform other cleanup even for 'empty' transactions. */
if (is_real_trans) if (is_real_trans)
......
...@@ -5024,7 +5024,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache); ...@@ -5024,7 +5024,7 @@ int ha_change_key_cache(KEY_CACHE *old_key_cache, KEY_CACHE *new_key_cache);
/* transactions: interface to handlerton functions */ /* transactions: interface to handlerton functions */
int ha_start_consistent_snapshot(THD *thd); int ha_start_consistent_snapshot(THD *thd);
int ha_commit_or_rollback_by_xid(XID *xid, bool commit); int ha_commit_or_rollback_by_xid(XID *xid, bool commit);
int ha_commit_one_phase(THD *thd, bool all, bool rw_trans); int ha_commit_one_phase(THD *thd, bool all);
int ha_commit_trans(THD *thd, bool all); int ha_commit_trans(THD *thd, bool all);
int ha_rollback_trans(THD *thd, bool all); int ha_rollback_trans(THD *thd, bool all);
int ha_prepare(THD *thd); int ha_prepare(THD *thd);
......
...@@ -6380,11 +6380,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6380,11 +6380,25 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
if (direct) if (direct)
{ {
/* We come here only for incident events */
int res; int res;
uint64 commit_id= 0; uint64 commit_id= 0;
MDL_request mdl_request;
DBUG_PRINT("info", ("direct is set")); DBUG_PRINT("info", ("direct is set"));
DBUG_ASSERT(!thd->backup_commit_lock);
mdl_request.init(MDL_key::BACKUP, "", "", MDL_BACKUP_COMMIT, MDL_EXPLICIT);
thd->mdl_context.acquire_lock(&mdl_request,
thd->variables.lock_wait_timeout);
thd->backup_commit_lock= &mdl_request;
if ((res= thd->wait_for_prior_commit())) if ((res= thd->wait_for_prior_commit()))
{
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
DBUG_RETURN(res); DBUG_RETURN(res);
}
file= &log_file; file= &log_file;
my_org_b_tell= my_b_tell(file); my_org_b_tell= my_b_tell(file);
mysql_mutex_lock(&LOCK_log); mysql_mutex_lock(&LOCK_log);
...@@ -6399,7 +6413,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate) ...@@ -6399,7 +6413,11 @@ bool MYSQL_BIN_LOG::write(Log_event *event_info, my_bool *with_annotate)
commit_name.length); commit_name.length);
commit_id= entry->val_int(&null_value); commit_id= entry->val_int(&null_value);
}); });
if (write_gtid_event(thd, true, using_trans, commit_id)) res= write_gtid_event(thd, true, using_trans, commit_id);
if (mdl_request.ticket)
thd->mdl_context.release_lock(mdl_request.ticket);
thd->backup_commit_lock= 0;
if (res)
goto err; goto err;
} }
else else
...@@ -7461,7 +7479,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7461,7 +7479,11 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
group_commit_entry *entry, *orig_queue, *last; group_commit_entry *entry, *orig_queue, *last;
wait_for_commit *cur; wait_for_commit *cur;
wait_for_commit *wfc; wait_for_commit *wfc;
bool backup_lock_released= 0;
int result= 0;
THD *thd= orig_entry->thd;
DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit"); DBUG_ENTER("MYSQL_BIN_LOG::queue_for_group_commit");
DBUG_ASSERT(thd == current_thd);
/* /*
Check if we need to wait for another transaction to commit before us. Check if we need to wait for another transaction to commit before us.
...@@ -7493,6 +7515,21 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7493,6 +7515,21 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
{ {
PSI_stage_info old_stage; PSI_stage_info old_stage;
/*
Release MDL_BACKUP_COMMIT LOCK while waiting for other threads to
commit.
This is needed to avoid deadlock between the other threads (which not
yet have the MDL_BACKUP_COMMIT_LOCK) and any threads using
BACKUP LOCK BLOCK_COMMIT.
*/
if (thd->backup_commit_lock && thd->backup_commit_lock->ticket &&
!backup_lock_released)
{
backup_lock_released= 1;
thd->mdl_context.release_lock(thd->backup_commit_lock->ticket);
thd->backup_commit_lock->ticket= 0;
}
/* /*
By setting wfc->opaque_pointer to our own entry, we mark that we are By setting wfc->opaque_pointer to our own entry, we mark that we are
ready to commit, but waiting for another transaction to commit before ready to commit, but waiting for another transaction to commit before
...@@ -7553,7 +7590,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7553,7 +7590,8 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
wfc->wakeup_error= ER_QUERY_INTERRUPTED; wfc->wakeup_error= ER_QUERY_INTERRUPTED;
my_message(wfc->wakeup_error, my_message(wfc->wakeup_error,
ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0)); ER_THD(orig_entry->thd, wfc->wakeup_error), MYF(0));
DBUG_RETURN(-1); result= -1;
goto end;
} }
} }
orig_entry->thd->EXIT_COND(&old_stage); orig_entry->thd->EXIT_COND(&old_stage);
...@@ -7567,12 +7605,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7567,12 +7605,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
then there is nothing else to do. then there is nothing else to do.
*/ */
if (orig_entry->queued_by_other) if (orig_entry->queued_by_other)
DBUG_RETURN(0); goto end;
if (wfc && wfc->wakeup_error) if (wfc && wfc->wakeup_error)
{ {
my_error(ER_PRIOR_COMMIT_FAILED, MYF(0)); my_error(ER_PRIOR_COMMIT_FAILED, MYF(0));
DBUG_RETURN(-1); result= -1;
goto end;
} }
/* Now enqueue ourselves in the group commit queue. */ /* Now enqueue ourselves in the group commit queue. */
...@@ -7733,7 +7772,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry) ...@@ -7733,7 +7772,13 @@ MYSQL_BIN_LOG::queue_for_group_commit(group_commit_entry *orig_entry)
DBUG_PRINT("info", ("Queued for group commit as %s", DBUG_PRINT("info", ("Queued for group commit as %s",
(orig_queue == NULL) ? "leader" : "participant")); (orig_queue == NULL) ? "leader" : "participant"));
DBUG_RETURN(orig_queue == NULL); result= orig_queue == NULL;
end:
if (backup_lock_released)
thd->mdl_context.acquire_lock(thd->backup_commit_lock,
thd->variables.lock_wait_timeout);
DBUG_RETURN(result);
} }
bool bool
......
...@@ -1271,6 +1271,7 @@ void THD::init() ...@@ -1271,6 +1271,7 @@ void THD::init()
first_successful_insert_id_in_prev_stmt_for_binlog= 0; first_successful_insert_id_in_prev_stmt_for_binlog= 0;
first_successful_insert_id_in_cur_stmt= 0; first_successful_insert_id_in_cur_stmt= 0;
current_backup_stage= BACKUP_FINISHED; current_backup_stage= BACKUP_FINISHED;
backup_commit_lock= 0;
#ifdef WITH_WSREP #ifdef WITH_WSREP
wsrep_last_query_id= 0; wsrep_last_query_id= 0;
wsrep_xid.null(); wsrep_xid.null();
...@@ -1383,7 +1384,7 @@ void THD::init_for_queries() ...@@ -1383,7 +1384,7 @@ void THD::init_for_queries()
set_time(); set_time();
/* /*
We don't need to call ha_enable_transaction() as we can't have We don't need to call ha_enable_transaction() as we can't have
any active transactions that has to be commited any active transactions that has to be committed
*/ */
transaction.on= TRUE; transaction.on= TRUE;
...@@ -7386,16 +7387,33 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee) ...@@ -7386,16 +7387,33 @@ wait_for_commit::register_wait_for_prior_commit(wait_for_commit *waitee)
} }
/* /**
Wait for commit of another transaction to complete, as already registered Waits for commit of another transaction to complete, as already registered
with register_wait_for_prior_commit(). If the commit already completed, with register_wait_for_prior_commit(). If the commit already completed,
returns immediately. returns immediately.
If thd->backup_commit_lock is set, release it while waiting for other threads
*/ */
int int
wait_for_commit::wait_for_prior_commit2(THD *thd) wait_for_commit::wait_for_prior_commit2(THD *thd)
{ {
PSI_stage_info old_stage; PSI_stage_info old_stage;
wait_for_commit *loc_waitee; wait_for_commit *loc_waitee;
bool backup_lock_released= 0;
/*
Release MDL_BACKUP_COMMIT LOCK while waiting for other threads to commit
This is needed to avoid deadlock between the other threads (which not
yet have the MDL_BACKUP_COMMIT_LOCK) and any threads using
BACKUP LOCK BLOCK_COMMIT.
*/
if (thd->backup_commit_lock && thd->backup_commit_lock->ticket)
{
backup_lock_released= 1;
thd->mdl_context.release_lock(thd->backup_commit_lock->ticket);
thd->backup_commit_lock->ticket= 0;
}
mysql_mutex_lock(&LOCK_wait_commit); mysql_mutex_lock(&LOCK_wait_commit);
DEBUG_SYNC(thd, "wait_for_prior_commit_waiting"); DEBUG_SYNC(thd, "wait_for_prior_commit_waiting");
...@@ -7445,10 +7463,16 @@ wait_for_commit::wait_for_prior_commit2(THD *thd) ...@@ -7445,10 +7463,16 @@ wait_for_commit::wait_for_prior_commit2(THD *thd)
use within enter_cond/exit_cond. use within enter_cond/exit_cond.
*/ */
DEBUG_SYNC(thd, "wait_for_prior_commit_killed"); DEBUG_SYNC(thd, "wait_for_prior_commit_killed");
if (backup_lock_released)
thd->mdl_context.acquire_lock(thd->backup_commit_lock,
thd->variables.lock_wait_timeout);
return wakeup_error; return wakeup_error;
end: end:
thd->EXIT_COND(&old_stage); thd->EXIT_COND(&old_stage);
if (backup_lock_released)
thd->mdl_context.acquire_lock(thd->backup_commit_lock,
thd->variables.lock_wait_timeout);
return wakeup_error; return wakeup_error;
} }
......
...@@ -2200,7 +2200,10 @@ class THD: public THD_count, /* this must be first */ ...@@ -2200,7 +2200,10 @@ class THD: public THD_count, /* this must be first */
rpl_io_thread_info *rpl_io_info; rpl_io_thread_info *rpl_io_info;
rpl_sql_thread_info *rpl_sql_info; rpl_sql_thread_info *rpl_sql_info;
} system_thread_info; } system_thread_info;
/* Used for BACKUP LOCK */
MDL_ticket *mdl_backup_ticket, *mdl_backup_lock; MDL_ticket *mdl_backup_ticket, *mdl_backup_lock;
/* Used to register that thread has a MDL_BACKUP_WAIT_COMMIT lock */
MDL_request *backup_commit_lock;
void reset_for_next_command(bool do_clear_errors= 1); void reset_for_next_command(bool do_clear_errors= 1);
/* /*
......
...@@ -582,7 +582,7 @@ bool trans_xa_commit(THD *thd) ...@@ -582,7 +582,7 @@ bool trans_xa_commit(THD *thd)
{ {
DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock"); DEBUG_SYNC(thd, "trans_xa_commit_after_acquire_commit_lock");
res= MY_TEST(ha_commit_one_phase(thd, 1, 1)); res= MY_TEST(ha_commit_one_phase(thd, 1));
if (res) if (res)
my_error(ER_XAER_RMERR, MYF(0)); my_error(ER_XAER_RMERR, MYF(0));
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment