Commit 62f4dbd9 authored by unknown's avatar unknown

Various post-review fixes


server-tools/instance-manager/buffer.cc:
  simplified buffer interface
server-tools/instance-manager/buffer.h:
  simplified buffer interface
server-tools/instance-manager/command.cc:
  Command class now uses instance_map directly
server-tools/instance-manager/command.h:
  Made Command to use instance_map directly (not through the factory,
  which is not needed here in fact)
server-tools/instance-manager/commands.cc:
  Moved mysql client/server protocol-specific functions to the commands
server-tools/instance-manager/commands.h:
  Added a comment for Syntax_error command, fixed classes to use instance
  map instead of the factory
server-tools/instance-manager/factory.cc:
  Fixed factory to give appropriate class to the commands
server-tools/instance-manager/guardian.cc:
  Fixed guardian to delay start of new instances monitoring.
  Moved guardian initialization to the class from Instance map.
server-tools/instance-manager/guardian.h:
  interface fixed
server-tools/instance-manager/instance.cc:
  added some loging
server-tools/instance-manager/instance_map.cc:
  All non-instance map specific functions moved from the class. Added
  iterator for instance_map
server-tools/instance-manager/instance_map.h:
  All non-instance map related functions moved from the class. Added
  iterator for instance_map.
server-tools/instance-manager/listener.cc:
  Added FD_CLOEXEC flag to sockets, as we don't want instances to inherit
  them after exec.
server-tools/instance-manager/manager.cc:
  use guardian method moved from the instance map
server-tools/instance-manager/mysql_connection.cc:
  cleanup
server-tools/instance-manager/protocol.cc:
  fix according to the changes in the Buffer class
parent 60889280
......@@ -43,12 +43,12 @@
1 - The buffer came to 16Mb barrier
*/
int Buffer::append(char *position, const char *string, uint len_arg)
int Buffer::append(uint position, const char *string, uint len_arg)
{
if (reserve(position - buffer, len_arg))
if (reserve(position, len_arg))
return 1;
strnmov(position, string, len_arg);
strnmov(buffer + position, string, len_arg);
return 0;
}
......@@ -89,3 +89,4 @@ int Buffer::reserve(uint position, uint len_arg)
}
return 0;
}
......@@ -35,7 +35,7 @@ class Buffer
enum { BUFFER_INITIAL_SIZE= 4096 };
/* maximum buffer size is 16Mb */
enum { MAX_BUFFER_SIZE= 16777216 };
uint buffer_size;
size_t buffer_size;
public:
Buffer()
{
......@@ -50,7 +50,7 @@ class Buffer
public:
char *buffer;
int append(char *start_pos, const char *string, uint len_arg);
int append(uint position, const char *string, uint len_arg);
int reserve(uint position, uint len_arg);
};
......
......@@ -21,8 +21,8 @@
#include "command.h"
Command::Command(Command_factory *factory_arg)
:factory(factory_arg)
Command::Command(Instance_map *imap_arg)
:instance_map(imap_arg)
{}
Command::~Command()
......
......@@ -24,7 +24,7 @@
/* Class responsible for allocation of im commands. */
class Command_factory;
class Instance_map;
/*
Command - entry point for any command.
......@@ -34,14 +34,14 @@ class Command_factory;
class Command
{
public:
Command(Command_factory *factory_arg= 0);
Command(Instance_map *instance_map_arg= 0);
virtual ~Command();
/* method of executing: */
virtual int execute(struct st_net *net, ulong connection_id) = 0;
protected:
Command_factory *factory;
Instance_map *instance_map;
};
#endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_COMMAND_H */
This diff is collapsed.
......@@ -27,9 +27,10 @@
class Show_instances : public Command
{
public:
Show_instances(Command_factory *factory): Command(factory)
Show_instances(Instance_map *imap_arg): Command(imap_arg)
{}
int do_command(struct st_net *net);
int execute(struct st_net *net, ulong connection_id);
};
......@@ -42,7 +43,7 @@ class Show_instances : public Command
class Flush_instances : public Command
{
public:
Flush_instances(Command_factory *factory): Command(factory)
Flush_instances(Instance_map *imap_arg): Command(imap_arg)
{}
int execute(struct st_net *net, ulong connection_id);
......@@ -58,8 +59,8 @@ class Show_instance_status : public Command
{
public:
Show_instance_status(Command_factory *factory, const char *name, uint len);
Show_instance_status(Instance_map *imap_arg, const char *name, uint len);
int do_command(struct st_net *net, const char *instance_name);
int execute(struct st_net *net, ulong connection_id);
const char *instance_name;
};
......@@ -74,9 +75,10 @@ class Show_instance_options : public Command
{
public:
Show_instance_options(Command_factory *factory, const char *name, uint len);
Show_instance_options(Instance_map *imap_arg, const char *name, uint len);
int execute(struct st_net *net, ulong connection_id);
int do_command(struct st_net *net, const char *instance_name);
const char *instance_name;
};
......@@ -89,11 +91,11 @@ class Show_instance_options : public Command
class Start_instance : public Command
{
public:
Start_instance(Command_factory *factory, const char *name, uint len);
Start_instance(Instance_map *imap_arg, const char *name, uint len);
Instance *instance;
int execute(struct st_net *net, ulong connection_id);
const char *instance_name;
Instance *instance;
};
......@@ -105,7 +107,7 @@ class Start_instance : public Command
class Stop_instance : public Command
{
public:
Stop_instance(Command_factory *factory, const char *name, uint len);
Stop_instance(Instance_map *imap_arg, const char *name, uint len);
Instance *instance;
int execute(struct st_net *net, ulong connection_id);
......@@ -114,7 +116,10 @@ class Stop_instance : public Command
/*
Syntax error command.
Syntax error command. This command is issued if parser reported a syntax error.
We need it to distinguish the parse error and the situation when parser internal
error occured. E.g. parsing failed because we hadn't had enought memory. In the
latter case parse_command() should return an error.
*/
class Syntax_error : public Command
......
......@@ -23,35 +23,35 @@
Show_instances *Command_factory::new_Show_instances()
{
return new Show_instances(this);
return new Show_instances(&instance_map);
}
Flush_instances *Command_factory::new_Flush_instances()
{
return new Flush_instances(this);
return new Flush_instances(&instance_map);
}
Show_instance_status *Command_factory::
new_Show_instance_status(const char *name, uint len)
{
return new Show_instance_status(this, name, len);
return new Show_instance_status(&instance_map, name, len);
}
Show_instance_options *Command_factory::
new_Show_instance_options(const char *name, uint len)
{
return new Show_instance_options(this, name, len);
return new Show_instance_options(&instance_map, name, len);
}
Start_instance *Command_factory::
new_Start_instance(const char *name, uint len)
{
return new Start_instance(this, name, len);
return new Start_instance(&instance_map, name, len);
}
Stop_instance *Command_factory::new_Stop_instance(const char *name, uint len)
{
return new Stop_instance(this, name, len);
return new Stop_instance(&instance_map, name, len);
}
Syntax_error *Command_factory::new_Syntax_error()
......
......@@ -21,6 +21,8 @@
#include "guardian.h"
#include "instance_map.h"
#include "mysql_manager_error.h"
#include "log.h"
#include <string.h>
C_MODE_START
......@@ -46,6 +48,7 @@ Guardian_thread::Guardian_thread(Thread_registry &thread_registry_arg,
thread_registry.register_thread(&thread_info);
init_alloc_root(&alloc, MEM_ROOT_BLOCK_SIZE, 0);
guarded_instances= NULL;
starting_instances= NULL;
}
......@@ -76,7 +79,6 @@ void Guardian_thread::run()
{
Instance *instance;
LIST *loop;
int i= 0;
my_thread_init();
......@@ -88,9 +90,12 @@ void Guardian_thread::run()
{
instance= (Instance *) loop->data;
/* instance-> start already checks whether instance is running */
instance->start();
if (instance->start() != ER_INSTANCE_ALREADY_STARTED)
log_info("guardian attempted to restart instance %s",
instance->options.instance_name);
loop= loop->next;
}
move_to_list(&starting_instances, &guarded_instances);
pthread_mutex_unlock(&LOCK_guardian);
sleep(monitoring_interval);
}
......@@ -99,6 +104,24 @@ void Guardian_thread::run()
}
int Guardian_thread::start()
{
Instance *instance;
Imap_iterator iterator(instance_map);
instance_map->lock();
while (instance= iterator.next())
{
if ((instance->options.is_guarded != NULL) && (instance->is_running()))
if (guard(instance))
return 1;
}
instance_map->unlock();
return 0;
}
/*
Start instance guarding
......@@ -116,20 +139,38 @@ void Guardian_thread::run()
1 - error occured
*/
int Guardian_thread::guard(const char *instance_name, uint name_len)
int Guardian_thread::guard(Instance *instance)
{
return add_instance_to_list(instance, &starting_instances);
}
void Guardian_thread::move_to_list(LIST **from, LIST **to)
{
LIST *tmp;
while (*from)
{
tmp= rest(*from);
*to= list_add(*to, *from);
*from= tmp;
}
}
int Guardian_thread::add_instance_to_list(Instance *instance, LIST **list)
{
LIST *node;
Instance *instance;
node= (LIST *) alloc_root(&alloc, sizeof(LIST));
if (node == NULL)
return 1;
instance= instance_map->find(instance_name, name_len);
/* we store the pointers to instances from the instance_map's MEM_ROOT */
node->data= (void *) instance;
pthread_mutex_lock(&LOCK_guardian);
guarded_instances= list_add(guarded_instances, node);
*list= list_add(*list, node);
pthread_mutex_unlock(&LOCK_guardian);
return 0;
......@@ -143,12 +184,9 @@ int Guardian_thread::guard(const char *instance_name, uint name_len)
a piece of the MEM_ROOT).
*/
int Guardian_thread::stop_guard(const char *instance_name, uint name_len)
int Guardian_thread::stop_guard(Instance *instance)
{
LIST *node;
Instance *instance;
instance= instance_map->find(instance_name, name_len);
pthread_mutex_lock(&LOCK_guardian);
node= guarded_instances;
......
......@@ -66,13 +66,19 @@ class Guardian_thread: public Guardian_thread_args
~Guardian_thread();
void run();
int init();
int guard(const char *instance_name, uint name_len);
int stop_guard(const char *instance_name, uint name_len);
int start();
int guard(Instance *instance);
int stop_guard(Instance *instance);
private:
int add_instance_to_list(Instance *instance, LIST **list);
void move_to_list(LIST **from, LIST **to);
private:
pthread_mutex_t LOCK_guardian;
Thread_info thread_info;
LIST *guarded_instances;
LIST *starting_instances;
MEM_ROOT alloc;
enum { MEM_ROOT_BLOCK_SIZE= 512 };
};
......
......@@ -20,6 +20,7 @@
#include "instance.h"
#include "mysql_manager_error.h"
#include "log.h"
#include <my_sys.h>
#include <signal.h>
#include <m_string.h>
......@@ -43,6 +44,7 @@ int Instance::start()
if (!is_running())
{
log_info("trying to start instance %s", options.instance_name);
switch (fork()) {
case 0:
if (fork()) /* zombie protection */
......
......@@ -127,253 +127,29 @@ Instance_map::~Instance_map()
}
int Instance_map::flush_instances()
int Instance_map::lock()
{
int rc;
pthread_mutex_lock(&LOCK_instance_map);
hash_free(&hash);
hash_init(&hash, default_charset_info, START_HASH_SIZE, 0, 0,
get_instance_key, delete_instance, 0);
rc= load();
pthread_mutex_unlock(&LOCK_instance_map);
return rc;
}
int Instance_map::show_instance_options(struct st_net *net,
const char *instance_name)
{
enum { MAX_VERSION_LENGTH= 40 };
Buffer send_buff; /* buffer for packets */
LIST name, option;
LIST *field_list;
NAME_WITH_LENGTH name_field, option_field;
uint position=0;
/* create list of the fileds to be passed to send_fields */
name_field.name= (char *) "option_name";
name_field.length= 20;
name.data= &name_field;
option_field.name= (char *) "value";
option_field.length= 20;
option.data= &option_field;
field_list= list_add(NULL, &option);
field_list= list_add(field_list, &name);
send_fields(net, field_list);
{
Instance *instance;
if ((instance= find(instance_name, strlen(instance_name))) == NULL)
goto err;
store_to_string(&send_buff, (char *) "instance_name", &position);
store_to_string(&send_buff, (char *) instance_name, &position);
my_net_write(net, send_buff.buffer, (uint) position);
if (instance->options.mysqld_path != NULL)
{
position= 0;
store_to_string(&send_buff, (char *) "mysqld_path", &position);
store_to_string(&send_buff,
(char *) instance->options.mysqld_path,
&position);
my_net_write(net, send_buff.buffer, (uint) position);
}
if (instance->options.mysqld_user != NULL)
{
position= 0;
store_to_string(&send_buff, (char *) "admin_user", &position);
store_to_string(&send_buff,
(char *) instance->options.mysqld_user,
&position);
my_net_write(net, send_buff.buffer, (uint) position);
}
if (instance->options.mysqld_password != NULL)
{
position= 0;
store_to_string(&send_buff, (char *) "admin_password", &position);
store_to_string(&send_buff,
(char *) instance->options.mysqld_password,
&position);
my_net_write(net, send_buff.buffer, (uint) position);
}
/* loop through the options stored in DYNAMIC_ARRAY */
for (int i= 0; i < instance->options.options_array.elements; i++)
{
char *tmp_option, *option_value;
get_dynamic(&(instance->options.options_array), (gptr) &tmp_option, i);
option_value= strchr(tmp_option, '=');
/* split the option string into two parts */
*option_value= 0;
position= 0;
store_to_string(&send_buff, tmp_option + 2, &position);
store_to_string(&send_buff, option_value + 1, &position);
/* join name and the value into the same option again */
*option_value= '=';
my_net_write(net, send_buff.buffer, (uint) position);
}
}
send_eof(net);
net_flush(net);
return 0;
err:
return 1;
}
/* return the list of running guarded instances */
int Instance_map::init_guardian()
int Instance_map::unlock()
{
Instance *instance;
uint i= 0;
while (i < hash.records)
{
instance= (Instance *) hash_element(&hash, i);
if ((instance->options.is_guarded != NULL) && (instance->is_running()))
if (guardian->guard(instance->options.instance_name,
instance->options.instance_name_len))
return 1;
i++;
}
return 0;
pthread_mutex_unlock(&LOCK_instance_map);
}
/*
The method sends a list of instances in the instance map to the client.
SYNOPSYS
show_instances()
net The network connection to the client.
RETURN
0 - ok
1 - error occured
*/
int Instance_map::show_instances(struct st_net *net)
int Instance_map::flush_instances()
{
Buffer send_buff; /* buffer for packets */
LIST name, status;
NAME_WITH_LENGTH name_field, status_field;
LIST *field_list;
uint position=0;
name_field.name= (char *) "instance_name";
name_field.length= 20;
name.data= &name_field;
status_field.name= (char *) "status";
status_field.length= 20;
status.data= &status_field;
field_list= list_add(NULL, &status);
field_list= list_add(field_list, &name);
send_fields(net, field_list);
{
Instance *instance;
uint i= 0;
int rc;
pthread_mutex_lock(&LOCK_instance_map);
while (i < hash.records)
{
position= 0;
instance= (Instance *) hash_element(&hash, i);
store_to_string(&send_buff, instance->options.instance_name, &position);
if (instance->is_running())
store_to_string(&send_buff, (char *) "online", &position);
else
store_to_string(&send_buff, (char *) "offline", &position);
if (my_net_write(net, send_buff.buffer, (uint) position))
goto err;
i++;
}
hash_free(&hash);
hash_init(&hash, default_charset_info, START_HASH_SIZE, 0, 0,
get_instance_key, delete_instance, 0);
rc= load();
pthread_mutex_unlock(&LOCK_instance_map);
}
if (send_eof(net))
goto err;
if (net_flush(net))
goto err;
return 0;
err:
return 1;
}
/*
The method sends a table with a status of requested instance to the client.
SYNOPSYS
show_instance_status()
net The network connection to the client.
instance_name The name of the instance.
RETURN
0 - ok
1 - error occured
*/
int Instance_map::show_instance_status(struct st_net *net,
const char *instance_name)
{
enum { MAX_VERSION_LENGTH= 40 };
Buffer send_buff; /* buffer for packets */
LIST name, status, version;
LIST *field_list;
NAME_WITH_LENGTH name_field, status_field, version_field;
uint position=0;
/* create list of the fileds to be passed to send_fields */
name_field.name= (char *) "instance_name";
name_field.length= 20;
name.data= &name_field;
status_field.name= (char *) "status";
status_field.length= 20;
status.data= &status_field;
version_field.name= (char *) "version";
version_field.length= MAX_VERSION_LENGTH;
version.data= &version_field;
field_list= list_add(NULL, &version);
field_list= list_add(field_list, &status);
field_list= list_add(field_list, &name);
send_fields(net, field_list);
{
Instance *instance;
store_to_string(&send_buff, (char *) instance_name, &position);
if ((instance= find(instance_name, strlen(instance_name))) == NULL)
goto err;
if (instance->is_running())
{
store_to_string(&send_buff, (char *) "online", &position);
store_to_string(&send_buff, mysql_get_server_info(&(instance->mysql)), &position);
}
else
{
store_to_string(&send_buff, (char *) "offline", &position);
store_to_string(&send_buff, (char *) "unknown", &position);
}
my_net_write(net, send_buff.buffer, (uint) position);
}
send_eof(net);
net_flush(net);
err:
return 0;
return rc;
}
......@@ -448,3 +224,28 @@ int Instance_map::load()
return error;
}
Instance *Instance_map::get_instance(uint instance_number)
{
if (instance_number < hash.records)
return (Instance *) hash_element(&hash, instance_number);
else
return NULL;
}
/*--- Implementaton of the Instance map iterator class (Imap_iterator) ---*/
void Imap_iterator::go_to_first()
{
current_instance=0;
}
Instance *Imap_iterator::next()
{
return instance_map->get_instance(current_instance++);
}
......@@ -43,12 +43,11 @@ class Instance_map
Instance *find(const char *name, uint name_len);
Instance *find(uint instance_number);
int show_instances(struct st_net *net);
int show_instance_status(struct st_net *net, const char *instance_name);
int show_instance_options(struct st_net *net, const char *instance_name);
int flush_instances();
int init_guardian();
int cleanup();
int lock();
int unlock();
Instance *get_instance(uint instance_number);
Instance_map();
~Instance_map();
......@@ -73,4 +72,22 @@ class Instance_map
HASH hash;
};
/* Instance_map iterator */
class Imap_iterator
{
private:
uint current_instance;
Instance_map *instance_map;
public:
Imap_iterator(Instance_map *instance_map_arg) :
instance_map(instance_map_arg), current_instance(0)
{}
void go_to_first();
Instance *next();
};
#endif /* INCLUDES_MYSQL_INSTANCE_MANAGER_INSTANCE_MAP_H */
......@@ -129,8 +129,12 @@ void Listener_thread::run()
thread_registry.request_shutdown();
return;
}
/* set the socket nonblocking */
flags= fcntl(ip_socket, F_GETFL, 0);
fcntl(ip_socket, F_SETFL, flags | O_NONBLOCK);
/* make shure that instances won't be listening our sockets */
flags= fcntl(ip_socket, F_GETFD, 0);
fcntl(ip_socket, F_SETFD, flags | FD_CLOEXEC);
log_info("accepting connections on ip socket");
......@@ -180,6 +184,9 @@ void Listener_thread::run()
/* set the socket nonblocking */
flags= fcntl(unix_socket, F_GETFL, 0);
fcntl(unix_socket, F_SETFL, flags | O_NONBLOCK);
/* make shure that instances won't be listening our sockets */
flags= fcntl(unix_socket, F_GETFD, 0);
fcntl(unix_socket, F_SETFD, flags | FD_CLOEXEC);
}
log_info("accepting connections on unix socket %s",
unix_socket_address.sun_path);
......
......@@ -161,7 +161,7 @@ void manager(const Options &options)
alarm structures initialization as we have to use net_* functions while
making the list. And they in their turn need alarms for timeout suppport.
*/
instance_map.init_guardian();
guardian_thread.start();
while (!shutdown_complete)
{
......
......@@ -319,10 +319,10 @@ int Mysql_connection_thread::dispatch_command(enum enum_server_command command,
{
switch (command) {
case COM_QUIT: // client exit
log_info("query for connection %d received quit command",connection_id);
log_info("query for connection %d received quit command", connection_id);
return 1;
case COM_PING:
log_info("query for connection %d received ping command",connection_id);
log_info("query for connection %d received ping command", connection_id);
net_send_ok(&net, connection_id);
break;
case COM_QUERY:
......
......@@ -101,14 +101,14 @@ char *net_store_length(char *pkg, uint length)
void store_to_string(Buffer *buf, const char *string, uint *position)
{
char* currpos;
uint currpos;
uint string_len;
string_len= strlen(string);
buf->reserve(*position, 2);
currpos= net_store_length(buf->buffer + *position, string_len);
currpos= (net_store_length(buf->buffer + *position, string_len) - buf->buffer);
buf->append(currpos, string, string_len);
*position= *position + string_len + (currpos - buf->buffer - *position);
*position= *position + string_len + (currpos - *position);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment