Commit 43789901 authored by vinchen's avatar vinchen Committed by Kristian Nielsen

Control the binlog read speed for compressed protocol

parent 8eb0f5ca
...@@ -424,6 +424,7 @@ typedef struct st_net { ...@@ -424,6 +424,7 @@ typedef struct st_net {
Pointer to query object in query cache, do not equal NULL (0) for Pointer to query object in query cache, do not equal NULL (0) for
queries in cache that have not stored its results yet queries in cache that have not stored its results yet
*/ */
unsigned long real_network_read_len; // the my_real_read length for each package
#endif #endif
void *thd; /* Used by MariaDB server to avoid calling current_thd */ void *thd; /* Used by MariaDB server to avoid calling current_thd */
unsigned int last_errno; unsigned int last_errno;
......
...@@ -1142,6 +1142,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1142,6 +1142,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
MYSQL_NET_READ_START(); MYSQL_NET_READ_START();
net->real_network_read_len = 0;
#ifdef HAVE_COMPRESS #ifdef HAVE_COMPRESS
if (!net->compress) if (!net->compress)
{ {
...@@ -1154,17 +1155,19 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1154,17 +1155,19 @@ my_net_read_packet(NET *net, my_bool read_from_server)
size_t total_length= 0; size_t total_length= 0;
do do
{ {
net->where_b += len; net->where_b += len;
total_length += len; total_length += len;
len = my_real_read(net,&complen, 0); len = my_real_read(net,&complen, 0);
} while (len == MAX_PACKET_LENGTH); } while (len == MAX_PACKET_LENGTH);
if (len != packet_error) if (len != packet_error)
len+= total_length; len+= total_length;
net->where_b = save_pos; net->where_b = save_pos;
} }
net->read_pos = net->buff + net->where_b; net->read_pos = net->buff + net->where_b;
if (len != packet_error) if (len != packet_error) {
net->read_pos[len]=0; /* Safeguard for mysql_use_result */ net->read_pos[len]=0; /* Safeguard for mysql_use_result */
net->real_network_read_len = len;
}
MYSQL_NET_READ_DONE(0, len); MYSQL_NET_READ_DONE(0, len);
return len; return len;
#ifdef HAVE_COMPRESS #ifdef HAVE_COMPRESS
...@@ -1182,7 +1185,7 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1182,7 +1185,7 @@ my_net_read_packet(NET *net, my_bool read_from_server)
{ {
buf_length= net->buf_length; /* Data left in old packet */ buf_length= net->buf_length; /* Data left in old packet */
first_packet_offset= start_of_packet= (net->buf_length - first_packet_offset= start_of_packet= (net->buf_length -
net->remain_in_buf); net->remain_in_buf);
/* Restore the character that was overwritten by the end 0 */ /* Restore the character that was overwritten by the end 0 */
net->buff[start_of_packet]= net->save_char; net->buff[start_of_packet]= net->save_char;
} }
...@@ -1197,81 +1200,82 @@ my_net_read_packet(NET *net, my_bool read_from_server) ...@@ -1197,81 +1200,82 @@ my_net_read_packet(NET *net, my_bool read_from_server)
if (buf_length - start_of_packet >= NET_HEADER_SIZE) if (buf_length - start_of_packet >= NET_HEADER_SIZE)
{ {
read_length = uint3korr(net->buff+start_of_packet); read_length = uint3korr(net->buff+start_of_packet);
if (!read_length) if (!read_length)
{ {
/* End of multi-byte packet */ /* End of multi-byte packet */
start_of_packet += NET_HEADER_SIZE; start_of_packet += NET_HEADER_SIZE;
break; break;
} }
if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet) if (read_length + NET_HEADER_SIZE <= buf_length - start_of_packet)
{ {
if (multi_byte_packet) if (multi_byte_packet)
{ {
/* Remove packet header for second packet */ /* Remove packet header for second packet */
memmove(net->buff + first_packet_offset + start_of_packet, memmove(net->buff + first_packet_offset + start_of_packet,
net->buff + first_packet_offset + start_of_packet + net->buff + first_packet_offset + start_of_packet +
NET_HEADER_SIZE, NET_HEADER_SIZE,
buf_length - start_of_packet); buf_length - start_of_packet);
start_of_packet += read_length; start_of_packet += read_length;
buf_length -= NET_HEADER_SIZE; buf_length -= NET_HEADER_SIZE;
} }
else else
start_of_packet+= read_length + NET_HEADER_SIZE; start_of_packet+= read_length + NET_HEADER_SIZE;
if (read_length != MAX_PACKET_LENGTH) /* last package */ if (read_length != MAX_PACKET_LENGTH) /* last package */
{ {
multi_byte_packet= 0; /* No last zero len packet */ multi_byte_packet= 0; /* No last zero len packet */
break; break;
} }
multi_byte_packet= NET_HEADER_SIZE; multi_byte_packet= NET_HEADER_SIZE;
/* Move data down to read next data packet after current one */ /* Move data down to read next data packet after current one */
if (first_packet_offset) if (first_packet_offset)
{ {
memmove(net->buff,net->buff+first_packet_offset, memmove(net->buff,net->buff+first_packet_offset,
buf_length-first_packet_offset); buf_length-first_packet_offset);
buf_length-=first_packet_offset; buf_length-=first_packet_offset;
start_of_packet -= first_packet_offset; start_of_packet -= first_packet_offset;
first_packet_offset=0; first_packet_offset=0;
} }
continue; continue;
} }
} }
/* Move data down to read next data packet after current one */ /* Move data down to read next data packet after current one */
if (first_packet_offset) if (first_packet_offset)
{ {
memmove(net->buff,net->buff+first_packet_offset, memmove(net->buff,net->buff+first_packet_offset,
buf_length-first_packet_offset); buf_length-first_packet_offset);
buf_length-=first_packet_offset; buf_length-=first_packet_offset;
start_of_packet -= first_packet_offset; start_of_packet -= first_packet_offset;
first_packet_offset=0; first_packet_offset=0;
} }
net->where_b=buf_length; net->where_b=buf_length;
if ((packet_len = my_real_read(net,&complen, read_from_server)) if ((packet_len = my_real_read(net,&complen, read_from_server))
== packet_error) == packet_error)
{ {
MYSQL_NET_READ_DONE(1, 0); MYSQL_NET_READ_DONE(1, 0);
return packet_error; return packet_error;
} }
read_from_server= 0; read_from_server= 0;
if (my_uncompress(net->buff + net->where_b, packet_len, if (my_uncompress(net->buff + net->where_b, packet_len,
&complen)) &complen))
{ {
net->error= 2; /* caller will close socket */ net->error= 2; /* caller will close socket */
net->last_errno= ER_NET_UNCOMPRESS_ERROR; net->last_errno= ER_NET_UNCOMPRESS_ERROR;
MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0)); MYSQL_SERVER_my_error(ER_NET_UNCOMPRESS_ERROR, MYF(0));
MYSQL_NET_READ_DONE(1, 0); MYSQL_NET_READ_DONE(1, 0);
return packet_error; return packet_error;
} }
buf_length+= complen; buf_length+= complen;
net->real_network_read_len += packet_len;
} }
net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE; net->read_pos= net->buff+ first_packet_offset + NET_HEADER_SIZE;
net->buf_length= buf_length; net->buf_length= buf_length;
net->remain_in_buf= (ulong) (buf_length - start_of_packet); net->remain_in_buf= (ulong) (buf_length - start_of_packet);
len = ((ulong) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE - len = ((ulong) (start_of_packet - first_packet_offset) - NET_HEADER_SIZE -
multi_byte_packet); multi_byte_packet);
net->save_char= net->read_pos[len]; /* Must be saved */ net->save_char= net->read_pos[len]; /* Must be saved */
net->read_pos[len]=0; /* Safeguard for mysql_use_result */ net->read_pos[len]=0; /* Safeguard for mysql_use_result */
} }
......
...@@ -3308,13 +3308,14 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi, ...@@ -3308,13 +3308,14 @@ static int request_dump(THD *thd, MYSQL* mysql, Master_info* mi,
try a reconnect. We do not want to print anything to try a reconnect. We do not want to print anything to
the error log in this case because this a anormal the error log in this case because this a anormal
event in an idle server. event in an idle server.
network_read_len get the real network read length in VIO, especially using compressed protocol
RETURN VALUES RETURN VALUES
'packet_error' Error 'packet_error' Error
number Length of packet number Length of packet
*/ */
static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings, ulong* network_read_len)
{ {
ulong len; ulong len;
DBUG_ENTER("read_event"); DBUG_ENTER("read_event");
...@@ -3356,6 +3357,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings) ...@@ -3356,6 +3357,8 @@ static ulong read_event(MYSQL* mysql, Master_info *mi, bool* suppress_warnings)
DBUG_RETURN(packet_error); DBUG_RETURN(packet_error);
} }
*network_read_len = mysql->net.real_network_read_len ;
DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d", DBUG_PRINT("exit", ("len: %lu net->read_pos[4]: %d",
len, mysql->net.read_pos[4])); len, mysql->net.read_pos[4]));
DBUG_RETURN(len - 1); DBUG_RETURN(len - 1);
...@@ -4420,7 +4423,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4420,7 +4423,7 @@ pthread_handler_t handle_slave_io(void *arg)
ulonglong tokenamount = opt_read_binlog_speed_limit*1024; ulonglong tokenamount = opt_read_binlog_speed_limit*1024;
while (!io_slave_killed(mi)) while (!io_slave_killed(mi))
{ {
ulong event_len; ulong event_len, network_read_len = 0;
/* /*
We say "waiting" because read_event() will wait if there's nothing to We say "waiting" because read_event() will wait if there's nothing to
read. But if there's something to read, it will not wait. The read. But if there's something to read, it will not wait. The
...@@ -4428,7 +4431,7 @@ pthread_handler_t handle_slave_io(void *arg) ...@@ -4428,7 +4431,7 @@ pthread_handler_t handle_slave_io(void *arg)
we're in fact receiving nothing. we're in fact receiving nothing.
*/ */
THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event); THD_STAGE_INFO(thd, stage_waiting_for_master_to_send_event);
event_len= read_event(mysql, mi, &suppress_warnings); event_len= read_event(mysql, mi, &suppress_warnings, &network_read_len);
if (check_io_slave_killed(mi, NullS)) if (check_io_slave_killed(mi, NullS))
goto err; goto err;
...@@ -4493,13 +4496,13 @@ Stopping slave I/O thread due to out-of-memory error from master"); ...@@ -4493,13 +4496,13 @@ Stopping slave I/O thread due to out-of-memory error from master");
ulonglong currenttime = my_micro_time()/1000; ulonglong currenttime = my_micro_time()/1000;
tokenamount += (currenttime - lastchecktime)*read_binlog_speed_limit*1024/1000; tokenamount += (currenttime - lastchecktime)*read_binlog_speed_limit*1024/1000;
lastchecktime = currenttime; lastchecktime = currenttime;
if(tokenamount < event_len) if(tokenamount < network_read_len)
{ {
ulonglong micro_sleeptime = 1000*1000*(event_len - tokenamount) / (read_binlog_speed_limit * 1024); ulonglong micro_sleeptime = 1000*1000*(network_read_len - tokenamount) / (read_binlog_speed_limit * 1024);
my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second my_sleep(micro_sleeptime > 1000 ? micro_sleeptime : 1000); // at least sleep 1000 micro second
} }
}while(tokenamount < event_len); }while(tokenamount < network_read_len);
tokenamount -= event_len; tokenamount -= network_read_len;
} }
/* XXX: 'synced' should be updated by queue_event to indicate /* XXX: 'synced' should be updated by queue_event to indicate
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment