Commit a6c58676 authored by unknown's avatar unknown

changed manager to use my_net_*

fixed bug in master - unregister slaves when they disconnect


client/mysqlmanagerc.c:
  changed default port
include/mysql.h:
  use my_net_*
libmysql/manager.c:
  use my_net_*
mysql-test/mysql-test-run.sh:
  added support for client strace
sql/sql_parse.cc:
  unregister slaves on disconnect
sql/sql_repl.cc:
  unregister slaves on disconnect
sql/sql_repl.h:
  unregister slaves on disconnect
tools/mysqlmanager.c:
  changed to use my_net_*
parent d34950e8
......@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define MANAGER_CLIENT_VERSION "1.0"
#define MANAGER_CLIENT_VERSION "1.1"
#include <my_global.h>
#include <mysql.h>
......@@ -28,7 +28,7 @@
#include <unistd.h>
#ifndef MYSQL_MANAGER_PORT
#define MYSQL_MANAGER_PORT 23546
#define MYSQL_MANAGER_PORT 9305
#endif
static void die(const char* fmt, ...);
......
......@@ -238,7 +238,7 @@ typedef struct st_mysql_res {
typedef struct st_mysql_manager
{
Vio* vio;
NET net;
char *host,*user,*passwd;
unsigned int port;
my_bool free_me;
......
......@@ -91,6 +91,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
uint32 ip_addr;
char msg_buf[MAX_MYSQL_MANAGER_MSG];
int msg_len;
Vio* vio;
if (!host)
host="localhost";
......@@ -105,13 +106,14 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
strmov(con->last_error,"Cannot create socket");
goto err;
}
if (!(con->vio=vio_new(sock,VIO_TYPE_TCPIP,FALSE)))
if (!(vio=vio_new(sock,VIO_TYPE_TCPIP,FALSE)))
{
con->last_errno=ENOMEM;
strmov(con->last_error,"Cannot create network I/O object");
goto err;
}
vio_blocking(con->vio,TRUE);
vio_blocking(vio,TRUE);
my_net_init(&con->net,vio);
bzero((char*) &sock_addr,sizeof(sock_addr));
sock_addr.sin_family = AF_INET;
if ((int) (ip_addr = inet_addr(host)) != (int) INADDR_NONE)
......@@ -155,7 +157,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
goto err;
}
/* read the greating */
if (vio_read(con->vio,msg_buf,MAX_MYSQL_MANAGER_MSG)<=0)
if (my_net_read(&con->net) == packet_error)
{
con->last_errno=errno;
strmov(con->last_error,"Read error on socket");
......@@ -163,19 +165,19 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
}
sprintf(msg_buf,"%-.16s %-.16s\n",user,passwd);
msg_len=strlen(msg_buf);
if (vio_write(con->vio,msg_buf,msg_len)!=msg_len)
if (my_net_write(&con->net,msg_buf,msg_len) || net_flush(&con->net))
{
con->last_errno=errno;
con->last_errno=con->net.last_errno;
strmov(con->last_error,"Write error on socket");
goto err;
}
if (vio_read(con->vio,msg_buf,MAX_MYSQL_MANAGER_MSG)<=0)
if (my_net_read(&con->net) == packet_error)
{
con->last_errno=errno;
strmov(con->last_error,"Read error on socket");
goto err;
}
if ((con->cmd_status=atoi(msg_buf)) != MANAGER_OK)
if ((con->cmd_status=atoi(con->net.read_pos)) != MANAGER_OK)
{
strmov(con->last_error,"Access denied");
goto err;
......@@ -210,11 +212,7 @@ void STDCALL mysql_manager_close(MYSQL_MANAGER* con)
allocated in my_multimalloc() along with con->host, freeing
con->hosts frees the whole block
*/
if (con->vio)
{
vio_delete(con->vio);
con->vio=0;
}
net_end(&con->net);
if (con->free_me)
my_free((gptr)con,MYF(0));
}
......@@ -224,7 +222,7 @@ int STDCALL mysql_manager_command(MYSQL_MANAGER* con,const char* cmd,
{
if (!cmd_len)
cmd_len=strlen(cmd);
if (vio_write(con->vio,(char*)cmd,cmd_len) != cmd_len)
if (my_net_write(&con->net,(char*)cmd,cmd_len) || net_flush(&con->net))
{
con->last_errno=errno;
strmov(con->last_error,"Write error on socket");
......@@ -238,9 +236,9 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
int res_buf_size)
{
char* res_buf_end=res_buf+res_buf_size;
char* net_buf_pos=con->net_buf_pos, *net_buf_end=con->net_data_end;
char* net_buf=con->net.read_pos, *net_buf_end;
int res_buf_shift=RES_BUF_SHIFT;
int done=0;
uint num_bytes;
if (res_buf_size<RES_BUF_SHIFT)
{
......@@ -249,50 +247,26 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
return 1;
}
for (;;)
if ((num_bytes=my_net_read(&con->net)) == packet_error)
{
for (;net_buf_pos<net_buf_end && res_buf<res_buf_end;
net_buf_pos++,res_buf++)
{
char c=*net_buf_pos;
if (c == '\r')
c=*++net_buf_pos;
if (c == '\n')
{
*res_buf=0;
net_buf_pos++;
done=1;
break;
}
else
*res_buf=*net_buf_pos;
}
if (done || res_buf==res_buf_end)
break;
if (net_buf_pos == net_buf_end && res_buf<res_buf_end)
{
int num_bytes;
if ((num_bytes=vio_read(con->vio,con->net_buf,con->net_buf_size))<=0)
{
con->last_errno=errno;
strmov(con->last_error,"socket read failed");
return 1;
}
net_buf_pos=con->net_buf;
net_buf_end=net_buf_pos+num_bytes;
}
con->last_errno=errno;
strmov(con->last_error,"socket read failed");
return 1;
}
con->net_buf_pos=net_buf_pos;
con->net_data_end=net_buf_end;
res_buf=res_buf_end-res_buf_size;
if ((con->eof=(res_buf[3]==' ')))
net_buf_end=net_buf+num_bytes;
if ((con->eof=(net_buf[3]==' ')))
res_buf_shift--;
res_buf_end-=res_buf_shift;
for (;res_buf<res_buf_end;res_buf++)
net_buf+=res_buf_shift;
res_buf_end[-1]=0;
for (;net_buf<net_buf_end && res_buf < res_buf_end;res_buf++,net_buf++)
{
if(!(*res_buf=res_buf[res_buf_shift]))
if((*res_buf=*net_buf) == '\r')
{
*res_buf=0;
break;
}
}
return 0;
}
......
......@@ -234,6 +234,9 @@ while test $# -gt 0; do
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT $1"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT $1"
;;
--strace-client )
STRACE_CLIENT=1
;;
--debug)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT \
--debug=d:t:i:O,$MYSQL_TEST_DIR/var/log/master.trace"
......@@ -299,6 +302,10 @@ if [ x$SOURCE_DIST = x1 ] ; then
else
MYSQL_TEST="$BASEDIR/client/mysqltest"
fi
if [ -n "$STRACE_CLIENT" ]; then
MYSQL_TEST="strace -o $MYSQL_TEST_DIR/var/log/mysqltest.strace $MYSQL_TEST"
fi
MYSQLADMIN="$BASEDIR/client/mysqladmin"
MYSQL_MANAGER_CLIENT="$BASEDIR/client/mysqlmanagerc"
MYSQL_MANAGER="$BASEDIR/tools/mysqlmanager"
......
......@@ -938,10 +938,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
pos = uint4korr(packet);
flags = uint2korr(packet + 4);
pthread_mutex_lock(&LOCK_server_id);
thd->server_id=0; /* avoid suicide */
kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6));
thd->server_id = slave_server_id;
pthread_mutex_unlock(&LOCK_server_id);
mysql_binlog_send(thd, thd->strdup(packet + 10), pos, flags);
unregister_slave(thd,1,1);
// fake COM_QUIT -- if we get here, the thread needs to terminate
error = TRUE;
net->error = 0;
......
......@@ -106,10 +106,25 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
p+= len; \
}\
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
if (need_mutex)
pthread_mutex_lock(&LOCK_slave_list);
if (thd->server_id)
{
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&thd->server_id, 4)) &&
(!only_mine || old_si->thd == thd))
hash_delete(&slave_list, (byte*)old_si);
}
if (need_mutex)
pthread_mutex_unlock(&LOCK_slave_list);
}
int register_slave(THD* thd, uchar* packet, uint packet_length)
{
SLAVE_INFO *si, *old_si;
SLAVE_INFO *si;
int res = 1;
uchar* p = packet, *p_end = packet + packet_length;
......@@ -119,18 +134,16 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err;
si->server_id = uint4korr(p);
thd->server_id = si->server_id = uint4korr(p);
p += 4;
get_object(p,si->host);
get_object(p,si->user);
get_object(p,si->password);
si->port = uint2korr(p);
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list);
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&si->server_id, 4)))
hash_delete(&slave_list, (byte*)old_si);
unregister_slave(thd,0,0);
res = hash_insert(&slave_list, (byte*) si);
pthread_mutex_unlock(&LOCK_slave_list);
return res;
......
......@@ -10,6 +10,7 @@ typedef struct st_slave_info
char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1];
uint16 port;
THD* thd;
} SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
......@@ -44,6 +45,7 @@ void reset_master();
void init_slave_list();
void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset);
......
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment