changed manager to use my_net_*

fixed bug in master - unregister slaves when they disconnect
parent 2f27322a
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#define MANAGER_CLIENT_VERSION "1.0" #define MANAGER_CLIENT_VERSION "1.1"
#include <my_global.h> #include <my_global.h>
#include <mysql.h> #include <mysql.h>
...@@ -28,7 +28,7 @@ ...@@ -28,7 +28,7 @@
#include <unistd.h> #include <unistd.h>
#ifndef MYSQL_MANAGER_PORT #ifndef MYSQL_MANAGER_PORT
#define MYSQL_MANAGER_PORT 23546 #define MYSQL_MANAGER_PORT 9305
#endif #endif
static void die(const char* fmt, ...); static void die(const char* fmt, ...);
......
...@@ -238,7 +238,7 @@ typedef struct st_mysql_res { ...@@ -238,7 +238,7 @@ typedef struct st_mysql_res {
typedef struct st_mysql_manager typedef struct st_mysql_manager
{ {
Vio* vio; NET net;
char *host,*user,*passwd; char *host,*user,*passwd;
unsigned int port; unsigned int port;
my_bool free_me; my_bool free_me;
......
...@@ -91,6 +91,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con, ...@@ -91,6 +91,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
uint32 ip_addr; uint32 ip_addr;
char msg_buf[MAX_MYSQL_MANAGER_MSG]; char msg_buf[MAX_MYSQL_MANAGER_MSG];
int msg_len; int msg_len;
Vio* vio;
if (!host) if (!host)
host="localhost"; host="localhost";
...@@ -105,13 +106,14 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con, ...@@ -105,13 +106,14 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
strmov(con->last_error,"Cannot create socket"); strmov(con->last_error,"Cannot create socket");
goto err; goto err;
} }
if (!(con->vio=vio_new(sock,VIO_TYPE_TCPIP,FALSE))) if (!(vio=vio_new(sock,VIO_TYPE_TCPIP,FALSE)))
{ {
con->last_errno=ENOMEM; con->last_errno=ENOMEM;
strmov(con->last_error,"Cannot create network I/O object"); strmov(con->last_error,"Cannot create network I/O object");
goto err; goto err;
} }
vio_blocking(con->vio,TRUE); vio_blocking(vio,TRUE);
my_net_init(&con->net,vio);
bzero((char*) &sock_addr,sizeof(sock_addr)); bzero((char*) &sock_addr,sizeof(sock_addr));
sock_addr.sin_family = AF_INET; sock_addr.sin_family = AF_INET;
if ((int) (ip_addr = inet_addr(host)) != (int) INADDR_NONE) if ((int) (ip_addr = inet_addr(host)) != (int) INADDR_NONE)
...@@ -155,7 +157,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con, ...@@ -155,7 +157,7 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
goto err; goto err;
} }
/* read the greating */ /* read the greating */
if (vio_read(con->vio,msg_buf,MAX_MYSQL_MANAGER_MSG)<=0) if (my_net_read(&con->net) == packet_error)
{ {
con->last_errno=errno; con->last_errno=errno;
strmov(con->last_error,"Read error on socket"); strmov(con->last_error,"Read error on socket");
...@@ -163,19 +165,19 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con, ...@@ -163,19 +165,19 @@ MYSQL_MANAGER* STDCALL mysql_manager_connect(MYSQL_MANAGER* con,
} }
sprintf(msg_buf,"%-.16s %-.16s\n",user,passwd); sprintf(msg_buf,"%-.16s %-.16s\n",user,passwd);
msg_len=strlen(msg_buf); msg_len=strlen(msg_buf);
if (vio_write(con->vio,msg_buf,msg_len)!=msg_len) if (my_net_write(&con->net,msg_buf,msg_len) || net_flush(&con->net))
{ {
con->last_errno=errno; con->last_errno=con->net.last_errno;
strmov(con->last_error,"Write error on socket"); strmov(con->last_error,"Write error on socket");
goto err; goto err;
} }
if (vio_read(con->vio,msg_buf,MAX_MYSQL_MANAGER_MSG)<=0) if (my_net_read(&con->net) == packet_error)
{ {
con->last_errno=errno; con->last_errno=errno;
strmov(con->last_error,"Read error on socket"); strmov(con->last_error,"Read error on socket");
goto err; goto err;
} }
if ((con->cmd_status=atoi(msg_buf)) != MANAGER_OK) if ((con->cmd_status=atoi(con->net.read_pos)) != MANAGER_OK)
{ {
strmov(con->last_error,"Access denied"); strmov(con->last_error,"Access denied");
goto err; goto err;
...@@ -210,11 +212,7 @@ void STDCALL mysql_manager_close(MYSQL_MANAGER* con) ...@@ -210,11 +212,7 @@ void STDCALL mysql_manager_close(MYSQL_MANAGER* con)
allocated in my_multimalloc() along with con->host, freeing allocated in my_multimalloc() along with con->host, freeing
con->hosts frees the whole block con->hosts frees the whole block
*/ */
if (con->vio) net_end(&con->net);
{
vio_delete(con->vio);
con->vio=0;
}
if (con->free_me) if (con->free_me)
my_free((gptr)con,MYF(0)); my_free((gptr)con,MYF(0));
} }
...@@ -224,7 +222,7 @@ int STDCALL mysql_manager_command(MYSQL_MANAGER* con,const char* cmd, ...@@ -224,7 +222,7 @@ int STDCALL mysql_manager_command(MYSQL_MANAGER* con,const char* cmd,
{ {
if (!cmd_len) if (!cmd_len)
cmd_len=strlen(cmd); cmd_len=strlen(cmd);
if (vio_write(con->vio,(char*)cmd,cmd_len) != cmd_len) if (my_net_write(&con->net,(char*)cmd,cmd_len) || net_flush(&con->net))
{ {
con->last_errno=errno; con->last_errno=errno;
strmov(con->last_error,"Write error on socket"); strmov(con->last_error,"Write error on socket");
...@@ -238,9 +236,9 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf, ...@@ -238,9 +236,9 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
int res_buf_size) int res_buf_size)
{ {
char* res_buf_end=res_buf+res_buf_size; char* res_buf_end=res_buf+res_buf_size;
char* net_buf_pos=con->net_buf_pos, *net_buf_end=con->net_data_end; char* net_buf=con->net.read_pos, *net_buf_end;
int res_buf_shift=RES_BUF_SHIFT; int res_buf_shift=RES_BUF_SHIFT;
int done=0; uint num_bytes;
if (res_buf_size<RES_BUF_SHIFT) if (res_buf_size<RES_BUF_SHIFT)
{ {
...@@ -249,51 +247,27 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf, ...@@ -249,51 +247,27 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
return 1; return 1;
} }
for (;;) if ((num_bytes=my_net_read(&con->net)) == packet_error)
{
for (;net_buf_pos<net_buf_end && res_buf<res_buf_end;
net_buf_pos++,res_buf++)
{
char c=*net_buf_pos;
if (c == '\r')
c=*++net_buf_pos;
if (c == '\n')
{
*res_buf=0;
net_buf_pos++;
done=1;
break;
}
else
*res_buf=*net_buf_pos;
}
if (done || res_buf==res_buf_end)
break;
if (net_buf_pos == net_buf_end && res_buf<res_buf_end)
{
int num_bytes;
if ((num_bytes=vio_read(con->vio,con->net_buf,con->net_buf_size))<=0)
{ {
con->last_errno=errno; con->last_errno=errno;
strmov(con->last_error,"socket read failed"); strmov(con->last_error,"socket read failed");
return 1; return 1;
} }
net_buf_pos=con->net_buf;
net_buf_end=net_buf_pos+num_bytes; net_buf_end=net_buf+num_bytes;
}
} if ((con->eof=(net_buf[3]==' ')))
con->net_buf_pos=net_buf_pos;
con->net_data_end=net_buf_end;
res_buf=res_buf_end-res_buf_size;
if ((con->eof=(res_buf[3]==' ')))
res_buf_shift--; res_buf_shift--;
res_buf_end-=res_buf_shift; net_buf+=res_buf_shift;
for (;res_buf<res_buf_end;res_buf++) res_buf_end[-1]=0;
for (;net_buf<net_buf_end && res_buf < res_buf_end;res_buf++,net_buf++)
{
if((*res_buf=*net_buf) == '\r')
{ {
if(!(*res_buf=res_buf[res_buf_shift])) *res_buf=0;
break; break;
} }
}
return 0; return 0;
} }
......
...@@ -234,6 +234,9 @@ while test $# -gt 0; do ...@@ -234,6 +234,9 @@ while test $# -gt 0; do
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT $1" EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT $1"
EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT $1" EXTRA_SLAVE_MYSQLD_OPT="$EXTRA_SLAVE_MYSQLD_OPT $1"
;; ;;
--strace-client )
STRACE_CLIENT=1
;;
--debug) --debug)
EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT \ EXTRA_MASTER_MYSQLD_OPT="$EXTRA_MASTER_MYSQLD_OPT \
--debug=d:t:i:O,$MYSQL_TEST_DIR/var/log/master.trace" --debug=d:t:i:O,$MYSQL_TEST_DIR/var/log/master.trace"
...@@ -299,6 +302,10 @@ if [ x$SOURCE_DIST = x1 ] ; then ...@@ -299,6 +302,10 @@ if [ x$SOURCE_DIST = x1 ] ; then
else else
MYSQL_TEST="$BASEDIR/client/mysqltest" MYSQL_TEST="$BASEDIR/client/mysqltest"
fi fi
if [ -n "$STRACE_CLIENT" ]; then
MYSQL_TEST="strace -o $MYSQL_TEST_DIR/var/log/mysqltest.strace $MYSQL_TEST"
fi
MYSQLADMIN="$BASEDIR/client/mysqladmin" MYSQLADMIN="$BASEDIR/client/mysqladmin"
MYSQL_MANAGER_CLIENT="$BASEDIR/client/mysqlmanagerc" MYSQL_MANAGER_CLIENT="$BASEDIR/client/mysqlmanagerc"
MYSQL_MANAGER="$BASEDIR/tools/mysqlmanager" MYSQL_MANAGER="$BASEDIR/tools/mysqlmanager"
......
...@@ -938,10 +938,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd, ...@@ -938,10 +938,12 @@ bool dispatch_command(enum enum_server_command command, THD *thd,
pos = uint4korr(packet); pos = uint4korr(packet);
flags = uint2korr(packet + 4); flags = uint2korr(packet + 4);
pthread_mutex_lock(&LOCK_server_id); pthread_mutex_lock(&LOCK_server_id);
thd->server_id=0; /* avoid suicide */
kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6)); kill_zombie_dump_threads(slave_server_id = uint4korr(packet+6));
thd->server_id = slave_server_id; thd->server_id = slave_server_id;
pthread_mutex_unlock(&LOCK_server_id); pthread_mutex_unlock(&LOCK_server_id);
mysql_binlog_send(thd, thd->strdup(packet + 10), pos, flags); mysql_binlog_send(thd, thd->strdup(packet + 10), pos, flags);
unregister_slave(thd,1,1);
// fake COM_QUIT -- if we get here, the thread needs to terminate // fake COM_QUIT -- if we get here, the thread needs to terminate
error = TRUE; error = TRUE;
net->error = 0; net->error = 0;
......
...@@ -106,10 +106,25 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name, ...@@ -106,10 +106,25 @@ static int fake_rotate_event(NET* net, String* packet, char* log_file_name,
p+= len; \ p+= len; \
}\ }\
void unregister_slave(THD* thd, bool only_mine, bool need_mutex)
{
if (need_mutex)
pthread_mutex_lock(&LOCK_slave_list);
if (thd->server_id)
{
SLAVE_INFO* old_si;
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list,
(byte*)&thd->server_id, 4)) &&
(!only_mine || old_si->thd == thd))
hash_delete(&slave_list, (byte*)old_si);
}
if (need_mutex)
pthread_mutex_unlock(&LOCK_slave_list);
}
int register_slave(THD* thd, uchar* packet, uint packet_length) int register_slave(THD* thd, uchar* packet, uint packet_length)
{ {
SLAVE_INFO *si, *old_si; SLAVE_INFO *si;
int res = 1; int res = 1;
uchar* p = packet, *p_end = packet + packet_length; uchar* p = packet, *p_end = packet + packet_length;
...@@ -119,18 +134,16 @@ int register_slave(THD* thd, uchar* packet, uint packet_length) ...@@ -119,18 +134,16 @@ int register_slave(THD* thd, uchar* packet, uint packet_length)
if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME)))) if (!(si = (SLAVE_INFO*)my_malloc(sizeof(SLAVE_INFO), MYF(MY_WME))))
goto err; goto err;
si->server_id = uint4korr(p); thd->server_id = si->server_id = uint4korr(p);
p += 4; p += 4;
get_object(p,si->host); get_object(p,si->host);
get_object(p,si->user); get_object(p,si->user);
get_object(p,si->password); get_object(p,si->password);
si->port = uint2korr(p); si->port = uint2korr(p);
si->thd = thd;
pthread_mutex_lock(&LOCK_slave_list); pthread_mutex_lock(&LOCK_slave_list);
if ((old_si = (SLAVE_INFO*)hash_search(&slave_list, unregister_slave(thd,0,0);
(byte*)&si->server_id, 4)))
hash_delete(&slave_list, (byte*)old_si);
res = hash_insert(&slave_list, (byte*) si); res = hash_insert(&slave_list, (byte*) si);
pthread_mutex_unlock(&LOCK_slave_list); pthread_mutex_unlock(&LOCK_slave_list);
return res; return res;
......
...@@ -10,6 +10,7 @@ typedef struct st_slave_info ...@@ -10,6 +10,7 @@ typedef struct st_slave_info
char user[USERNAME_LENGTH+1]; char user[USERNAME_LENGTH+1];
char password[HASH_PASSWORD_LENGTH+1]; char password[HASH_PASSWORD_LENGTH+1];
uint16 port; uint16 port;
THD* thd;
} SLAVE_INFO; } SLAVE_INFO;
extern bool opt_show_slave_auth_info, opt_old_rpl_compat; extern bool opt_show_slave_auth_info, opt_old_rpl_compat;
...@@ -44,6 +45,7 @@ void reset_master(); ...@@ -44,6 +45,7 @@ void reset_master();
void init_slave_list(); void init_slave_list();
void end_slave_list(); void end_slave_list();
int register_slave(THD* thd, uchar* packet, uint packet_length); int register_slave(THD* thd, uchar* packet, uint packet_length);
void unregister_slave(THD* thd, bool only_mine, bool need_mutex);
int purge_master_logs(THD* thd, const char* to_log); int purge_master_logs(THD* thd, const char* to_log);
bool log_in_use(const char* log_name); bool log_in_use(const char* log_name);
void adjust_linfo_offsets(my_off_t purge_offset); void adjust_linfo_offsets(my_off_t purge_offset);
......
...@@ -133,7 +133,7 @@ typedef enum {PARAM_STDOUT,PARAM_STDERR} PARAM_TYPE; ...@@ -133,7 +133,7 @@ typedef enum {PARAM_STDOUT,PARAM_STDERR} PARAM_TYPE;
struct manager_thd struct manager_thd
{ {
Vio* vio; NET net;
char user[MAX_USER_NAME+1]; char user[MAX_USER_NAME+1];
int priv_flags; int priv_flags;
char* cmd_buf; char* cmd_buf;
...@@ -154,7 +154,7 @@ struct manager_exec* cur_launch_exec=0; ...@@ -154,7 +154,7 @@ struct manager_exec* cur_launch_exec=0;
static struct manager_thd* manager_thd_new(Vio* vio); static struct manager_thd* manager_thd_new(Vio* vio);
static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end); static struct manager_exec* manager_exec_new(char* arg_start,char* arg_end);
static void manager_exec_print(Vio* vio,struct manager_exec* e); static void manager_exec_print(NET* net,struct manager_exec* e);
static void manager_thd_free(struct manager_thd* thd); static void manager_thd_free(struct manager_thd* thd);
static void manager_exec_free(void* e); static void manager_exec_free(void* e);
static void manager_exec_connect(struct manager_exec* e); static void manager_exec_connect(struct manager_exec* e);
...@@ -291,9 +291,9 @@ static void die(const char* fmt,...); ...@@ -291,9 +291,9 @@ static void die(const char* fmt,...);
static void print_time(FILE* fp); static void print_time(FILE* fp);
static void clean_up(); static void clean_up();
static struct manager_cmd* lookup_cmd(char* s,int len); static struct manager_cmd* lookup_cmd(char* s,int len);
static void client_msg(Vio* vio,int err_code,const char* fmt,...); static void client_msg(NET* net,int err_code,const char* fmt,...);
static void client_msg_pre(Vio* vio,int err_code,const char* fmt,...); static void client_msg_pre(NET* net,int err_code,const char* fmt,...);
static void client_msg_raw(Vio* vio,int err_code,int pre,const char* fmt, static void client_msg_raw(NET* net,int err_code,int pre,const char* fmt,
va_list args); va_list args);
static int authenticate(struct manager_thd* thd); static int authenticate(struct manager_thd* thd);
static char* read_line(struct manager_thd* thd); /* returns pointer to end of static char* read_line(struct manager_thd* thd); /* returns pointer to end of
...@@ -371,7 +371,7 @@ static int exec_line(struct manager_thd* thd,char* buf,char* buf_end) ...@@ -371,7 +371,7 @@ static int exec_line(struct manager_thd* thd,char* buf,char* buf_end)
*p=tolower(*p); *p=tolower(*p);
if (!(cmd=lookup_cmd(buf,(int)(p-buf)))) if (!(cmd=lookup_cmd(buf,(int)(p-buf))))
{ {
client_msg(thd->vio,MANAGER_CLIENT_ERR, client_msg(&thd->net,MANAGER_CLIENT_ERR,
"Unrecognized command, type help to see list of supported\ "Unrecognized command, type help to see list of supported\
commands"); commands");
return 1; return 1;
...@@ -393,13 +393,13 @@ static struct manager_cmd* lookup_cmd(char* s,int len) ...@@ -393,13 +393,13 @@ static struct manager_cmd* lookup_cmd(char* s,int len)
HANDLE_NOARG_DECL(handle_ping) HANDLE_NOARG_DECL(handle_ping)
{ {
client_msg(thd->vio,MANAGER_OK,"Server management daemon is alive"); client_msg(&thd->net,MANAGER_OK,"Server management daemon is alive");
return 0; return 0;
} }
HANDLE_NOARG_DECL(handle_quit) HANDLE_NOARG_DECL(handle_quit)
{ {
client_msg(thd->vio,MANAGER_OK,"Goodbye"); client_msg(&thd->net,MANAGER_OK,"Goodbye");
thd->finished=1; thd->finished=1;
return 0; return 0;
} }
...@@ -407,19 +407,19 @@ HANDLE_NOARG_DECL(handle_quit) ...@@ -407,19 +407,19 @@ HANDLE_NOARG_DECL(handle_quit)
HANDLE_NOARG_DECL(handle_help) HANDLE_NOARG_DECL(handle_help)
{ {
struct manager_cmd* cmd = commands; struct manager_cmd* cmd = commands;
Vio* vio = thd->vio; NET* net = &thd->net;
client_msg_pre(vio,MANAGER_INFO,"Available commands:"); client_msg_pre(net,MANAGER_INFO,"Available commands:");
for (;cmd->name;cmd++) for (;cmd->name;cmd++)
{ {
client_msg_pre(vio,MANAGER_INFO,"%s - %s", cmd->name, cmd->help); client_msg_pre(net,MANAGER_INFO,"%s - %s", cmd->name, cmd->help);
} }
client_msg_pre(vio,MANAGER_INFO,"End of help"); client_msg_pre(net,MANAGER_INFO,"End of help");
return 0; return 0;
} }
HANDLE_NOARG_DECL(handle_shutdown) HANDLE_NOARG_DECL(handle_shutdown)
{ {
client_msg(thd->vio,MANAGER_OK,"Shutdown started, goodbye"); client_msg(&thd->net,MANAGER_OK,"Shutdown started, goodbye");
thd->finished=1; thd->finished=1;
shutdown_requested = 1; shutdown_requested = 1;
if (!one_thread) if (!one_thread)
...@@ -470,10 +470,10 @@ HANDLE_DECL(handle_set_exec_con) ...@@ -470,10 +470,10 @@ HANDLE_DECL(handle_set_exec_con)
} }
} }
pthread_mutex_unlock(&lock_exec_hash); pthread_mutex_unlock(&lock_exec_hash);
client_msg(thd->vio,MANAGER_OK,"Entry updated"); client_msg(&thd->net,MANAGER_OK,"Entry updated");
return 0; return 0;
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
return 1; return 1;
} }
...@@ -531,10 +531,10 @@ static int set_exec_param(struct manager_thd* thd, char* args_start, ...@@ -531,10 +531,10 @@ static int set_exec_param(struct manager_thd* thd, char* args_start,
} }
strnmov(param,arg_p,FN_REFLEN); strnmov(param,arg_p,FN_REFLEN);
pthread_mutex_unlock(&lock_exec_hash); pthread_mutex_unlock(&lock_exec_hash);
client_msg(thd->vio,MANAGER_OK,"Entry updated"); client_msg(&thd->net,MANAGER_OK,"Entry updated");
return 0; return 0;
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
return 1; return 1;
} }
...@@ -581,10 +581,10 @@ HANDLE_DECL(handle_start_exec) ...@@ -581,10 +581,10 @@ HANDLE_DECL(handle_start_exec)
pthread_mutex_unlock(&e->lock); pthread_mutex_unlock(&e->lock);
if (error) if (error)
goto err; goto err;
client_msg(thd->vio,MANAGER_OK,"'%s' started",e->ident); client_msg(&thd->net,MANAGER_OK,"'%s' started",e->ident);
return 0; return 0;
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
return 1; return 1;
} }
...@@ -636,11 +636,11 @@ HANDLE_DECL(handle_stop_exec) ...@@ -636,11 +636,11 @@ HANDLE_DECL(handle_stop_exec)
pthread_mutex_unlock(&e->lock); pthread_mutex_unlock(&e->lock);
if (!error) if (!error)
{ {
client_msg(thd->vio,MANAGER_OK,"'%s' terminated",e->ident); client_msg(&thd->net,MANAGER_OK,"'%s' terminated",e->ident);
return 0; return 0;
} }
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
return 1; return 1;
} }
...@@ -705,7 +705,7 @@ HANDLE_DECL(handle_query) ...@@ -705,7 +705,7 @@ HANDLE_DECL(handle_query)
*p++='\t'; *p++='\t';
} }
*p=0; *p=0;
client_msg_pre(thd->vio,MANAGER_OK,buf); client_msg_pre(&thd->net,MANAGER_OK,buf);
while ((row=mysql_fetch_row(res))) while ((row=mysql_fetch_row(res)))
{ {
...@@ -716,14 +716,14 @@ HANDLE_DECL(handle_query) ...@@ -716,14 +716,14 @@ HANDLE_DECL(handle_query)
*p++='\t'; *p++='\t';
} }
*p=0; *p=0;
client_msg_pre(thd->vio,MANAGER_OK,buf); client_msg_pre(&thd->net,MANAGER_OK,buf);
} }
} }
pthread_mutex_unlock(&e->lock); pthread_mutex_unlock(&e->lock);
client_msg(thd->vio,MANAGER_OK,"End"); client_msg(&thd->net,MANAGER_OK,"End");
return 0; return 0;
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
return 1; return 1;
} }
...@@ -756,10 +756,10 @@ HANDLE_DECL(handle_def_exec) ...@@ -756,10 +756,10 @@ HANDLE_DECL(handle_def_exec)
} }
hash_insert(&exec_hash,(byte*)e); hash_insert(&exec_hash,(byte*)e);
pthread_mutex_unlock(&lock_exec_hash); pthread_mutex_unlock(&lock_exec_hash);
client_msg(thd->vio,MANAGER_OK,"Exec definition created"); client_msg(&thd->net,MANAGER_OK,"Exec definition created");
return 0; return 0;
err: err:
client_msg(thd->vio,MANAGER_CLIENT_ERR,error); client_msg(&thd->net,MANAGER_CLIENT_ERR,error);
if (e) if (e)
manager_exec_free(e); manager_exec_free(e);
return 1; return 1;
...@@ -768,16 +768,16 @@ err: ...@@ -768,16 +768,16 @@ err:
HANDLE_NOARG_DECL(handle_show_exec) HANDLE_NOARG_DECL(handle_show_exec)
{ {
uint i; uint i;
client_msg_pre(thd->vio,MANAGER_INFO,"Exec_def\tPid\tExit_status\tCon_info\ client_msg_pre(&thd->net,MANAGER_INFO,"Exec_def\tPid\tExit_status\tCon_info\
\tStdout\tStderr\tArguments"); \tStdout\tStderr\tArguments");
pthread_mutex_lock(&lock_exec_hash); pthread_mutex_lock(&lock_exec_hash);
for (i=0;i<exec_hash.records;i++) for (i=0;i<exec_hash.records;i++)
{ {
struct manager_exec* e=(struct manager_exec*)hash_element(&exec_hash,i); struct manager_exec* e=(struct manager_exec*)hash_element(&exec_hash,i);
manager_exec_print(thd->vio,e); manager_exec_print(&thd->net,e);
} }
pthread_mutex_unlock(&lock_exec_hash); pthread_mutex_unlock(&lock_exec_hash);
client_msg(thd->vio,MANAGER_INFO,"End"); client_msg(&thd->net,MANAGER_INFO,"End");
return 0; return 0;
} }
...@@ -873,7 +873,7 @@ static char* arg_strmov(char* dest, const char* src, int n) ...@@ -873,7 +873,7 @@ static char* arg_strmov(char* dest, const char* src, int n)
return dest; return dest;
} }
static void manager_exec_print(Vio* vio,struct manager_exec* e) static void manager_exec_print(NET* net,struct manager_exec* e)
{ {
char buf[MAX_MYSQL_MANAGER_MSG]; char buf[MAX_MYSQL_MANAGER_MSG];
char* p=buf,*buf_end=buf+sizeof(buf)-1; char* p=buf,*buf_end=buf+sizeof(buf)-1;
...@@ -921,7 +921,7 @@ static void manager_exec_print(Vio* vio,struct manager_exec* e) ...@@ -921,7 +921,7 @@ static void manager_exec_print(Vio* vio,struct manager_exec* e)
} }
end: end:
*p=0; *p=0;
client_msg_pre(vio,MANAGER_INFO,buf); client_msg_pre(net,MANAGER_INFO,buf);
return; return;
} }
...@@ -933,7 +933,7 @@ static int authenticate(struct manager_thd* thd) ...@@ -933,7 +933,7 @@ static int authenticate(struct manager_thd* thd)
struct manager_user* u; struct manager_user* u;
char c; char c;
client_msg(thd->vio,MANAGER_INFO, manager_greeting); client_msg(&thd->net,MANAGER_INFO, manager_greeting);
if (!(buf_end=read_line(thd))) if (!(buf_end=read_line(thd)))
return -1; return -1;
for (buf=thd->cmd_buf,p=thd->user,p_end=p+MAX_USER_NAME; for (buf=thd->cmd_buf,p=thd->user,p_end=p+MAX_USER_NAME;
...@@ -959,7 +959,7 @@ static int authenticate(struct manager_thd* thd) ...@@ -959,7 +959,7 @@ static int authenticate(struct manager_thd* thd)
my_MD5Final(digest,&context); my_MD5Final(digest,&context);
if (memcmp(u->md5_pass,digest,MD5_LEN)) if (memcmp(u->md5_pass,digest,MD5_LEN))
return 1; return 1;
client_msg(thd->vio,MANAGER_OK,"OK"); client_msg(&thd->net,MANAGER_OK,"OK");
return 0; return 0;
} }
...@@ -1121,7 +1121,7 @@ static pthread_handler_decl(process_connection,arg) ...@@ -1121,7 +1121,7 @@ static pthread_handler_decl(process_connection,arg)
return 0; /* Don't get cc warning */ return 0; /* Don't get cc warning */
} }
static void client_msg_raw(Vio* vio, int err_code, int pre, const char* fmt, static void client_msg_raw(NET* net, int err_code, int pre, const char* fmt,
va_list args) va_list args)
{ {
char buf[MAX_CLIENT_MSG_LEN],*p,*buf_end; char buf[MAX_CLIENT_MSG_LEN],*p,*buf_end;
...@@ -1136,73 +1136,44 @@ static void client_msg_raw(Vio* vio, int err_code, int pre, const char* fmt, ...@@ -1136,73 +1136,44 @@ static void client_msg_raw(Vio* vio, int err_code, int pre, const char* fmt,
p=buf_end - 2; p=buf_end - 2;
*p++='\r'; *p++='\r';
*p++='\n'; *p++='\n';
if (vio_write(vio,buf,(uint)(p-buf))<=0) if (my_net_write(net,buf,(uint)(p-buf)) || net_flush(net))
log_err("Failed writing to client: errno=%d",errno); log_err("Failed writing to client: errno=%d",net->last_errno);
} }
static void client_msg(Vio* vio, int err_code, const char* fmt, ...) static void client_msg(NET* net, int err_code, const char* fmt, ...)
{ {
va_list args; va_list args;
va_start(args,fmt); va_start(args,fmt);
client_msg_raw(vio,err_code,0,fmt,args); client_msg_raw(net,err_code,0,fmt,args);
} }
static void client_msg_pre(Vio* vio, int err_code, const char* fmt, ...) static void client_msg_pre(NET* net, int err_code, const char* fmt, ...)
{ {
va_list args; va_list args;
va_start(args,fmt); va_start(args,fmt);
client_msg_raw(vio,err_code,1,fmt,args); client_msg_raw(net,err_code,1,fmt,args);
} }
static char* read_line(struct manager_thd* thd) static char* read_line(struct manager_thd* thd)
{ {
char* p=thd->cmd_buf; uint len;
char* buf_end = thd->cmd_buf + manager_max_cmd_len; char* p, *buf_end;
int escaped = 0; if ((len=my_net_read(&thd->net)) == packet_error)
for (;p<buf_end;)
{
int len,read_len;
char *block_end,*p_back;
uint retry_count=0;
read_len = min(NET_BLOCK,(uint)(buf_end-p));
while ((len=vio_read(thd->vio,p,read_len))<=0)
{ {
if (vio_should_retry(thd->vio) && retry_count++ < MAX_RETRY_COUNT)
continue;
log_err("Error reading command from client (Error: %d)", log_err("Error reading command from client (Error: %d)",
vio_errno(thd->vio)); errno);
thd->fatal=1; thd->fatal=1;
return 0; return 0;
} }
block_end=p+len; buf_end=thd->cmd_buf+len;
/* a trick to unescape in place */ for (p=thd->cmd_buf;p<buf_end;p++)
for (p_back=p;p<block_end;p++) if (*p == '\r' || *p == '\n')
{ {
char c=*p; *p=0;
if (c==ESCAPE_CHAR)
{
if (!escaped)
{
escaped=1;
continue;
}
else
escaped=0;
}
if (c==EOL_CHAR && !escaped)
break; break;
*p_back++=c;
escaped=0;
} }
if (p!=block_end)
{ return p;
*p_back=0;
return p_back;
}
}
client_msg(thd->vio,MANAGER_CLIENT_ERR,"Command line too long");
return 0;
} }
static void handle_child(int __attribute__((unused)) sig) static void handle_child(int __attribute__((unused)) sig)
...@@ -1225,25 +1196,23 @@ static void handle_child(int __attribute__((unused)) sig) ...@@ -1225,25 +1196,23 @@ static void handle_child(int __attribute__((unused)) sig)
struct manager_thd* manager_thd_new(Vio* vio) struct manager_thd* manager_thd_new(Vio* vio)
{ {
struct manager_thd* tmp; struct manager_thd* tmp;
if (!(tmp=(struct manager_thd*)my_malloc(sizeof(*tmp)+manager_max_cmd_len, if (!(tmp=(struct manager_thd*)my_malloc(sizeof(*tmp),
MYF(0)))) MYF(0))))
{ {
log_err("Out of memory in manager_thd_new"); log_err("Out of memory in manager_thd_new");
return 0; return 0;
} }
tmp->vio=vio; my_net_init(&tmp->net,vio);
tmp->user[0]=0; tmp->user[0]=0;
tmp->priv_flags=0; tmp->priv_flags=0;
tmp->fatal=tmp->finished=0; tmp->fatal=tmp->finished=0;
tmp->cmd_buf=(char*)tmp+sizeof(*tmp); tmp->cmd_buf=tmp->net.read_pos;
return tmp; return tmp;
} }
static void manager_thd_free(struct manager_thd* thd) static void manager_thd_free(struct manager_thd* thd)
{ {
if (thd->vio) net_end(&thd->net);
vio_close(thd->vio);
my_free((byte*)thd->vio,MYF(0));
} }
static void clean_up() static void clean_up()
...@@ -1413,7 +1382,7 @@ static int run_server_loop() ...@@ -1413,7 +1382,7 @@ static int run_server_loop()
if (authenticate(thd)) if (authenticate(thd))
{ {
client_msg(vio,MANAGER_ACCESS, "Access denied"); client_msg(&thd->net,MANAGER_ACCESS, "Access denied");
manager_thd_free(thd); manager_thd_free(thd);
continue; continue;
} }
...@@ -1427,7 +1396,8 @@ static int run_server_loop() ...@@ -1427,7 +1396,8 @@ static int run_server_loop()
} }
else if (pthread_create(&th,&thr_attr,process_connection,(void*)thd)) else if (pthread_create(&th,&thr_attr,process_connection,(void*)thd))
{ {
client_msg(vio,MANAGER_INTERNAL_ERR,"Could not create thread, errno=%d", client_msg(&thd->net,MANAGER_INTERNAL_ERR,
"Could not create thread, errno=%d",
errno); errno);
manager_thd_free(thd); manager_thd_free(thd);
continue; continue;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment