Commit 07f09df9 authored by vinchen's avatar vinchen Committed by Kristian Nielsen

fix the ABI and stop slave hang problem

parent 0fa39ffb
...@@ -35,7 +35,6 @@ typedef struct st_net { ...@@ -35,7 +35,6 @@ typedef struct st_net {
my_bool thread_specific_malloc; my_bool thread_specific_malloc;
unsigned char compress; unsigned char compress;
my_bool unused3; my_bool unused3;
unsigned long real_network_read_len;
void *thd; void *thd;
unsigned int last_errno; unsigned int last_errno;
unsigned char error; unsigned char error;
......
...@@ -424,7 +424,6 @@ typedef struct st_net { ...@@ -424,7 +424,6 @@ typedef struct st_net {
Pointer to query object in query cache, do not equal NULL (0) for Pointer to query object in query cache, do not equal NULL (0) for
queries in cache that have not stored its results yet queries in cache that have not stored its results yet
*/ */
unsigned long real_network_read_len; // the my_real_read length for each package
#endif #endif
void *thd; /* Used by MariaDB server to avoid calling current_thd */ void *thd; /* Used by MariaDB server to avoid calling current_thd */
unsigned int last_errno; unsigned int last_errno;
...@@ -586,6 +585,8 @@ my_bool net_write_command(NET *net,unsigned char command, ...@@ -586,6 +585,8 @@ my_bool net_write_command(NET *net,unsigned char command,
const unsigned char *packet, size_t len); const unsigned char *packet, size_t len);
int net_real_write(NET *net,const unsigned char *packet, size_t len); int net_real_write(NET *net,const unsigned char *packet, size_t len);
unsigned long my_net_read_packet(NET *net, my_bool read_from_server); unsigned long my_net_read_packet(NET *net, my_bool read_from_server);
ulong my_net_read_packet_reallen(NET *net, my_bool read_from_server,
ulong* reallen);
#define my_net_read(A) my_net_read_packet((A), 0) #define my_net_read(A) my_net_read_packet((A), 0)
#ifdef MY_GLOBAL_INCLUDED #ifdef MY_GLOBAL_INCLUDED
......
...@@ -104,6 +104,7 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command, ...@@ -104,6 +104,7 @@ cli_advanced_command(MYSQL *mysql, enum enum_server_command command,
const unsigned char *arg, ulong arg_length, const unsigned char *arg, ulong arg_length,
my_bool skip_check, MYSQL_STMT *stmt); my_bool skip_check, MYSQL_STMT *stmt);
unsigned long cli_safe_read(MYSQL *mysql); unsigned long cli_safe_read(MYSQL *mysql);
unsigned long cli_safe_read_reallen(MYSQL *mysql, ulong* reallen);
void net_clear_error(NET *net); void net_clear_error(NET *net);
void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net); void set_stmt_errmsg(MYSQL_STMT *stmt, NET *net);
void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate, void set_stmt_error(MYSQL_STMT *stmt, int errcode, const char *sqlstate,
......
...@@ -569,16 +569,22 @@ HANDLE create_shared_memory(MYSQL *mysql,NET *net, uint connect_timeout) ...@@ -569,16 +569,22 @@ HANDLE create_shared_memory(MYSQL *mysql,NET *net, uint connect_timeout)
Error message is set. Error message is set.
@retval @retval
*/ */
ulong ulong
cli_safe_read(MYSQL *mysql) cli_safe_read(MYSQL *mysql)
{
ulong reallen = 0;
return cli_safe_read_reallen(mysql, &reallen);
}
ulong
cli_safe_read_reallen(MYSQL *mysql, ulong* reallen)
{ {
NET *net= &mysql->net; NET *net= &mysql->net;
ulong len=0; ulong len=0;
restart: restart:
if (net->vio != 0) if (net->vio != 0)
len= my_net_read_packet(net, 0); len= my_net_read_packet_reallen(net, 0, reallen);
if (len == packet_error || len == 0) if (len == packet_error || len == 0)
{ {
......
...@@ -1133,16 +1133,22 @@ ulong my_net_read(NET *net) ...@@ -1133,16 +1133,22 @@ ulong my_net_read(NET *net)
The function returns the length of the found packet or packet_error. The function returns the length of the found packet or packet_error.
net->read_pos points to the read data. net->read_pos points to the read data.
*/ */
ulong
my_net_read_packet(NET *net, my_bool read_from_server)
{
ulong reallen = 0;
return my_net_read_packet_reallen(net, read_from_server, &reallen);
}
ulong ulong
my_net_read_packet(NET *net, my_bool read_from_server) my_net_read_packet_reallen(NET *net, my_bool read_from_server, ulong* reallen)
{ {
size_t len, complen; size_t len, complen;
MYSQL_NET_READ_START(); MYSQL_NET_READ_START();
net->real_network_read_len = 0; *reallen = 0;
#ifdef HAVE_COMPRESS #ifdef HAVE_COMPRESS
if (!net->compress) if (!net->compress)
{ {
...@@ -1167,7 +1173,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1167,7 +1173,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
if (len != packet_error) if (len != packet_error)
{ {
net->read_pos[len]=0; /* Safeguard for mysql_use_result */ net->read_pos[len]=0; /* Safeguard for mysql_use_result */
net->real_network_read_len = len; *reallen = len;
} }
MYSQL_NET_READ_DONE(0, len); MYSQL_NET_READ_DONE(0, len);
return len; return len;
...@@ -1269,7 +1275,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1269,7 +1275,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
return packet_error; return packet_error;
} }
buf_length+= complen; buf_length+= complen;
net->real_network_read_len += packet_len; *reallen += packet_len;
} }
net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE; net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
......
...@@ -3315,7 +3315,8 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, ...@@ -3315,7 +3315,8 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
number Length of packet number Length of packet
*/ */
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ulong* network_read_len) static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
ulong* network_read_len)
{ {
ulong len; ulong len;
DBUG_ENTER("read_event"); DBUG_ENTER("read_event");
...@@ -3330,7 +3331,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ...@@ -3330,7 +3331,7 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
DBUG_RETURN(packet_error); DBUG_RETURN(packet_error);
#endif #endif
len = cli_safe_read(mysql); len = cli_safe_read_reallen(mysql, network_read_len);
if (len == packet_error || (long) len < 1) if (len == packet_error || (long) len < 1)
{ {
if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED) if (mysql_errno(mysql) == ER_NET_READ_INTERRUPTED)
...@@ -3357,8 +3358,6 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ...@@ -3357,8 +3358,6 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings,
DBUG_RETURN(packet_error); DBUG_RETURN(packet_error);
} }
*network_read_len = mysql->net.real_network_read_len ;
DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d",
len, mysql->net.read_pos[4])); len, mysql->net.read_pos[4]));
DBUG_RETURN(len - 1); DBUG_RETURN(len - 1);
...@@ -4479,29 +4478,42 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4479,29 +4478,42 @@ Stopping slave I/O thread due to out-of-memory error from master");
goto err; goto err;
} }
/* Control the binlog read speed of master when read_binlog_speed_limit is non-zero /* Control the binlog read speed of master
when read_binlog_speed_limit is non-zero
*/ */
ulonglong read_binlog_speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024; ulonglong speed_limit_in_bytes = opt_read_binlog_speed_limit * 1024;
if (read_binlog_speed_limit_in_bytes) if (speed_limit_in_bytes)
{ {
/* prevent the tokenamount become a large value, /* Prevent the tokenamount become a large value,
for example, the IO thread doesn't work for a long time for example, the IO thread doesn't work for a long time
*/ */
if (tokenamount > read_binlog_speed_limit_in_bytes * 2) if (tokenamount > speed_limit_in_bytes * 2)
{ {
lastchecktime = my_hrtime().val; lastchecktime = my_hrtime().val;
tokenamount = read_binlog_speed_limit_in_bytes * 2; tokenamount = speed_limit_in_bytes * 2;
} }
do do
{ {
ulonglong currenttime = my_hrtime().val; ulonglong currenttime = my_hrtime().val;
tokenamount += (currenttime - lastchecktime) * read_binlog_speed_limit_in_bytes / (1000*1000); tokenamount += (currenttime - lastchecktime) * speed_limit_in_bytes / (1000*1000);
lastchecktime = currenttime; lastchecktime = currenttime;
if(tokenamount < network_read_len) if(tokenamount < network_read_len)
{ {
ulonglong micro_sleeptime = 1000*1000 * (network_read_len - tokenamount) / read_binlog_speed_limit_in_bytes ; ulonglong micro_time = 1000*1000 * (network_read_len - tokenamount) / speed_limit_in_bytes ;
my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second ulonglong second_time = micro_time / (1000 * 1000);
micro_time = micro_time % (1000 * 1000);
// at least sleep 1000 micro second
my_sleep(micro_time > 1000 ? micro_time : 1000);
/*
If it sleep more than one second,
it should use slave_sleep() to avoid the STOP SLAVE hang.
*/
if (second_time)
slave_sleep(thd, second_time, io_slave_killed, mi);
} }
}while(tokenamount < network_read_len); }while(tokenamount < network_read_len);
tokenamount -= network_read_len; tokenamount -= network_read_len;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment