'Replicate user variables' task

parent edb019ae
......@@ -17,6 +17,7 @@ bell@sanja.is.com.ua
bk@admin.bk
davida@isil.mysql.com
gluh@gluh.(none)
gluh@gluh.mysql.r18.ru
heikki@donna.mysql.fi
heikki@hundin.mysql.fi
heikki@rescue.
......
......@@ -701,6 +701,7 @@ extern void freeze_size(DYNAMIC_ARRAY *array);
#define dynamic_array_ptr(array,array_index) ((array)->buffer+(array_index)*(array)->size_of_element)
#define dynamic_element(array,array_index,type) ((type)((array)->buffer) +(array_index))
#define push_dynamic(A,B) insert_dynamic(A,B)
#define reset_dynamic(array) ((array)->elements= 0)
extern my_bool init_dynamic_string(DYNAMIC_STRING *str, const char *init_str,
uint init_alloc,uint alloc_increment);
......
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
stop slave;
reset master;
drop table if exists t1;
create table t1(n char(30));
set @i1:=12345678901234, @i2:=-12345678901234, @i3:=0, @i4:=-1;
set @s1:='This is a test', @r1:=12.5, @r2:=-12.5;
set @n1:=null;
set @s2:='', @s3:='abc\'def', @s4:= 'abc\\def', @s5:= 'abc''def';
insert into t1 values (@i1), (@i2), (@i3), (@i4);
insert into t1 values (@r1), (@r2);
insert into t1 values (@s1), (@s2), (@s3), (@s4), (@s5);
insert into t1 values (@n1);
insert into t1 values (@n2);
insert into t1 values (@a:=0), (@a:=@a+1), (@a:=@a+1);
insert into t1 values (@a+(@b:=@a+1));
set @q:='abc';
insert t1 values (@q), (@q:=concat(@q, 'n1')), (@q:=concat(@q, 'n2'));
set @a:=5;
insert into t1 values (@a),(@a);
start slave;
select * from t1;
n
12345678901234
-12345678901234
0
-1
12.5
-12.5
This is a test
abc'def
abc\def
abc'def
NULL
NULL
0
1
2
5
abc
abcn1
abcn1n2
5
5
show binlog events from 141;
Log_name Pos Event_type Server_id Orig_log_pos Info
slave-bin.000001 141 User var 2 141 @i1=12345678901234
slave-bin.000001 184 User var 2 184 @i2=-12345678901234
slave-bin.000001 227 User var 2 227 @i3=0
slave-bin.000001 270 User var 2 270 @i4=-1
slave-bin.000001 313 Query 1 313 use `test`; insert into t1 values (@i1), (@i2), (@i3), (@i4)
slave-bin.000001 396 User var 2 396 @r1=12.5
slave-bin.000001 439 User var 2 439 @r2=-12.5
slave-bin.000001 482 Query 1 482 use `test`; insert into t1 values (@r1), (@r2)
slave-bin.000001 551 User var 2 551 @s1='This is a test'
slave-bin.000001 601 User var 2 601 @s2=''
slave-bin.000001 637 User var 2 637 @s3='abc'def'
slave-bin.000001 680 User var 2 680 @s4='abc\def'
slave-bin.000001 723 User var 2 723 @s5='abc'def'
slave-bin.000001 766 Query 1 766 use `test`; insert into t1 values (@s1), (@s2), (@s3), (@s4), (@s5)
slave-bin.000001 856 User var 2 856 @n1=NULL
slave-bin.000001 882 Query 1 882 use `test`; insert into t1 values (@n1)
slave-bin.000001 944 Query 1 944 use `test`; insert into t1 values (@n2)
slave-bin.000001 1006 Query 1 1006 use `test`; insert into t1 values (@a:=0), (@a:=@a+1), (@a:=@a+1)
slave-bin.000001 1094 User var 2 1094 @a='2'
slave-bin.000001 1130 Query 1 1130 use `test`; insert into t1 values (@a+(@b:=@a+1))
slave-bin.000001 1202 User var 2 1202 @q='abc'
slave-bin.000001 1240 Query 1 1240 use `test`; insert t1 values (@q), (@q:=concat(@q, 'n1')), (@q:=concat(@q, 'n2'))
slave-bin.000001 1344 User var 2 1344 @a=5
slave-bin.000001 1386 Query 1 1386 use `test`; insert into t1 values (@a),(@a)
drop table t1;
stop slave;
source include/master-slave.inc;
connection master;
save_master_pos;
connection slave;
sync_with_master;
stop slave;
reset master;
connection master;
--disable_warnings
drop table if exists t1;
--enable_warnings
create table t1(n char(30));
set @i1:=12345678901234, @i2:=-12345678901234, @i3:=0, @i4:=-1;
set @s1:='This is a test', @r1:=12.5, @r2:=-12.5;
set @n1:=null;
set @s2:='', @s3:='abc\'def', @s4:= 'abc\\def', @s5:= 'abc''def';
insert into t1 values (@i1), (@i2), (@i3), (@i4);
insert into t1 values (@r1), (@r2);
insert into t1 values (@s1), (@s2), (@s3), (@s4), (@s5);
insert into t1 values (@n1);
insert into t1 values (@n2);
insert into t1 values (@a:=0), (@a:=@a+1), (@a:=@a+1);
insert into t1 values (@a+(@b:=@a+1));
set @q:='abc';
insert t1 values (@q), (@q:=concat(@q, 'n1')), (@q:=concat(@q, 'n2'));
set @a:=5;
insert into t1 values (@a),(@a);
save_master_pos;
connection slave;
start slave;
sync_with_master;
select * from t1;
show binlog events from 141;
connection master;
drop table t1;
save_master_pos;
connection slave;
sync_with_master;
stop slave;
......@@ -1940,6 +1940,7 @@ static user_var_entry *get_variable(HASH *hash, LEX_STRING &name,
entry->value=0;
entry->length=0;
entry->update_query_id=0;
entry->used_query_id=current_thd->query_id;
entry->type=STRING_RESULT;
memcpy(entry->name.str, name.str, name.length+1);
if (hash_insert(hash,(byte*) entry))
......@@ -2073,7 +2074,7 @@ Item_func_set_user_var::val_str(String *str)
{
String *res=args[0]->val_str(str);
if (!res) // Null value
update_hash((void*) 0,0,STRING_RESULT, default_charset_info);
update_hash((void*) 0, 0, STRING_RESULT, default_charset_info);
else
update_hash(res->c_ptr(),res->length()+1,STRING_RESULT,res->charset());
return res;
......@@ -2172,14 +2173,58 @@ longlong Item_func_get_user_var::val_int()
return LL(0); // Impossible
}
/* From sql_parse.cc */
extern bool is_update_query(enum enum_sql_command command);
void Item_func_get_user_var::fix_length_and_dec()
{
BINLOG_USER_VAR_EVENT *user_var_event;
THD *thd=current_thd;
maybe_null=1;
decimals=NOT_FIXED_DEC;
max_length=MAX_BLOB_WIDTH;
var_entry= get_variable(&thd->user_vars, name, 0);
if ((var_entry= get_variable(&thd->user_vars, name, 0)))
{
if (opt_bin_log && is_update_query(thd->lex.sql_command) &&
var_entry->used_query_id != thd->query_id)
{
/*
First we need to store value of var_entry, when the next situation appers:
> set @a:=1;
> insert into t1 values (@a), (@a:=@a+1), (@a:=@a+1);
We have to write to binlog value @a= 1;
*/
uint size= ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT)) + var_entry->length;
if (!(user_var_event= (BINLOG_USER_VAR_EVENT *) thd->alloc(size)))
goto err;
user_var_event->value= (char*) user_var_event +
ALIGN_SIZE(sizeof(BINLOG_USER_VAR_EVENT));
user_var_event->user_var_event= var_entry;
user_var_event->type= var_entry->type;
user_var_event->charset_number= var_entry->var_charset->number;
if (!var_entry->value)
{
/* NULL value*/
user_var_event->length= 0;
user_var_event->value= 0;
}
else
{
user_var_event->length= var_entry->length;
memcpy(user_var_event->value, var_entry->value,
var_entry->length);
}
var_entry->used_query_id= thd->query_id;
if (insert_dynamic(&thd->user_var_events, (gptr) &user_var_event))
goto err;
}
}
return;
err:
thd->fatal_error= 1;
return;
}
......
......@@ -1097,6 +1097,23 @@ bool MYSQL_LOG::write(Log_event* event_info)
if (e.write(file))
goto err;
}
if (thd->user_var_events.elements)
{
for (uint i= 0; i < thd->user_var_events.elements; i++)
{
BINLOG_USER_VAR_EVENT *user_var_event;
get_dynamic(&thd->user_var_events,(gptr) &user_var_event, i);
User_var_log_event e(thd, user_var_event->user_var_event->name.str,
user_var_event->user_var_event->name.length,
user_var_event->value,
user_var_event->length,
user_var_event->type,
user_var_event->charset_number);
e.set_log_pos(this);
if (e.write(file))
goto err;
}
}
if (thd->variables.convert_set)
{
char buf[256], *p;
......
......@@ -227,6 +227,7 @@ const char* Log_event::get_type_str()
case DELETE_FILE_EVENT: return "Delete_file";
case EXEC_LOAD_EVENT: return "Exec_load";
case RAND_EVENT: return "RAND";
case USER_VAR_EVENT: return "User var";
default: /* impossible */ return "Unknown";
}
}
......@@ -593,6 +594,9 @@ Log_event* Log_event::read_log_event(const char* buf, int event_len,
case RAND_EVENT:
ev = new Rand_log_event(buf, old_format);
break;
case USER_VAR_EVENT:
ev = new User_var_log_event(buf, old_format);
break;
default:
break;
}
......@@ -1894,6 +1898,242 @@ int Rand_log_event::exec_event(struct st_relay_log_info* rli)
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
User_var_log_event methods
*****************************************************************************
****************************************************************************/
/*****************************************************************************
User_var_log_event::pack_info()
****************************************************************************/
#ifndef MYSQL_CLIENT
void User_var_log_event::pack_info(Protocol* protocol)
{
char *buf= 0;
uint val_offset= 2 + name_len;
uint event_len= val_offset;
if (is_null)
{
buf= my_malloc(val_offset + 5, MYF(MY_WME));
strmov(buf + val_offset, "NULL");
event_len= val_offset + 4;
}
else
{
switch (type) {
case REAL_RESULT:
double real_val;
float8get(real_val, val);
buf= my_malloc(val_offset + FLOATING_POINT_BUFFER, MYF(MY_WME));
event_len += my_sprintf(buf + val_offset,
(buf + val_offset, "%.14g", real_val));
break;
case INT_RESULT:
buf= my_malloc(val_offset + 22, MYF(MY_WME));
event_len= longlong10_to_str(uint8korr(val), buf + val_offset,-10)-buf;
break;
case STRING_RESULT:
/*
This is correct as pack_info is used for SHOW BINLOG command
only. But be carefull this is may be incorrect in other cases as
string may contain \ and '.
*/
buf= my_malloc(val_offset + 2 + val_len, MYF(MY_WME));
buf[val_offset]= '\'';
memcpy(buf + val_offset + 1, val, val_len);
buf[val_offset + val_len]= '\'';
event_len= val_offset + 1 + val_len;
break;
case ROW_RESULT:
DBUG_ASSERT(1);
return;
}
}
buf[0]= '@';
buf[1+name_len]= '=';
memcpy(buf+1, name, name_len);
protocol->store(buf, event_len);
my_free(buf, MYF(MY_ALLOW_ZERO_PTR));
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
User_var_log_event::User_var_log_event()
****************************************************************************/
User_var_log_event::User_var_log_event(const char* buf, bool old_format)
:Log_event(buf, old_format)
{
buf+= (old_format) ? OLD_HEADER_LEN : LOG_EVENT_HEADER_LEN;
name_len= uint4korr(buf);
name= (char *) buf + UV_NAME_LEN_SIZE;
is_null= buf[UV_NAME_LEN_SIZE + name_len];
if (is_null)
{
type= STRING_RESULT;
val_len= 0;
val= 0;
}
else
{
type= (Item_result) buf[UV_VAL_IS_NULL + UV_NAME_LEN_SIZE + name_len];
charset_number= uint4korr(buf + UV_NAME_LEN_SIZE + name_len +
UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE);
val_len= uint4korr(buf + UV_NAME_LEN_SIZE + name_len +
UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE);
val= (char *) buf + UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL +
UV_VAL_TYPE_SIZE + UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE;
}
}
/*****************************************************************************
User_var_log_event::write_data()
****************************************************************************/
int User_var_log_event::write_data(IO_CACHE* file)
{
char buf[UV_NAME_LEN_SIZE];
char buf1[UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE];
char buf2[8];
char *pos= buf2;
int4store(buf, name_len);
buf1[0]= is_null;
if (!is_null)
{
buf1[1]= type;
int4store(buf1 + 2, charset_number);
int4store(buf1 + 2 + UV_CHARSET_NUMBER_SIZE, val_len);
switch (type) {
case REAL_RESULT:
float8store(buf2, *(double*) val);
break;
case INT_RESULT:
int8store(buf2, *(longlong*) val);
break;
case STRING_RESULT:
pos= val;
break;
case ROW_RESULT:
DBUG_ASSERT(1);
return 0;
}
return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
my_b_safe_write(file, (byte*) name, name_len) ||
my_b_safe_write(file, (byte*) buf1, sizeof(buf1)) ||
my_b_safe_write(file, (byte*) pos, val_len));
}
return (my_b_safe_write(file, (byte*) buf, sizeof(buf)) ||
my_b_safe_write(file, (byte*) name, name_len) ||
my_b_safe_write(file, (byte*) buf1, 1));
}
/*****************************************************************************
User_var_log_event::print()
****************************************************************************/
#ifdef MYSQL_CLIENT
void User_var_log_event::print(FILE* file, bool short_form, char* last_db)
{
if (!short_form)
{
print_header(file);
fprintf(file, "\tUser_var\n");
}
fprintf(file, "SET @");
my_fwrite(file, (byte*) name, (uint) (name_len), MYF(MY_NABP | MY_WME));
if (is_null)
{
fprintf(file, ":=NULL;\n");
}
else
{
switch (type) {
case REAL_RESULT:
double real_val;
float8get(real_val, val);
fprintf(file, ":=%.14g;\n", real_val);
break;
case INT_RESULT:
char int_buf[22];
longlong10_to_str(uint8korr(val), int_buf, -10);
fprintf(file, ":=%s;\n", int_buf);
break;
case STRING_RESULT:
fprintf(file, ":='%s';\n", val);
break;
case ROW_RESULT:
DBUG_ASSERT(1);
return;
}
}
fflush(file);
}
#endif
/*****************************************************************************
User_var_log_event::exec_event()
****************************************************************************/
#ifndef MYSQL_CLIENT
int User_var_log_event::exec_event(struct st_relay_log_info* rli)
{
Item *it= 0;
CHARSET_INFO *charset= log_cs;
LEX_STRING user_var_name;
user_var_name.str= name;
user_var_name.length= name_len;
if (type != ROW_RESULT)
init_sql_alloc(&thd->mem_root, 8192,0);
if (is_null)
{
it= new Item_null();
}
else
{
switch (type) {
case REAL_RESULT:
double real_val;
float8get(real_val, val);
it= new Item_real(real_val);
break;
case INT_RESULT:
it= new Item_int((longlong) uint8korr(val));
break;
case STRING_RESULT:
it= new Item_string(val, val_len, charset);
break;
case ROW_RESULT:
DBUG_ASSERT(1);
return 0;
}
charset= get_charset(charset_number, MYF(0));
}
Item_func_set_user_var e(user_var_name, it);
e.fix_fields(thd, 0, 0);
e.update_hash(val, val_len, type, charset);
free_root(&thd->mem_root,0);
rli->inc_pending(get_event_len());
return 0;
}
#endif // !MYSQL_CLIENT
/*****************************************************************************
*****************************************************************************
......
......@@ -173,6 +173,14 @@ struct sql_ex_info
#define RAND_SEED1_OFFSET 0
#define RAND_SEED2_OFFSET 8
/* User_var event post-header */
#define UV_VAL_LEN_SIZE 4
#define UV_VAL_IS_NULL 1
#define UV_VAL_TYPE_SIZE 1
#define UV_NAME_LEN_SIZE 4
#define UV_CHARSET_NUMBER_SIZE 4
/* Load event post-header */
#define L_THREAD_ID_OFFSET 0
......@@ -222,7 +230,7 @@ enum Log_event_type
START_EVENT = 1, QUERY_EVENT =2, STOP_EVENT=3, ROTATE_EVENT = 4,
INTVAR_EVENT=5, LOAD_EVENT=6, SLAVE_EVENT=7, CREATE_FILE_EVENT=8,
APPEND_BLOCK_EVENT=9, EXEC_LOAD_EVENT=10, DELETE_FILE_EVENT=11,
NEW_LOAD_EVENT=12, RAND_EVENT=13
NEW_LOAD_EVENT=12, RAND_EVENT=13, USER_VAR_EVENT=14
};
enum Int_event_type
......@@ -590,6 +598,46 @@ class Rand_log_event: public Log_event
bool is_valid() { return 1; }
};
/*****************************************************************************
User var Log Event class
****************************************************************************/
class User_var_log_event: public Log_event
{
public:
char *name;
uint name_len;
char *val;
ulong val_len;
Item_result type;
uint charset_number;
byte is_null;
#ifndef MYSQL_CLIENT
User_var_log_event(THD* thd_arg, char *name_arg, uint name_len_arg,
char *val_arg, ulong val_len_arg, Item_result type_arg,
uint charset_number_arg)
:Log_event(), name(name_arg), name_len(name_len_arg), val(val_arg),
val_len(val_len_arg), type(type_arg), charset_number(charset_number_arg)
{ is_null= !val; }
void pack_info(Protocol* protocol);
int exec_event(struct st_relay_log_info* rli);
#else
void print(FILE* file, bool short_form = 0, char* last_db = 0);
#endif
User_var_log_event(const char* buf, bool old_format);
~User_var_log_event() {}
Log_event_type get_type_code() { return USER_VAR_EVENT;}
int get_data_size()
{
return (is_null ? UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL :
UV_NAME_LEN_SIZE + name_len + UV_VAL_IS_NULL + UV_VAL_TYPE_SIZE +
UV_CHARSET_NUMBER_SIZE + UV_VAL_LEN_SIZE + val_len);
}
int write_data(IO_CACHE* file);
bool is_valid() { return 1; }
};
/*****************************************************************************
......
......@@ -145,6 +145,17 @@ THD::THD():user_time(0), fatal_error(0),
(hash_get_key) get_var_key,
(hash_free_key) free_user_var,0);
/* For user vars replication*/
if (opt_bin_log)
my_init_dynamic_array(&user_var_events,
sizeof(BINLOG_USER_VAR_EVENT *),
16,
16);
else
bzero((char*) &user_var_events, sizeof(user_var_events));
/* Prepared statements */
last_prepared_stmt= 0;
init_tree(&prepared_statements, 0, 0, sizeof(PREP_STMT),
......@@ -244,6 +255,7 @@ void THD::cleanup(void)
close_thread_tables(this);
}
close_temporary_tables(this);
delete_dynamic(&user_var_events);
hash_free(&user_vars);
if (global_read_lock)
unlock_global_read_lock(this);
......
......@@ -57,6 +57,15 @@ typedef struct st_log_info
~st_log_info() { pthread_mutex_destroy(&lock);}
} LOG_INFO;
typedef struct st_user_var_events
{
user_var_entry *user_var_event;
char *value;
ulong length;
Item_result type;
uint charset_number;
} BINLOG_USER_VAR_EVENT;
class Log_event;
class MYSQL_LOG {
......@@ -511,6 +520,8 @@ class THD :public ilink {
uint check_loops_counter; //last id used to check loops
/* variables.transaction_isolation is reset to this after each commit */
enum_tx_isolation session_tx_isolation;
/* for user variables replication*/
DYNAMIC_ARRAY user_var_events;
// extend scramble to handle new auth
char scramble[SCRAMBLE41_LENGTH+1];
// old scramble is needed to handle old clients
......@@ -896,7 +907,7 @@ class user_var_entry
public:
LEX_STRING name;
char *value;
ulong length, update_query_id;
ulong length, update_query_id, used_query_id;
Item_result type;
CHARSET_INFO *var_charset;
};
......
......@@ -390,6 +390,10 @@ void init_update_queries(void)
uc_update_queries[SQLCOM_UPDATE_MULTI]=1;
}
bool is_update_query(enum enum_sql_command command)
{
return uc_update_queries[command];
}
/*
Check if maximum queries per hour limit has been reached
......@@ -3077,6 +3081,8 @@ mysql_init_query(THD *thd)
thd->sent_row_count= thd->examined_row_count= 0;
thd->fatal_error= thd->rand_used= 0;
thd->possible_loops= 0;
if (opt_bin_log)
reset_dynamic(&thd->user_var_events);
DBUG_VOID_RETURN;
}
......
......@@ -136,6 +136,8 @@ bfill((A)->null_flags,(A)->null_bytes,255);\
#define BIN_LOG_HEADER_SIZE 4
#define FLOATING_POINT_BUFFER 331
/* Include prototypes for unireg */
#include "mysqld_error.h"
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment