Commit 8a5e5274 authored by unknown's avatar unknown

Fix for BUG#12335 (SP replication) : New binlogging strategy for stored PROCEDUREs/FUNCTIONs.

"Interleaved SPs execution is now binlogged properly, "SELECT spfunc()" is binlogged too.
The known remaining issue is binlogging/replication of "a routine is deleted while it is executed" scenario.


mysql-test/r/rpl_sp.result:
  Fix for BUG#12335: updated test cases/results
mysql-test/t/rpl_sp.test:
  Fix for BUG#12335: updated test cases/results
sql/item.cc:
  Fix for BUG#12335 (SP replication): 
   - Added Item_name_const 'function'
   - Addede 'delete reuse' to call dtor on item reuse
sql/item.h:
  Fix for BUG#12335 (SP replication) : Added Item_name_const 'function' + code cleanup
sql/item_create.cc:
   Fix for BUG#12335 (SP replication) : Added Item_name_const 'function'
sql/item_create.h:
   Fix for BUG#12335 (SP replication) : Added Item_name_const 'function'
sql/item_func.cc:
  Fix for BUG#12335 (SP replication) : binary log is now constrolled from within execute_function.
sql/lex.h:
  Fix for BUG#12335 (SP replication) : Added Item_name_const 'function'
sql/log.cc:
  Fix for BUG#12335 (SP replication) : Added MYSQL_LOG::{start|stop}_union_events to allow
  one to temporary disable binlogging but collect a 'union' information about binlog write
  calls.
sql/mysql_priv.h:
  Fix for BUG#12335 (SP replication)
sql/sp_head.cc:
  Fix for BUG#12335 (SP replication) : Now we use different SP binlogging strategy, grep for 
  StoredRoutinesBinlogging for details
sql/sp_head.h:
  Comments added
sql/sp_pcontext.h:
  Comments added
sql/sp_rcontext.h:
  Comments added
sql/sql_class.cc:
  Fix for BUG#12335 (SP replication) : Now we use different SP binlogging strategy, grep for 
  StoredRoutinesBinlogging for details
sql/sql_class.h:
  Fix for BUG#12335 (SP replication) : Added MYSQL_LOG::{start|stop}_union_events to allow
  one to temporary disable binlogging but collect a 'union' information about binlog write
  calls.
sql/sql_delete.cc:
  Fix for BUG#12335: check THD::query_str_binlog_unsuitable when writing to binlog.
sql/sql_insert.cc:
  Fix for BUG#12335: check THD::query_str_binlog_unsuitable when writing to binlog.
sql/sql_lex.cc:
  Fix for BUG#12335 (SP replication): Add ability to extract previous returned token from
  the tokenizer.
sql/sql_lex.h:
  Fix for BUG#12335 (SP replication): Add ability to extract previous returned token from
  the tokenizer.
sql/sql_parse.cc:
  Fix for BUG#12335 (SP replication) : Now we use different SP binlogging strategy, grep for 
  StoredRoutinesBinlogging for details
sql/sql_update.cc:
  Fix for BUG#12335: check THD::query_str_binlog_unsuitable when writing to binlog.
sql/sql_yacc.yy:
  Fix for BUG#12335 (SP replication) : When creating Item_splocal, remember where it is located
  in the query.
parent 9017addf
......@@ -60,7 +60,8 @@ set b = 8;
insert into t1 values (b);
insert into t1 values (unix_timestamp());
end
master-bin.000001 # Query 1 # use `mysqltest1`; call foo()
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values ( NAME_CONST('b',8))
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values (unix_timestamp())
select * from t1;
a
8
......@@ -76,8 +77,10 @@ reads sql data
select * from mysqltest1.t1;
call foo2();
a
show binlog events from 605;
show binlog events from 518;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values ( NAME_CONST('b',8))
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values (unix_timestamp())
master-bin.000001 # Query 1 # use `mysqltest1`; delete from t1
master-bin.000001 # Query 1 # use `mysqltest1`; create procedure foo2()
not deterministic
......@@ -124,7 +127,7 @@ alter procedure foo4 sql security invoker;
call foo4();
show warnings;
Level Code Message
show binlog events from 841;
show binlog events from 990;
Log_name Pos Event_type Server_id End_log_pos Info
master-bin.000001 # Query 1 # use `mysqltest1`; drop table t1
master-bin.000001 # Query 1 # use `mysqltest1`; create table t1 (a int)
......@@ -141,9 +144,12 @@ begin
insert into t2 values(3);
insert into t1 values (5);
end
master-bin.000001 # Query 1 # use `mysqltest1`; call foo3()
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t2 values(3)
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values (15)
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t2 values(3)
master-bin.000001 # Query 1 # use `mysqltest1`; alter procedure foo4 sql security invoker
master-bin.000001 # Query 1 # use `mysqltest1`; call foo4()
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t2 values(3)
master-bin.000001 # Query 1 # use `mysqltest1`; insert into t1 values (5)
select * from t1;
a
15
......@@ -160,6 +166,8 @@ a
select * from t2;
a
3
3
3
select * from mysql.proc where name="foo4" and db='mysqltest1';
db name type specific_name language sql_data_access is_deterministic security_type param_list returns body definer created modified sql_mode comment
mysqltest1 foo4 PROCEDURE foo4 SQL CONTAINS_SQL YES INVOKER begin
......@@ -196,6 +204,7 @@ a
select * from t1;
a
21
20
select * from t2;
a
23
......
stop slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
reset master;
reset slave;
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
start slave;
drop procedure if exists p1;
drop procedure if exists p2;
drop function if exists f1;
drop table if exists t1,t2;
drop view if exists v1;
create table t1 (a int);
create procedure p1()
begin
declare spv int default 0;
while spv < 5 do
insert into t1 values(spv+1);
set spv=spv+1;
end while;
end//
call p1();
select * from t1;
a
1
2
3
4
5
delete from t1;
create procedure p2()
begin
declare a int default 4;
create table t2 as select a;
end//
call p2();
select * from t2;
a
4
select * from t2;
a
4
drop procedure p1;
drop procedure p2;
drop table t2;
create function f1(x int) returns int
begin
insert into t1 values(x);
return x+1;
end//
create procedure p1(a int, b int)
begin
declare v int default f1(5);
if (f1(6)) then
select 'yes';
end if;
set v = f1(7);
while f1(8) < 1 do
select 'this cant be';
end while;
end//
call p1(f1(1), f1(2));
yes
yes
select * from t1;
a
1
2
5
6
7
8
create table t2(a int);
insert into t2 values (10),(11);
select a,f1(a) from t2;
a f1(a)
10 11
11 12
insert into t2 select f1(3);
select 'master:',a from t1;
master: a
master: 1
master: 2
master: 5
master: 6
master: 7
master: 8
master: 10
master: 11
master: 3
select 'slave:',a from t1;
slave: a
slave: 1
slave: 2
slave: 5
slave: 6
slave: 7
slave: 8
slave: 10
slave: 11
slave: 3
drop procedure p1;
delete from t1;
delete from t2;
delete from t1;
insert into t2 values(1),(2);
create view v1 as select f1(a) from t2;
select * from v1;
f1(a)
2
3
select 'master:',a from t1;
master: a
master: 1
master: 2
select 'slave:',a from t1;
slave: a
slave: 1
slave: 2
drop view v1;
delete from t1;
prepare s1 from 'select f1(?)';
set @xx=123;
execute s1 using @xx;
f1(?)
124
select 'master:',a from t1;
master: a
master: 123
select 'slave:',a from t1;
slave: a
slave: 123
delete from t1;
create procedure p1(spv int)
begin
declare c cursor for select f1(spv) from t2;
while (spv > 2) do
open c;
fetch c into spv;
close c;
set spv= spv - 10;
end while;
end//
call p1(15);
select 'master:',a from t1;
master: a
master: 15
master: 15
master: 6
master: 6
select 'slave:',a from t1;
slave: a
slave: 15
slave: 15
slave: 6
slave: 6
drop procedure p1;
drop function f1;
drop table t1,t2;
create table t1 (a int);
create table t2 (a char(200));
create procedure p1()
begin
declare dummy int;
while ((select count(*) from t1) < 1) do
set dummy = sleep(1);
end while;
end//
create procedure p2()
begin
select f1();
call p1();
delete from t1 limit 1;
select f1();
call p1();
delete from t1 limit 1;
select f1();
end//
create function f1() returns int
begin
insert into t2 values('f1-r1');
return 0;
end//
call p2();
drop function f1//
create function f1() returns int
begin
insert into t2 values('f1-r2');
return 0;
end//
insert into t1 values (1) //
call p1()//
drop function f1//
create function f1() returns int
begin
insert into t2 values('f1-r3');
return 0;
end//
insert into t1 values (1) //
call p1()//
f1()
0
f1()
0
f1()
0
select * from t2;
a
f1-r1
f1-r1
f1-r1
select * from t2;
a
f1-r1
f1-r3
f1-r3
drop table t1;
......@@ -83,7 +83,7 @@ create procedure foo2()
call foo2();
# verify CALL is not in binlog
--replace_column 2 # 5 #
show binlog events from 605;
show binlog events from 518;
--error 1418;
alter procedure foo2 contains sql;
......@@ -147,7 +147,7 @@ show warnings;
# Check that only successful CALLs are in binlog
--replace_column 2 # 5 #
show binlog events from 841;
show binlog events from 990;
# Note that half-failed CALLs are not in binlog, which is a known
# bug. If we compare t2 on master and slave we see they differ:
......
--log_bin_trust_routine_creators=1
--log_bin_trust_routine_creators=1
# Test of replication of stored procedures (WL#2146 for MySQL 5.0)
source include/master-slave.inc;
# ****************************************************************
connection master;
# cleanup
--disable_warnings
drop procedure if exists p1;
drop procedure if exists p2;
drop function if exists f1;
drop table if exists t1,t2;
drop view if exists v1;
--enable_warnings
create table t1 (a int);
# 1. Test simple variables use.
delimiter //;
create procedure p1()
begin
declare spv int default 0;
while spv < 5 do
insert into t1 values(spv+1);
set spv=spv+1;
end while;
end//
delimiter ;//
call p1();
sync_slave_with_master;
connection slave;
select * from t1;
connection master;
delete from t1;
# 2. Test SP variable name
delimiter //;
create procedure p2()
begin
declare a int default 4;
create table t2 as select a;
end//
delimiter ;//
call p2();
select * from t2;
sync_slave_with_master;
connection slave;
select * from t2;
connection master;
drop procedure p1;
drop procedure p2;
drop table t2;
# 3. Test FUNCTIONs in various places
delimiter //;
create function f1(x int) returns int
begin
insert into t1 values(x);
return x+1;
end//
create procedure p1(a int, b int)
begin
declare v int default f1(5);
if (f1(6)) then
select 'yes';
end if;
set v = f1(7);
while f1(8) < 1 do
select 'this cant be';
end while;
end//
delimiter ;//
call p1(f1(1), f1(2));
select * from t1;
create table t2(a int);
insert into t2 values (10),(11);
select a,f1(a) from t2;
# This shouldn't put separate 'call f1(3)' into binlog:
insert into t2 select f1(3);
select 'master:',a from t1;
sync_slave_with_master;
connection slave;
select 'slave:',a from t1;
connection master;
drop procedure p1;
delete from t1;
delete from t2;
# 4. VIEWs
delete from t1;
insert into t2 values(1),(2);
create view v1 as select f1(a) from t2;
select * from v1;
select 'master:',a from t1;
sync_slave_with_master;
connection slave;
select 'slave:',a from t1;
connection master;
drop view v1;
delete from t1;
# 5. Prepared statements.
prepare s1 from 'select f1(?)';
set @xx=123;
execute s1 using @xx;
select 'master:',a from t1;
sync_slave_with_master;
connection slave;
select 'slave:',a from t1;
connection master;
delete from t1;
# 5. Cursors.
# t2 has (1),(2);
delimiter //;
create procedure p1(spv int)
begin
declare c cursor for select f1(spv) from t2;
while (spv > 2) do
open c;
fetch c into spv;
close c;
set spv= spv - 10;
end while;
end//
delimiter ;//
call p1(15);
select 'master:',a from t1;
sync_slave_with_master;
connection slave;
select 'slave:',a from t1;
connection master;
drop procedure p1;
drop function f1;
drop table t1,t2;
sync_slave_with_master;
# The following will produce incorrect results:
connection master;
create table t1 (a int);
create table t2 (a char(200));
delimiter //;
create procedure p1()
begin
declare dummy int;
while ((select count(*) from t1) < 1) do
set dummy = sleep(1);
end while;
end//
create procedure p2()
begin
select f1();
call p1();
delete from t1 limit 1;
select f1();
call p1();
delete from t1 limit 1;
select f1();
end//
create function f1() returns int
begin
insert into t2 values('f1-r1');
return 0;
end//
delimiter ;//
connection master;
send call p2();
connection master1;
delimiter //;
drop function f1//
create function f1() returns int
begin
insert into t2 values('f1-r2');
return 0;
end//
insert into t1 values (1) //
call p1()//
drop function f1//
create function f1() returns int
begin
insert into t2 values('f1-r3');
return 0;
end//
insert into t1 values (1) //
call p1()//
delimiter ;//
connection master;
reap;
select * from t2;
connection slave;
select * from t2;
# Clean up
connection master;
drop table t1;
sync_slave_with_master;
......@@ -303,6 +303,7 @@ void *Item::operator new(size_t size, Item *reuse, uint *rsize)
if (rsize)
(*rsize)= reuse->rsize;
reuse->cleanup();
delete reuse;
TRASH((void *)reuse, size);
return (void *)reuse;
}
......@@ -789,12 +790,15 @@ int Item::save_in_field_no_warnings(Field *field, bool no_conversions)
}
/*****************************************************************************
Item_splocal methods
*****************************************************************************/
double Item_splocal::val_real()
{
DBUG_ASSERT(fixed);
Item *it= this_item();
double ret= it->val_real();
Item::null_value= it->null_value;
null_value= it->null_value;
return ret;
}
......@@ -804,7 +808,7 @@ longlong Item_splocal::val_int()
DBUG_ASSERT(fixed);
Item *it= this_item();
longlong ret= it->val_int();
Item::null_value= it->null_value;
null_value= it->null_value;
return ret;
}
......@@ -814,7 +818,7 @@ String *Item_splocal::val_str(String *sp)
DBUG_ASSERT(fixed);
Item *it= this_item();
String *ret= it->val_str(sp);
Item::null_value= it->null_value;
null_value= it->null_value;
return ret;
}
......@@ -824,7 +828,7 @@ my_decimal *Item_splocal::val_decimal(my_decimal *decimal_value)
DBUG_ASSERT(fixed);
Item *it= this_item();
my_decimal *val= it->val_decimal(decimal_value);
Item::null_value= it->null_value;
null_value= it->null_value;
return val;
}
......@@ -833,7 +837,7 @@ bool Item_splocal::is_null()
{
Item *it= this_item();
bool ret= it->is_null();
Item::null_value= it->null_value;
null_value= it->null_value;
return ret;
}
......@@ -898,6 +902,97 @@ void Item_splocal::print(String *str)
}
/*****************************************************************************
Item_name_const methods
*****************************************************************************/
double Item_name_const::val_real()
{
DBUG_ASSERT(fixed);
double ret= value_item->val_real();
null_value= value_item->null_value;
return ret;
}
longlong Item_name_const::val_int()
{
DBUG_ASSERT(fixed);
longlong ret= value_item->val_int();
null_value= value_item->null_value;
return ret;
}
String *Item_name_const::val_str(String *sp)
{
DBUG_ASSERT(fixed);
String *ret= value_item->val_str(sp);
null_value= value_item->null_value;
return ret;
}
my_decimal *Item_name_const::val_decimal(my_decimal *decimal_value)
{
DBUG_ASSERT(fixed);
my_decimal *val= value_item->val_decimal(decimal_value);
Item::null_value= value_item->null_value;
return val;
}
bool Item_name_const::is_null()
{
bool ret= value_item->is_null();
Item::null_value= value_item->null_value;
return ret;
}
Item::Type Item_name_const::type() const
{
return value_item->type();
}
bool Item_name_const::fix_fields(THD *thd, Item **)
{
char buf[128];
String *item_name;
String s(buf, sizeof(buf), &my_charset_bin);
s.length(0);
if (value_item->fix_fields(thd, &value_item) ||
name_item->fix_fields(thd, &name_item))
return TRUE;
if (!(value_item->const_item() && name_item->const_item()))
return TRUE;
if (!(item_name= name_item->val_str(&s)))
return TRUE; /* Can't have a NULL name */
set_name(item_name->ptr(), (uint) item_name->length(), system_charset_info);
max_length= value_item->max_length;
decimals= value_item->decimals;
fixed= 1;
return FALSE;
}
void Item_name_const::cleanup()
{
fixed= 0;
}
void Item_name_const::print(String *str)
{
str->append("NAME_CONST(");
name_item->print(str);
str->append(',');
value_item->print(str);
str->append(')');
}
/*
Move SUM items out from item tree and replace with reference
......
......@@ -700,20 +700,40 @@ class Item {
};
// A local SP variable (incl. parameters), used in runtime
/*
A reference to local SP variable (incl. reference to SP parameter), used in
runtime.
NOTE
This item has a "value" item, defined as
this_item() = thd->spcont->get_item(m_offset)
and it delegates everything to that item (if !this_item() then this item
poses as Item_null) except for name, which is the name of SP local
variable.
*/
class Item_splocal : public Item
{
private:
uint m_offset;
public:
LEX_STRING m_name;
public:
/*
Position of this reference to SP variable in the statement (the
statement itself is in sp_instr_stmt::m_query).
This is valid only for references to SP variables in statements,
excluding DECLARE CURSOR statement. It is used to replace references to SP
variables with NAME_CONST calls when putting statements into the binary
log.
Value of 0 means that this object doesn't corresponding to reference to
SP variable in query text.
*/
int pos_in_query;
Item_splocal(LEX_STRING name, uint offset)
: m_offset(offset), m_name(name)
Item_splocal(LEX_STRING name, uint offset, int pos_in_q=0)
: m_offset(offset), m_name(name), pos_in_query(pos_in_q)
{
Item::maybe_null= TRUE;
maybe_null= TRUE;
}
/* For error printing */
......@@ -750,7 +770,7 @@ class Item_splocal : public Item
bool is_null();
void print(String *str);
inline void make_field(Send_field *field)
void make_field(Send_field *field)
{
Item *it= this_item();
......@@ -761,28 +781,84 @@ class Item_splocal : public Item
it->make_field(field);
}
inline Item_result result_type() const
Item_result result_type() const
{
return this_const_item()->result_type();
}
inline bool const_item() const
bool const_item() const
{
return TRUE;
}
inline int save_in_field(Field *field, bool no_conversions)
int save_in_field(Field *field, bool no_conversions)
{
return this_item()->save_in_field(field, no_conversions);
}
inline bool send(Protocol *protocol, String *str)
bool send(Protocol *protocol, String *str)
{
return this_item()->send(protocol, str);
}
};
/*
NAME_CONST(given_name, const_value).
This 'function' has all properties of the supplied const_value (which is
assumed to be a literal constant), and the name given_name.
This is used to replace references to SP variables when we write PROCEDURE
statements into the binary log.
TODO
Together with Item_splocal and Item::this_item() we can actually extract
common a base of this class and Item_splocal. Maybe it is possible to
extract a common base with class Item_ref, too.
*/
class Item_name_const : public Item
{
Item *value_item;
Item *name_item;
public:
Item_name_const(Item *name, Item *val): value_item(val), name_item(name)
{
Item::maybe_null= TRUE;
}
bool fix_fields(THD *, Item **);
void cleanup();
enum Type type() const;
double val_real();
longlong val_int();
String *val_str(String *sp);
my_decimal *val_decimal(my_decimal *);
bool is_null();
void print(String *str);
Item_result result_type() const
{
return value_item->result_type();
}
bool const_item() const
{
return TRUE;
}
int save_in_field(Field *field, bool no_conversions)
{
return value_item->save_in_field(field, no_conversions);
}
inline bool send(Protocol *protocol, String *str)
{
return value_item->send(protocol, str);
}
};
bool agg_item_collations(DTCollation &c, const char *name,
Item **items, uint nitems, uint flags= 0);
bool agg_item_collations_for_comparison(DTCollation &c, const char *name,
......
......@@ -261,6 +261,11 @@ Item *create_func_mod(Item* a, Item *b)
return new Item_func_mod(a,b);
}
Item *create_func_name_const(Item *a, Item *b)
{
return new Item_name_const(a,b);
}
Item *create_func_monthname(Item* a)
{
return new Item_func_monthname(a);
......
......@@ -65,6 +65,7 @@ Item *create_func_ltrim(Item* a);
Item *create_func_md5(Item* a);
Item *create_func_mod(Item* a, Item *b);
Item *create_func_monthname(Item* a);
Item *create_func_name_const(Item *a, Item *b);
Item *create_func_nullif(Item* a, Item *b);
Item *create_func_oct(Item *);
Item *create_func_ord(Item* a);
......
......@@ -4717,7 +4717,11 @@ Item_func_sp::execute(Item **itp)
m_sp->m_db.str, m_sp->m_name.str, 0, 0))
goto error_check_ctx;
#endif
/*
Disable the binlogging if this is not a SELECT statement. If this is a
SELECT, leave binlogging on, so execute_function() code writes the
function call into binlog.
*/
thd->reset_sub_statement_state(&statement_state, SUB_STMT_FUNCTION);
res= m_sp->execute_function(thd, args, arg_count, itp);
thd->restore_sub_statement_state(&statement_state);
......
......@@ -696,6 +696,7 @@ static SYMBOL sql_functions[] = {
{ "MULTIPOINTFROMWKB",SYM(GEOMFROMWKB)},
{ "MULTIPOLYGONFROMTEXT",SYM(MPOLYFROMTEXT)},
{ "MULTIPOLYGONFROMWKB",SYM(GEOMFROMWKB)},
{ "NAME_CONST", F_SYM(FUNC_ARG2),0,CREATE_FUNC(create_func_name_const)},
{ "NOW", SYM(NOW_SYM)},
{ "NULLIF", F_SYM(FUNC_ARG2),0,CREATE_FUNC(create_func_nullif)},
{ "NUMGEOMETRIES", F_SYM(FUNC_ARG1),0,CREATE_FUNC_GEOM(create_func_numgeometries)},
......
......@@ -1553,6 +1553,20 @@ bool MYSQL_LOG::flush_and_sync()
return err;
}
void MYSQL_LOG::start_union_events(THD *thd)
{
DBUG_ASSERT(!thd->binlog_evt_union.do_union);
thd->binlog_evt_union.do_union= TRUE;
thd->binlog_evt_union.unioned_events= FALSE;
thd->binlog_evt_union.unioned_events_trans= FALSE;
}
void MYSQL_LOG::stop_union_events(THD *thd)
{
DBUG_ASSERT(thd->binlog_evt_union.do_union);
thd->binlog_evt_union.do_union= FALSE;
}
/*
Write an event to the binary log
*/
......@@ -1563,6 +1577,13 @@ bool MYSQL_LOG::write(Log_event *event_info)
bool error= 1;
DBUG_ENTER("MYSQL_LOG::write(Log_event *)");
if (thd->binlog_evt_union.do_union)
{
thd->binlog_evt_union.unioned_events= TRUE;
thd->binlog_evt_union.unioned_events_trans |= event_info->cache_stmt;
DBUG_RETURN(0);
}
pthread_mutex_lock(&LOCK_log);
/*
......
......@@ -34,6 +34,7 @@
#include <thr_lock.h>
#include <my_base.h> /* Needed by field.h */
#include "sql_bitmap.h"
#include "sql_array.h"
#ifdef __EMX__
#undef write /* remove pthread.h macro definition for EMX */
......
......@@ -588,8 +588,155 @@ sp_head::make_field(uint max_length, const char *name, TABLE *dummy)
DBUG_RETURN(field);
}
int
sp_head::execute(THD *thd)
int cmp_splocal_locations(Item_splocal * const *a, Item_splocal * const *b)
{
return (int)((*a)->pos_in_query - (*b)->pos_in_query);
}
/*
StoredRoutinesBinlogging
Top-down overview:
1. Statements
Statements that have is_update_query(stmt) == TRUE are written into the
binary log verbatim.
Examples:
UPDATE tbl SET tbl.x = spfunc_w_side_effects()
UPDATE tbl SET tbl.x=1 WHERE spfunc_w_side_effect_that_returns_false(tbl.y)
Statements that have is_update_query(stmt) == FALSE (e.g. SELECTs) are not
written into binary log. Instead we catch function calls the statement
makes and write it into binary log separately (see #3).
We actually can easily write SELECT statements into the binary log in the
right order (we don't have issues with const tables being unlocked early
because SELECTs that use FUNCTIONs unlock all tables at once) We don't do
it because replication slave thread currently can't execute SELECT
statements. Fixing this is on the TODO.
2. PROCEDURE calls
CALL statements are not written into binary log. Instead
* Any FUNCTION invocation (in SET, IF, WHILE, OPEN CURSOR and other SP
instructions) is written into binlog separately.
* Each statement executed in SP is binlogged separately, according to rules
in #1, with the exception that we modify query string: we replace uses
of SP local variables with NAME_CONST('spvar_name', <spvar-value>) calls.
This substitution is done in subst_spvars().
3. FUNCTION calls
In sp_head::execute_function(), we check
* If this function invocation is done from a statement that is written
into the binary log.
* If there were any attempts to write events to the binary log during
function execution.
If the answers are No and Yes, we write the function call into the binary
log as "DO spfunc(<param1value>, <param2value>, ...)"
*/
/*
Replace thd->query{_length} with a string that one can write to the binlog.
SYNOPSIS
subst_spvars()
thd Current thread.
instr Instruction (we look for Item_splocal instances in
instr->free_list)
query_str Original query string
DESCRIPTION
The binlog-suitable string is produced by replacing references to SP local
variables with NAME_CONST('sp_var_name', value) calls.
RETURN
0 Ok, thd->query{_length} either has been appropraiately replaced or
there is no need for replacements.
1 Out of memory error.
*/
static bool subst_spvars(THD *thd, sp_instr *instr, LEX_STRING *query_str)
{
DBUG_ENTER("subst_spvars");
if (thd->prelocked_mode == NON_PRELOCKED && mysql_bin_log.is_open())
{
Dynamic_array<Item_splocal*> sp_vars_uses;
/* Find all instances of item_splocal used in this statement */
for (Item *item= instr->free_list; item; item= item->next)
{
if (item->is_splocal() && ((Item_splocal*)item)->pos_in_query)
sp_vars_uses.append((Item_splocal*)item);
}
if (!sp_vars_uses.elements())
DBUG_RETURN(0);
/* Sort SP var refs by their occurences in the query */
sp_vars_uses.sort(cmp_splocal_locations);
/*
Construct a statement string where SP local var refs are replaced
with "NAME_CONST(name, value)"
*/
char buffer[512];
String qbuf(buffer, sizeof(buffer), &my_charset_bin);
qbuf.length(0);
char *cur= query_str->str;
int prev_pos= 0;
int res= 0;
for (Item_splocal **splocal= sp_vars_uses.front();
splocal < sp_vars_uses.back(); splocal++)
{
/* append the text between sp ref occurences */
res |= qbuf.append(cur + prev_pos, (*splocal)->pos_in_query - prev_pos);
prev_pos= (*splocal)->pos_in_query + (*splocal)->m_name.length;
/* append the spvar substitute */
res |= qbuf.append(" NAME_CONST('");
res |= qbuf.append((*splocal)->m_name.str, (*splocal)->m_name.length);
res |= qbuf.append("',");
Item *val= (*splocal)->this_item();
DBUG_PRINT("info", ("print %p", val));
val->print(&qbuf);
res |= qbuf.append(')');
if (res)
break;
}
res |= qbuf.append(cur + prev_pos, query_str->length - prev_pos);
if (res)
DBUG_RETURN(1);
char *pbuf= thd->alloc(qbuf.length()+1);
if (!pbuf)
DBUG_RETURN(1);
memcpy(pbuf, qbuf.ptr(), qbuf.length()+1);
thd->query= pbuf;
thd->query_length= qbuf.length();
}
DBUG_RETURN(0);
}
/*
Execute the routine. The main instruction jump loop is there
Assume the parameters already set.
RETURN
-1 on error
*/
int sp_head::execute(THD *thd)
{
DBUG_ENTER("sp_head::execute");
char olddb[128];
......@@ -797,9 +944,31 @@ sp_head::execute(THD *thd)
}
/*
Execute a function:
- evaluate parameters
- call sp_head::execute
- evaluate the return value
SYNOPSIS
sp_head::execute_function()
thd Thread handle
argp Passed arguments (these are items from containing statement?)
argcount Number of passed arguments. We need to check if this is
correct.
resp OUT Put result item here (q: is it a constant Item always?)
RETURN
0 on OK
other on error
*/
int
sp_head::execute_function(THD *thd, Item **argp, uint argcount, Item **resp)
{
Item **param_values;
ulonglong binlog_save_options;
bool need_binlog_call;
DBUG_ENTER("sp_head::execute_function");
DBUG_PRINT("info", ("function %s", m_name.str));
uint csize = m_pcont->max_pvars();
......@@ -823,6 +992,8 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, Item **resp)
goto end;
}
if (!(param_values= (Item**)thd->alloc(sizeof(Item*)*argcount)))
DBUG_RETURN(-1);
// QQ Should have some error checking here? (types, etc...)
if (!(nctx= new sp_rcontext(csize, hmax, cmax)))
......@@ -831,6 +1002,7 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, Item **resp)
{
sp_pvar_t *pvar = m_pcont->find_pvar(i);
Item *it= sp_eval_func_item(thd, argp++, pvar->type, NULL, FALSE);
param_values[i]= it;
if (!it)
goto end; // EOM error
......@@ -855,7 +1027,47 @@ sp_head::execute_function(THD *thd, Item **argp, uint argcount, Item **resp)
}
thd->spcont= nctx;
binlog_save_options= thd->options;
need_binlog_call= mysql_bin_log.is_open() && (thd->options & OPTION_BIN_LOG);
if (need_binlog_call)
mysql_bin_log.start_union_events(thd);
thd->options&= ~OPTION_BIN_LOG;
ret= execute(thd);
thd->options= binlog_save_options;
if (need_binlog_call)
mysql_bin_log.stop_union_events(thd);
if (thd->binlog_evt_union.unioned_events)
{
char buf[64];
String bufstr(buf, sizeof(buf), &my_charset_bin);
bufstr.length(0);
bufstr.append("DO ", 3);
append_identifier(thd, &bufstr, m_name.str, m_name.length);
bufstr.append('(');
for (uint i=0; i < argcount; i++)
{
if (i)
bufstr.append(',');
param_values[i]->print(&bufstr);
}
bufstr.append(')');
if (mysql_bin_log.is_open())
{
bool transactional_table= FALSE;
Query_log_event qinfo(thd, bufstr.ptr(), bufstr.length(),
thd->binlog_evt_union.unioned_events_trans, FALSE);
if (mysql_bin_log.write(&qinfo) && transactional_table)
{
push_warning(thd, MYSQL_ERROR::WARN_LEVEL_WARN, ER_UNKNOWN_ERROR,
"Invoked ROUTINE modified a transactional table but MYSQL"
"failed to reflect this change in the binary log.");
}
}
}
if (m_type == TYPE_ENUM_FUNCTION && ret == 0)
{
......@@ -893,6 +1105,25 @@ static Item_func_get_user_var *item_is_user_var(Item *it)
}
/*
Execute a procedure.
SYNOPSIS
sp_head::execute_procedure()
thd Thread handle
args List of values passed as arguments.
DESCRIPTION
The function does the following steps:
- Set all parameters
- call sp_head::execute
- copy back values of INOUT and OUT parameters
RETURN
0 Ok
-1 Error
*/
int sp_head::execute_procedure(THD *thd, List<Item> *args)
{
int ret= 0;
......@@ -1447,8 +1678,12 @@ sp_head::show_create_function(THD *thd)
DBUG_RETURN(res);
}
void
sp_head::optimize()
/*
TODO: what does this do??
*/
void sp_head::optimize()
{
List<sp_instr> bp;
sp_instr *i;
......@@ -1636,7 +1871,6 @@ int sp_instr::exec_core(THD *thd, uint *nextp)
return 0;
}
/*
sp_instr_stmt class functions
*/
......@@ -1646,9 +1880,9 @@ sp_instr_stmt::execute(THD *thd, uint *nextp)
{
char *query;
uint32 query_length;
int res;
DBUG_ENTER("sp_instr_stmt::execute");
DBUG_PRINT("info", ("command: %d", m_lex_keeper.sql_command()));
int res;
query= thd->query;
query_length= thd->query_length;
......@@ -1657,8 +1891,10 @@ sp_instr_stmt::execute(THD *thd, uint *nextp)
if (query_cache_send_result_to_client(thd,
thd->query, thd->query_length) <= 0)
{
thd->query_str_binlog_unsuitable= subst_spvars(thd, this, &m_query);
res= m_lex_keeper.reset_lex_and_exec_core(thd, nextp, FALSE, this);
query_cache_end_of_result(thd);
thd->query_str_binlog_unsuitable= FALSE;
}
else
*nextp= m_ip+1;
......
......@@ -326,10 +326,22 @@ class sp_instr :public Query_arena, public Sql_alloc
virtual ~sp_instr()
{ free_items(); }
// Execute this instrution. '*nextp' will be set to the index of the next
// instruction to execute. (For most instruction this will be the
// instruction following this one.)
// Returns 0 on success, non-zero if some error occured.
/*
Execute this instruction
SYNOPSIS
execute()
thd Thread handle
nextp OUT index of the next instruction to execute. (For most
instructions this will be the instruction following this
one).
RETURN
0 on success,
other if some error occured
*/
virtual int execute(THD *thd, uint *nextp) = 0;
/*
......@@ -339,7 +351,7 @@ class sp_instr :public Query_arena, public Sql_alloc
Should be implemented for instructions using expressions or whole
statements (thus having to have own LEX). Used in concert with
sp_lex_keeper class and its descendants.
sp_lex_keeper class and its descendants (there are none currently).
*/
virtual int exec_core(THD *thd, uint *nextp);
......@@ -808,6 +820,7 @@ class sp_instr_hreturn : public sp_instr_jump
}; // class sp_instr_hreturn : public sp_instr
/* This is DECLARE CURSOR */
class sp_instr_cpush : public sp_instr
{
sp_instr_cpush(const sp_instr_cpush &); /* Prevent use of these */
......
......@@ -72,6 +72,11 @@ typedef struct sp_cond
sp_cond_type_t *val;
} sp_cond_t;
/*
This seems to be an "SP parsing context" or something.
*/
class sp_pcontext : public Sql_alloc
{
sp_pcontext(const sp_pcontext &); /* Prevent use of these */
......
......@@ -41,6 +41,16 @@ typedef struct
uint foffset; // Frame offset for the handlers declare level
} sp_handler_t;
/*
This is a run context? of one SP ?
THis is
- a stack of cursors?
- a stack of handlers?
- a stack of Items ?
- a stack of instruction locations in SP?
*/
class sp_rcontext : public Sql_alloc
{
sp_rcontext(const sp_rcontext &); /* Prevent use of these */
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <my_sys.h>
/*
A typesafe wrapper around DYNAMIC_ARRAY
*/
template <class Elem> class Dynamic_array
{
DYNAMIC_ARRAY array;
public:
Dynamic_array(uint prealloc=16, uint increment=16)
{
my_init_dynamic_array(&array, sizeof(Elem), prealloc, increment);
}
Elem& at(int idx)
{
return *(((Elem*)array.buffer) + idx);
}
Elem *front()
{
return (Elem*)array.buffer;
}
Elem *back()
{
return ((Elem*)array.buffer) + array.elements;
}
bool append(Elem &el)
{
return (insert_dynamic(&array, (gptr)&el));
}
int elements()
{
return array.elements;
}
~Dynamic_array()
{
delete_dynamic(&array);
}
typedef int (*CMP_FUNC)(const Elem *el1, const Elem *el2);
void sort(CMP_FUNC cmp_func)
{
qsort(array.buffer, array.elements, sizeof(Elem), (qsort_cmp)cmp_func);
}
};
......@@ -178,7 +178,7 @@ THD::THD()
rand_used(0), time_zone_used(0),
last_insert_id_used(0), insert_id_used(0), clear_next_insert_id(0),
in_lock_tables(0), bootstrap(0), derived_tables_processing(FALSE),
spcont(NULL)
spcont(NULL), query_str_binlog_unsuitable(FALSE)
{
current_arena= this;
host= user= priv_user= db= ip= 0;
......@@ -210,6 +210,7 @@ THD::THD()
db_charset= global_system_variables.collation_database;
bzero(ha_data, sizeof(ha_data));
mysys_var=0;
binlog_evt_union.do_union= FALSE;
#ifndef DBUG_OFF
dbug_sentry=THD_SENTRY_MAGIC;
#endif
......@@ -1888,6 +1889,7 @@ void THD::reset_sub_statement_state(Sub_statement_state *backup,
backup->cuted_fields= cuted_fields;
backup->client_capabilities= client_capabilities;
if (!lex->requires_prelocking() || is_update_query(lex->sql_command))
options&= ~OPTION_BIN_LOG;
/* Disable result sets */
client_capabilities &= ~CLIENT_MULTI_RESULTS;
......
......@@ -311,6 +311,9 @@ class MYSQL_LOG: public TC_LOG
bool write(Log_event* event_info); // binary log write
bool write(THD *thd, IO_CACHE *cache, Log_event *commit_event);
void start_union_events(THD *thd);
void stop_union_events(THD *thd);
/*
v stands for vector
invoked as appendv(buf1,len1,buf2,len2,...,bufn,lenn,0)
......@@ -1347,6 +1350,33 @@ class THD :public Statement,
long long_value;
} sys_var_tmp;
/*
If true, thd->query is not a suitable query to write to binary log. This
is not handled everywhere currently - we check it only in statements
that can have SP variable references.
*/
bool query_str_binlog_unsuitable;
struct {
/*
If true, mysql_bin_log::write(Log_event) call will not write events to
binlog, and maintain 2 below variables instead (use
mysql_bin_log.start_union_events to turn this on)
*/
bool do_union;
/*
If TRUE, at least one mysql_bin_log::write(Log_event) call has been
made after last mysql_bin_log.start_union_events() call.
*/
bool unioned_events;
/*
If TRUE, at least one mysql_bin_log::write(Log_event e), where
e.cache_stmt == TRUE call has been made after last
mysql_bin_log.start_union_events() call.
*/
bool unioned_events_trans;
} binlog_evt_union;
THD();
~THD();
......@@ -1968,7 +1998,12 @@ class multi_delete :public select_result_interceptor
ha_rows deleted, found;
uint num_of_tables;
int error;
bool do_delete, transactional_tables, normal_tables, delete_while_scanning;
bool do_delete;
/* True if at least one table we delete from is transactional */
bool transactional_tables;
/* True if at least one table we delete from is not transactional */
bool normal_tables;
bool delete_while_scanning;
public:
multi_delete(TABLE_LIST *dt, uint num_of_tables);
......@@ -1995,7 +2030,10 @@ class multi_update :public select_result_interceptor
uint table_count;
Copy_field *copy_field;
enum enum_duplicates handle_duplicates;
bool do_update, trans_safe, transactional_tables, ignore;
bool do_update, trans_safe;
/* True if the update operation has made a change in a transactional table */
bool transactional_tables;
bool ignore;
public:
multi_update(TABLE_LIST *ut, TABLE_LIST *leaves_list,
......
......@@ -254,7 +254,8 @@ bool mysql_delete(THD *thd, TABLE_LIST *table_list, COND *conds,
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
transactional_table, FALSE);
if (mysql_bin_log.write(&qinfo) && transactional_table)
if ((thd->query_str_binlog_unsuitable || mysql_bin_log.write(&qinfo))
&& transactional_table)
error=1;
}
if (!transactional_table)
......@@ -719,7 +720,8 @@ bool multi_delete::send_eof()
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
transactional_tables, FALSE);
if (mysql_bin_log.write(&qinfo) && !normal_tables)
if ((thd->query_str_binlog_unsuitable || mysql_bin_log.write(&qinfo))
&& !normal_tables)
local_error=1; // Log write failed: roll back the SQL statement
}
if (!transactional_tables)
......
......@@ -592,7 +592,8 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list,
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
transactional_table, FALSE);
if (mysql_bin_log.write(&qinfo) && transactional_table)
if ((thd->query_str_binlog_unsuitable ||
mysql_bin_log.write(&qinfo)) && transactional_table)
error=1;
}
if (!transactional_table)
......
......@@ -517,6 +517,10 @@ int yylex(void *arg, void *yythd)
uchar *ident_map= cs->ident_map;
lex->yylval=yylval; // The global state
lex->tok_end_prev= lex->tok_end;
lex->tok_start_prev= lex->tok_start;
lex->tok_start=lex->tok_end=lex->ptr;
state=lex->next_state;
lex->next_state=MY_LEX_OPERATOR_OR_IDENT;
......
......@@ -703,6 +703,10 @@ typedef struct st_lex
SELECT_LEX *all_selects_list;
uchar *buf; /* The beginning of string, used by SPs */
uchar *ptr,*tok_start,*tok_end,*end_of_query;
/* The values of tok_start/tok_end as they were one call of yylex before */
uchar *tok_start_prev, *tok_end_prev;
char *length,*dec,*change,*name;
char *help_arg;
char *backup_dir; /* For RESTORE/BACKUP */
......
......@@ -2253,6 +2253,7 @@ bool alloc_query(THD *thd, char *packet, ulong packet_length)
return FALSE;
}
/****************************************************************************
** mysql_execute_command
** Execute command saved in thd and current_lex->sql_command
......@@ -4217,28 +4218,16 @@ mysql_execute_command(THD *thd)
thd->variables.select_limit= HA_POS_ERROR;
thd->row_count_func= 0;
tmp_disable_binlog(thd); /* don't binlog the substatements */
res= sp->execute_procedure(thd, &lex->value_list);
reenable_binlog(thd);
/*
We write CALL to binlog; on the opposite we didn't write the
substatements. That choice is necessary because the substatements
may use local vars.
Binlogging should happen when all tables are locked. They are locked
just above, and unlocked by close_thread_tables(). All tables which
are to be updated are locked like with a table-level write lock, and
this also applies to InnoDB (I tested - note that it reduces
InnoDB's concurrency as we don't use row-level locks). So binlogging
below is safe.
Note the limitation: if the SP returned an error, but still did some
updates, we do NOT binlog it. This is because otherwise "permission
denied", "table does not exist" etc would stop the slave quite
often. There is no easy way to know if the SP updated something
(even no_trans_update is not suitable, as it may be a transactional
autocommit update which happened, and no_trans_update covers only
INSERT/UPDATE/LOAD).
We never write CALL statements int binlog:
- If the mode is non-prelocked, each statement will be logged
separately.
- If the mode is prelocked, the invoking statement will care
about writing into binlog.
So just execute the statement.
*/
res= sp->execute_procedure(thd, &lex->value_list);
if (mysql_bin_log.is_open() &&
(sp->m_chistics->daccess == SP_CONTAINS_SQL ||
sp->m_chistics->daccess == SP_MODIFIES_SQL_DATA))
......@@ -4248,11 +4237,7 @@ mysql_execute_command(THD *thd)
ER_FAILED_ROUTINE_BREAK_BINLOG,
ER(ER_FAILED_ROUTINE_BREAK_BINLOG));
else
{
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length, 0, FALSE);
mysql_bin_log.write(&qinfo);
}
}
/*
......@@ -5405,8 +5390,10 @@ void mysql_parse(THD *thd, char *inBuf, uint length)
if (query_cache_send_result_to_client(thd, inBuf, length) <= 0)
{
LEX *lex= thd->lex;
sp_cache_flush_obsolete(&thd->sp_proc_cache);
sp_cache_flush_obsolete(&thd->sp_func_cache);
if (!yyparse((void *)thd) && ! thd->is_fatal_error)
{
#ifndef NO_EMBEDDED_ACCESS_CHECKS
......
......@@ -475,7 +475,8 @@ int mysql_update(THD *thd,
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
transactional_table, FALSE);
if (mysql_bin_log.write(&qinfo) && transactional_table)
if ((thd->query_str_binlog_unsuitable || mysql_bin_log.write(&qinfo))
&& transactional_table)
error=1; // Rollback update
}
if (!transactional_table)
......@@ -1441,7 +1442,8 @@ bool multi_update::send_eof()
thd->clear_error();
Query_log_event qinfo(thd, thd->query, thd->query_length,
transactional_tables, FALSE);
if (mysql_bin_log.write(&qinfo) && trans_safe)
if ((thd->query_str_binlog_unsuitable || mysql_bin_log.write(&qinfo))
&& trans_safe)
local_error= 1; // Rollback update
}
if (!transactional_tables)
......
......@@ -7133,10 +7133,13 @@ simple_ident:
sp_pvar_t *spv;
LEX *lex = Lex;
sp_pcontext *spc = lex->spcont;
if (spc && (spv = spc->find_pvar(&$1)))
{ /* We're compiling a stored procedure and found a variable */
$$ = (Item*) new Item_splocal($1, spv->offset);
{
/* We're compiling a stored procedure and found a variable */
Item_splocal *splocal;
splocal= new Item_splocal($1, spv->offset, lex->tok_start_prev -
lex->sphead->m_tmp_query);
$$ = (Item*) splocal;
lex->variables_used= 1;
lex->safe_to_cache_query=0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment