Commit 5546fee6 authored by Isaac Ackerman's avatar Isaac Ackerman Committed by Sergei Petrunia

Improved connection management to clustrix supporting round robin and fail over

parent b2f56c36
......@@ -8,6 +8,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "ha_xpand_pushdown.h"
#include "key.h"
#include <strfunc.h> /* strconvert */
#include "my_pthread.h"
handlerton *xpand_hton = NULL;
......@@ -41,69 +42,100 @@ static MYSQL_SYSVAR_INT
NULL, NULL, -1, -1, 2147483647, 0
);
char *xpand_host;
static MYSQL_SYSVAR_STR
//state for load balancing
int xpand_hosts_cur; //protected by my_atomic's
ulong xpand_balance_algorithm;
const char* balance_algorithm_names[]=
{
"first", "round_robin", NullS
};
TYPELIB balance_algorithms=
{
array_elements(balance_algorithm_names) - 1, "",
balance_algorithm_names, NULL
};
static void update_balance_algorithm(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
*static_cast<ulong *>(var_ptr) = *static_cast<const ulong *>(save);
my_atomic_store32(&xpand_hosts_cur, 0);
}
static MYSQL_SYSVAR_ENUM
(
host,
xpand_host,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"Xpand host",
NULL, NULL, "127.0.0.1"
balance_algorithm,
xpand_balance_algorithm,
PLUGIN_VAR_OPCMDARG,
"Method for managing load balancing of Clustrix nodes, can take values FIRST or ROUND_ROBIN",
NULL, update_balance_algorithm, XPAND_BALANCE_ROUND_ROBIN, &balance_algorithms
);
int host_list_cnt;
char **host_list;
//current list of clx hosts
static PSI_rwlock_key key_xpand_hosts;
mysql_rwlock_t xpand_hosts_lock;
xpand_host_list *xpand_hosts;
static void free_host_list()
//only call while holding lock
static void clear_hosts()
{
if (host_list) {
for (int i = 0; host_list[i]; i++)
my_free(host_list[i]);
my_free(host_list);
host_list = NULL;
}
delete xpand_hosts;
xpand_hosts = NULL;
my_atomic_store32(&xpand_hosts_cur, 0);
}
static void update_host_list(char *xpand_host)
static int check_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *save, struct st_mysql_value *value)
{
free_host_list();
char b;
int len = 0;
const char *val = value->val_str(value, &b, &len);
int cnt = 0;
for (char *p = xpand_host, *s = xpand_host; ; p++) {
if (*p == ',' || *p == '\0') {
if (p > s) {
cnt++;
}
if (!*p)
break;
s = p + 1;
}
}
if (!val)
return HA_ERR_OUT_OF_MEM;
DBUG_PRINT("host_cnt", ("%d", cnt));
host_list = (char **)my_malloc(sizeof(char *) * cnt+1, MYF(MY_WME));
host_list[cnt] = 0;
host_list_cnt = cnt;
int i = 0;
for (char *p = xpand_host, *s = xpand_host; ; p++) {
if (*p == ',' || *p == '\0') {
if (p > s) {
char *host = (char *)my_malloc(p - s + 1, MYF(MY_WME));
host[p-s] = '\0';
memcpy(host, s, p-s);
DBUG_PRINT("host", ("%s", host));
host_list[i++] = host;
}
if (!*p)
break;
s = p + 1;
}
int error_code = 0;
xpand_host_list *host_list = xpand_host_list::create(val, thd, &error_code);
if (error_code)
return error_code;
*static_cast<xpand_host_list**>(save) = host_list;
return 0;
}
static void update_hosts(MYSQL_THD thd, struct st_mysql_sys_var *var,
void *var_ptr, const void *save)
{
mysql_rwlock_wrlock(&xpand_hosts_lock);
xpand_host_list *from_save = *static_cast<xpand_host_list * const *>(save);
char* raw = from_save->full_list;
int error_code = 0;
xpand_host_list *new_hosts = xpand_host_list::create(raw, &error_code);
if (error_code) {
my_printf_error(error_code, "Unhandled error setting xpand hostlist", MYF(0));
return;
}
DBUG_PRINT("xpand_host", ("%s", xpand_host));
clear_hosts();
xpand_hosts = new_hosts;
*static_cast<char**>(var_ptr) = new_hosts->full_list;
mysql_rwlock_unlock(&xpand_hosts_lock);
}
static char *xpand_hosts_str;
static MYSQL_SYSVAR_STR
(
hosts,
xpand_hosts_str,
PLUGIN_VAR_OPCMDARG | PLUGIN_VAR_MEMALLOC,
"List of xpand hostnames seperated by commas, semicolons or spaces",
check_hosts, update_hosts, "localhost"
);
char *xpand_username;
static MYSQL_SYSVAR_STR
(
......@@ -1403,15 +1435,20 @@ static int xpand_init(void *p)
xpand_hton->create_select = create_xpand_select_handler;
xpand_hton->create_derived = create_xpand_derived_handler;
update_host_list(xpand_host);
DBUG_RETURN(0);
mysql_rwlock_init(key_xpand_hosts, &xpand_hosts_lock);
mysql_rwlock_wrlock(&xpand_hosts_lock);
int error_code = 0;
xpand_hosts = xpand_host_list::create(xpand_hosts_str, &error_code);
mysql_rwlock_unlock(&xpand_hosts_lock);
DBUG_RETURN(error_code);
}
static int xpand_deinit(void *p)
{
DBUG_ENTER("xpand_deinit");
free_host_list();
mysql_rwlock_wrlock(&xpand_hosts_lock);
delete xpand_hosts;
mysql_rwlock_destroy(&xpand_hosts_lock);
DBUG_RETURN(0);
}
......@@ -1425,7 +1462,8 @@ static struct st_mysql_sys_var* xpand_system_variables[] =
MYSQL_SYSVAR(connect_timeout),
MYSQL_SYSVAR(read_timeout),
MYSQL_SYSVAR(write_timeout),
MYSQL_SYSVAR(host),
MYSQL_SYSVAR(balance_algorithm),
MYSQL_SYSVAR(hosts),
MYSQL_SYSVAR(username),
MYSQL_SYSVAR(password),
MYSQL_SYSVAR(port),
......
......@@ -10,6 +10,7 @@ Copyright (c) 2019, MariaDB Corporation.
#include "handler.h"
#include "table.h"
#include "sql_class.h"
#include "my_pthread.h"
#include "tztime.h"
//#include "errmsg.h"
......@@ -125,30 +126,42 @@ void xpand_connection::disconnect(bool is_destructor)
DBUG_VOID_RETURN;
}
int host_list_next;
extern int host_list_cnt;
extern char **host_list;
extern int xpand_hosts_cur;
extern ulong xpand_balance_algorithm;
extern mysql_rwlock_t xpand_hosts_lock;
extern xpand_host_list *xpand_hosts;
int xpand_connection::connect()
{
int error_code = 0;
my_bool my_true = 1;
DBUG_ENTER("xpand_connection::connect");
int start = 0;
if (xpand_balance_algorithm == XPAND_BALANCE_ROUND_ROBIN)
start = my_atomic_add32(&xpand_hosts_cur, 1);
// cpu concurrency by damned!
int host_num = host_list_next;
host_num = host_num % host_list_cnt;
char *host = host_list[host_num];
host_list_next = host_num + 1;
DBUG_PRINT("host", ("%s", host));
mysql_rwlock_rdlock(&xpand_hosts_lock);
//search for available host
int error_code = 0;
for (int i = 0; i < xpand_hosts->hosts_len; i++) {
char *host = xpand_hosts->hosts[(start + i) % xpand_hosts->hosts_len];
error_code = connect_direct(host);
if (!error_code)
break;
}
mysql_rwlock_unlock(&xpand_hosts_lock);
if (error_code)
my_error(error_code, MYF(0), "clustrix");
DBUG_RETURN(error_code);
}
/* Validate the connection parameters */
if (!strcmp(xpand_socket, ""))
if (!strcmp(host, "127.0.0.1"))
if (xpand_port == MYSQL_PORT_DEFAULT)
DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
//xpand_net.methods = &connection_methods;
int xpand_connection::connect_direct(char *host)
{
DBUG_ENTER("xpand_connection::connect_direct");
my_bool my_true = true;
DBUG_PRINT("host", ("%s", host));
if (!mysql_init(&xpand_net))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
......@@ -180,29 +193,20 @@ int xpand_connection::connect()
}
#endif
int error_code = 0;
if (!mysql_real_connect(&xpand_net, host, xpand_username, xpand_password,
NULL, xpand_port, xpand_socket,
CLIENT_MULTI_STATEMENTS))
{
error_code = mysql_errno(&xpand_net);
disconnect();
if (error_code != CR_CONN_HOST_ERROR &&
error_code != CR_CONNECTION_ERROR)
{
if (error_code == ER_CON_COUNT_ERROR)
{
my_error(ER_CON_COUNT_ERROR, MYF(0));
DBUG_RETURN(ER_CON_COUNT_ERROR);
}
my_error(ER_CONNECT_TO_FOREIGN_DATA_SOURCE, MYF(0), host);
DBUG_RETURN(ER_CONNECT_TO_FOREIGN_DATA_SOURCE);
}
}
xpand_net.reconnect = 1;
if (error_code && error_code != ER_CON_COUNT_ERROR) {
error_code = ER_CONNECT_TO_FOREIGN_DATA_SOURCE;
}
DBUG_RETURN(0);
DBUG_RETURN(error_code);
}
int xpand_connection::add_status_vars()
......@@ -1268,3 +1272,69 @@ int xpand_connection::add_command_operand_bitmap(MY_BITMAP *bitmap)
command_length += no_bytes;
return 0;
}
/****************************************************************************
** Class xpand_host_list
****************************************************************************/
xpand_host_list *xpand_host_list::create(const char *hosts, int *error_code)
{
return xpand_host_list::create(hosts, NULL, error_code);
}
xpand_host_list *xpand_host_list::create(const char *hosts, THD *thd, int *error_code)
{
xpand_host_list *list = static_cast<xpand_host_list*>(
thd ?
thd_calloc(thd, sizeof(xpand_host_list)) :
my_malloc(sizeof(xpand_host_list), MYF(MY_WME | MY_ZEROFILL)));
if (!list) {
*error_code = HA_ERR_OUT_OF_MEM;
return NULL;
}
list->full_list = thd ?
thd_strdup(thd, hosts) :
my_strdup(hosts, MYF(MY_WME));
list->strtok_buf = thd ?
thd_strdup(thd, hosts) :
my_strdup(hosts, MYF(MY_WME));
if (!list->full_list || !list->strtok_buf) {
*error_code = HA_ERR_OUT_OF_MEM;
return NULL;
}
const char *sep = ",; ";
//parse into array
int i = 0;
char *cursor = NULL;
char *token = NULL;
for (token = strtok_r(list->strtok_buf, sep, &cursor);
token && i < max_host_count;
token = strtok_r(NULL, sep, &cursor)) {
list->hosts[i] = token;
i++;
}
//host count out of range
if (i == 0 || token) {
my_free(list->full_list);
my_free(list->strtok_buf);
my_free(list);
*error_code = ER_BAD_HOST_ERROR;
return NULL;
}
list->hosts_len = i;
return list;
}
void xpand_host_list::operator delete(void *p)
{
xpand_host_list *list = static_cast<xpand_host_list*>(p);
if (list) {
my_free(list->full_list);
my_free(list->strtok_buf);
}
my_free(list);
}
......@@ -21,11 +21,32 @@ Copyright (c) 2019, MariaDB Corporation.
#define XPAND_SERVER_REQUEST 30
typedef enum xpand_lock_mode {
enum xpand_lock_mode_t {
XPAND_NO_LOCKS,
XPAND_SHARED,
XPAND_EXCLUSIVE,
} xpand_lock_mode_t;
};
enum xpand_balance_algorithm_enum {
XPAND_BALANCE_FIRST,
XPAND_BALANCE_ROUND_ROBIN
};
static const int max_host_count = 128;
class xpand_host_list {
private:
char *strtok_buf;
public:
char *full_list;
int hosts_len;
char *hosts[max_host_count];
static xpand_host_list *create(const char *hosts, int *error_code);
static xpand_host_list *create(const char *hosts, THD *thd, int *error_code);
xpand_host_list() = delete;
static void operator delete(void *p);
};
class xpand_connection_cursor;
class xpand_connection
......@@ -50,6 +71,7 @@ class xpand_connection
return xpand_net.net.vio;
}
int connect();
int connect_direct(char *host);
void disconnect(bool is_destructor = FALSE);
bool has_open_transaction();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment