Commit 2bdd872e authored by andrey@lmy004's avatar andrey@lmy004

WL #3337 (Event scheduler new architecture)

Cut Nr. 8.

All tests pass.

Separated Event_scheduler into Event_queue and Event_scheduler.
Added new Event_scheduler_ng which is the new scheduler and is used
system-wide. Will be moved to the event_scheduler.cc in the future.
Using Event_timed in Event_queue as well as cloned during execution.
Next step is to have Event_worker_data which will be used during execution
and will take ::compile()/::execute() out of Event_timed.
parent 6dd9a3bb
...@@ -323,7 +323,6 @@ root@localhost закачка events_test ...@@ -323,7 +323,6 @@ root@localhost закачка events_test
"Should be only 1 process" "Should be only 1 process"
select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Suspended NULL
select release_lock("test_lock1"); select release_lock("test_lock1");
release_lock("test_lock1") release_lock("test_lock1")
1 1
...@@ -343,7 +342,7 @@ create event закачка on schedule every 10 hour do select get_lock("test_l ...@@ -343,7 +342,7 @@ create event закачка on schedule every 10 hour do select get_lock("test_l
"Should have only 2 processes: the scheduler and the locked event" "Should have only 2 processes: the scheduler and the locked event"
select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
root localhost events_test Connect User lock select get_lock("test_lock2", 20) root localhost events_test Connect User lock select get_lock("test_lock2", 20)
"Release the mutex, the event worker should finish." "Release the mutex, the event worker should finish."
"Release the mutex, the event worker should finish." "Release the mutex, the event worker should finish."
...@@ -359,13 +358,12 @@ create event закачка21 on schedule every 10 hour do select get_lock("test ...@@ -359,13 +358,12 @@ create event закачка21 on schedule every 10 hour do select get_lock("test
"Should have only 3 processes: the scheduler, our conn and the locked event" "Should have only 3 processes: the scheduler, our conn and the locked event"
select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
root localhost events_test Connect User lock select get_lock("test_lock2_1", 20) root localhost events_test Connect User lock select get_lock("test_lock2_1", 20)
set global event_scheduler=2; set global event_scheduler=2;
"Should have only our process now:" "Should have only our process now:"
select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Suspended NULL
root localhost events_test Connect User lock select get_lock("test_lock2_1", 20) root localhost events_test Connect User lock select get_lock("test_lock2_1", 20)
drop event закачка21; drop event закачка21;
create table t_16 (s1 int); create table t_16 (s1 int);
......
...@@ -41,7 +41,7 @@ end| ...@@ -41,7 +41,7 @@ end|
"Now if everything is fine the event has compiled and is locked "Now if everything is fine the event has compiled and is locked
select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*1*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
root localhost events_test Connect User lock select get_lock('test_bug16407', 60) root localhost events_test Connect User lock select get_lock('test_bug16407', 60)
select release_lock('test_bug16407'); select release_lock('test_bug16407');
release_lock('test_bug16407') release_lock('test_bug16407')
...@@ -94,7 +94,7 @@ get_lock('ee_16407_2', 60) ...@@ -94,7 +94,7 @@ get_lock('ee_16407_2', 60)
set global event_scheduler= 1; set global event_scheduler= 1;
select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*2*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_2*/ root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_2*/
root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_3*/ root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_3*/
root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_4*/ root localhost events_test Connect User lock select get_lock('ee_16407_2', 60) /*ee_16407_4*/
...@@ -103,7 +103,7 @@ release_lock('ee_16407_2') ...@@ -103,7 +103,7 @@ release_lock('ee_16407_2')
1 1
select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*3*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
set global event_scheduler= 2; set global event_scheduler= 2;
select * from events_smode_test order by ev_name, a; select * from events_smode_test order by ev_name, a;
ev_name a ev_name a
...@@ -142,7 +142,7 @@ set global event_scheduler= 1; ...@@ -142,7 +142,7 @@ set global event_scheduler= 1;
"Should have 2 locked processes" "Should have 2 locked processes"
select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*4*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
root localhost events_test Connect User lock select get_lock('ee_16407_5', 60) /*ee_16407_5*/ root localhost events_test Connect User lock select get_lock('ee_16407_5', 60) /*ee_16407_5*/
root localhost events_test Connect User lock select get_lock('ee_16407_5', 60) /*ee_16407_6*/ root localhost events_test Connect User lock select get_lock('ee_16407_5', 60) /*ee_16407_6*/
select release_lock('ee_16407_5'); select release_lock('ee_16407_5');
...@@ -151,7 +151,7 @@ release_lock('ee_16407_5') ...@@ -151,7 +151,7 @@ release_lock('ee_16407_5')
"Should have 0 processes locked" "Should have 0 processes locked"
select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info; select /*5*/ user, host, db, command, state, info from information_schema.processlist where info is null or info not like '%processlist%' order by info;
user host db command state info user host db command state info
event_scheduler localhost NULL Connect Sleeping NULL event_scheduler localhost NULL Connect Waiting for next activation NULL
select * from events_smode_test order by ev_name, a; select * from events_smode_test order by ev_name, a;
ev_name a ev_name a
ee_16407_6 2004-02-29 ee_16407_6 2004-02-29
......
...@@ -299,7 +299,7 @@ t9 MyISAM 10 Dynamic 2 216 432 # 2048 0 NULL # # # latin1_swedish_ci NULL ...@@ -299,7 +299,7 @@ t9 MyISAM 10 Dynamic 2 216 432 # 2048 0 NULL # # # latin1_swedish_ci NULL
prepare stmt4 from ' show status like ''Threads_running'' '; prepare stmt4 from ' show status like ''Threads_running'' ';
execute stmt4; execute stmt4;
Variable_name Value Variable_name Value
Threads_running 2 Threads_running 1
prepare stmt4 from ' show variables like ''sql_mode'' '; prepare stmt4 from ' show variables like ''sql_mode'' ';
execute stmt4; execute stmt4;
Variable_name Value Variable_name Value
......
...@@ -10,6 +10,5 @@ user() ...@@ -10,6 +10,5 @@ user()
# #
show processlist; show processlist;
Id User Host db Command Time State Info Id User Host db Command Time State Info
<id> event_scheduler <host> NULL <command> <time> <state> <info>
<id> root <host> test <command> <time> <state> <info> <id> root <host> test <command> <time> <state> <info>
<id> root <host> test <command> <time> <state> <info> <id> root <host> test <command> <time> <state> <info>
...@@ -34,7 +34,6 @@ lock tables t2 write; ...@@ -34,7 +34,6 @@ lock tables t2 write;
call bug9486(); call bug9486();
show processlist; show processlist;
Id User Host db Command Time State Info Id User Host db Command Time State Info
# event_scheduler localhost NULL Connect # Suspended NULL
# root localhost test Sleep # NULL # root localhost test Sleep # NULL
# root localhost test Query # Locked update t1, t2 set val= 1 where id1=id2 # root localhost test Query # Locked update t1, t2 set val= 1 where id1=id2
# root localhost test Query # NULL show processlist # root localhost test Query # NULL show processlist
......
...@@ -18,11 +18,9 @@ show processlist; ...@@ -18,11 +18,9 @@ show processlist;
end| end|
call bug4902_2()| call bug4902_2()|
Id User Host db Command Time State Info Id User Host db Command Time State Info
# event_scheduler localhost NULL Connect # Suspended NULL
# root localhost test Query # NULL show processlist # root localhost test Query # NULL show processlist
call bug4902_2()| call bug4902_2()|
Id User Host db Command Time State Info Id User Host db Command Time State Info
# event_scheduler localhost NULL Connect # Suspended NULL
# root localhost test Query # NULL show processlist # root localhost test Query # NULL show processlist
drop procedure bug4902_2| drop procedure bug4902_2|
drop function if exists bug5278| drop function if exists bug5278|
......
...@@ -52,22 +52,22 @@ drop table t1; ...@@ -52,22 +52,22 @@ drop table t1;
FLUSH STATUS; FLUSH STATUS;
SHOW STATUS LIKE 'max_used_connections'; SHOW STATUS LIKE 'max_used_connections';
Variable_name Value Variable_name Value
Max_used_connections 2 Max_used_connections 1
SET @save_thread_cache_size=@@thread_cache_size; SET @save_thread_cache_size=@@thread_cache_size;
SET GLOBAL thread_cache_size=3; SET GLOBAL thread_cache_size=3;
SHOW STATUS LIKE 'max_used_connections'; SHOW STATUS LIKE 'max_used_connections';
Variable_name Value Variable_name Value
Max_used_connections 4 Max_used_connections 3
FLUSH STATUS; FLUSH STATUS;
SHOW STATUS LIKE 'max_used_connections'; SHOW STATUS LIKE 'max_used_connections';
Variable_name Value Variable_name Value
Max_used_connections 3 Max_used_connections 2
SHOW STATUS LIKE 'max_used_connections'; SHOW STATUS LIKE 'max_used_connections';
Variable_name Value Variable_name Value
Max_used_connections 4 Max_used_connections 3
SHOW STATUS LIKE 'max_used_connections'; SHOW STATUS LIKE 'max_used_connections';
Variable_name Value Variable_name Value
Max_used_connections 5 Max_used_connections 4
SET GLOBAL thread_cache_size=@save_thread_cache_size; SET GLOBAL thread_cache_size=@save_thread_cache_size;
show status like 'com_show_status'; show status like 'com_show_status';
Variable_name Value Variable_name Value
......
...@@ -61,7 +61,7 @@ while ($1) ...@@ -61,7 +61,7 @@ while ($1)
--enable_query_log --enable_query_log
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2';
SET GLOBAL event_scheduler=1; SET GLOBAL event_scheduler=1;
--sleep 6 --sleep 12
DROP DATABASE events_conn1_test2; DROP DATABASE events_conn1_test2;
SET GLOBAL event_scheduler=2; SET GLOBAL event_scheduler=2;
...@@ -100,7 +100,7 @@ while ($1) ...@@ -100,7 +100,7 @@ while ($1)
} }
--enable_query_log --enable_query_log
SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2'; SELECT COUNT(*) FROM INFORMATION_SCHEMA.EVENTS WHERE EVENT_SCHEMA='events_conn1_test2';
--sleep 6 --sleep 12
connection conn2; connection conn2;
--send --send
DROP DATABASE events_conn2_db; DROP DATABASE events_conn2_db;
......
...@@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \ ...@@ -67,7 +67,7 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
sql_array.h sql_cursor.h events.h \ sql_array.h sql_cursor.h events.h \
sql_plugin.h authors.h sql_partition.h event_data_objects.h \ sql_plugin.h authors.h sql_partition.h event_data_objects.h \
event_queue.h event_db_repository.h \ event_queue.h event_db_repository.h \
partition_info.h partition_element.h event_scheduler.h \ partition_info.h partition_element.h event_scheduler_ng.h \
contributors.h contributors.h
mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
item.cc item_sum.cc item_buff.cc item_func.cc \ item.cc item_sum.cc item_buff.cc item_func.cc \
...@@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \ ...@@ -104,8 +104,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
gstream.cc spatial.cc sql_help.cc sql_cursor.cc \ gstream.cc spatial.cc sql_help.cc sql_cursor.cc \
tztime.cc my_time.c my_user.c my_decimal.cc\ tztime.cc my_time.c my_user.c my_decimal.cc\
sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \ sp_head.cc sp_pcontext.cc sp_rcontext.cc sp.cc \
sp_cache.cc parse_file.cc sql_trigger.cc \ sp_cache.cc parse_file.cc sql_trigger.cc event_scheduler.cc\
event_scheduler.cc events.cc event_data_objects.cc \ event_scheduler_ng.cc events.cc event_data_objects.cc \
event_queue.cc event_db_repository.cc \ event_queue.cc event_db_repository.cc \
sql_plugin.cc sql_binlog.cc \ sql_plugin.cc sql_binlog.cc \
sql_builtin.cc sql_tablespace.cc partition_info.cc sql_builtin.cc sql_tablespace.cc partition_info.cc
......
...@@ -556,6 +556,7 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0), ...@@ -556,6 +556,7 @@ Event_timed::Event_timed():in_spawned_thread(0),locked_by_thread_id(0),
Event_timed::~Event_timed() Event_timed::~Event_timed()
{ {
deinit_mutexes(); deinit_mutexes();
free_root(&mem_root, MYF(0));
if (free_sphead_on_delete) if (free_sphead_on_delete)
free_sp(); free_sp();
...@@ -622,6 +623,8 @@ Event_timed::init() ...@@ -622,6 +623,8 @@ Event_timed::init()
definer_user.length= definer_host.length= 0; definer_user.length= definer_host.length= 0;
sql_mode= 0; sql_mode= 0;
/* init memory root */
init_alloc_root(&mem_root, 256, 512);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -644,7 +647,7 @@ Event_timed::init() ...@@ -644,7 +647,7 @@ Event_timed::init()
*/ */
int int
Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) Event_timed::load_from_row(TABLE *table)
{ {
char *ptr; char *ptr;
Event_timed *et; Event_timed *et;
...@@ -661,22 +664,22 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) ...@@ -661,22 +664,22 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
if (table->s->fields != ET_FIELD_COUNT) if (table->s->fields != ET_FIELD_COUNT)
goto error; goto error;
if ((et->dbname.str= get_field(mem_root, table->field[ET_FIELD_DB])) == NULL) if ((et->dbname.str= get_field(&mem_root, table->field[ET_FIELD_DB])) == NULL)
goto error; goto error;
et->dbname.length= strlen(et->dbname.str); et->dbname.length= strlen(et->dbname.str);
if ((et->name.str= get_field(mem_root, table->field[ET_FIELD_NAME])) == NULL) if ((et->name.str= get_field(&mem_root, table->field[ET_FIELD_NAME])) == NULL)
goto error; goto error;
et->name.length= strlen(et->name.str); et->name.length= strlen(et->name.str);
if ((et->body.str= get_field(mem_root, table->field[ET_FIELD_BODY])) == NULL) if ((et->body.str= get_field(&mem_root, table->field[ET_FIELD_BODY])) == NULL)
goto error; goto error;
et->body.length= strlen(et->body.str); et->body.length= strlen(et->body.str);
if ((et->definer.str= get_field(mem_root, if ((et->definer.str= get_field(&mem_root,
table->field[ET_FIELD_DEFINER])) == NullS) table->field[ET_FIELD_DEFINER])) == NullS)
goto error; goto error;
et->definer.length= strlen(et->definer.str); et->definer.length= strlen(et->definer.str);
...@@ -688,10 +691,10 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) ...@@ -688,10 +691,10 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
len= ptr - et->definer.str; len= ptr - et->definer.str;
et->definer_user.str= strmake_root(mem_root, et->definer.str, len); et->definer_user.str= strmake_root(&mem_root, et->definer.str, len);
et->definer_user.length= len; et->definer_user.length= len;
len= et->definer.length - len - 1; //1 is because of @ len= et->definer.length - len - 1; //1 is because of @
et->definer_host.str= strmake_root(mem_root, ptr + 1, len);/* 1:because of @*/ et->definer_host.str= strmake_root(&mem_root, ptr + 1, len);/* 1:because of @*/
et->definer_host.length= len; et->definer_host.length= len;
et->starts_null= table->field[ET_FIELD_STARTS]->is_null(); et->starts_null= table->field[ET_FIELD_STARTS]->is_null();
...@@ -737,21 +740,21 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table) ...@@ -737,21 +740,21 @@ Event_timed::load_from_row(MEM_ROOT *mem_root, TABLE *table)
last_executed_changed= false; last_executed_changed= false;
/* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */
if ((ptr= get_field(mem_root, table->field[ET_FIELD_STATUS])) == NullS) if ((ptr= get_field(&mem_root, table->field[ET_FIELD_STATUS])) == NullS)
goto error; goto error;
DBUG_PRINT("load_from_row", ("Event [%s] is [%s]", et->name.str, ptr)); DBUG_PRINT("load_from_row", ("Event [%s] is [%s]", et->name.str, ptr));
et->status= (ptr[0]=='E'? Event_timed::ENABLED:Event_timed::DISABLED); et->status= (ptr[0]=='E'? Event_timed::ENABLED:Event_timed::DISABLED);
/* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */ /* ToDo : Andrey . Find a way not to allocate ptr on event_mem_root */
if ((ptr= get_field(mem_root, if ((ptr= get_field(&mem_root,
table->field[ET_FIELD_ON_COMPLETION])) == NullS) table->field[ET_FIELD_ON_COMPLETION])) == NullS)
goto error; goto error;
et->on_completion= (ptr[0]=='D'? Event_timed::ON_COMPLETION_DROP: et->on_completion= (ptr[0]=='D'? Event_timed::ON_COMPLETION_DROP:
Event_timed::ON_COMPLETION_PRESERVE); Event_timed::ON_COMPLETION_PRESERVE);
et->comment.str= get_field(mem_root, table->field[ET_FIELD_COMMENT]); et->comment.str= get_field(&mem_root, table->field[ET_FIELD_COMMENT]);
if (et->comment.str != NullS) if (et->comment.str != NullS)
et->comment.length= strlen(et->comment.str); et->comment.length= strlen(et->comment.str);
else else
...@@ -953,10 +956,10 @@ Event_timed::compute_next_execution_time() ...@@ -953,10 +956,10 @@ Event_timed::compute_next_execution_time()
int tmp; int tmp;
DBUG_ENTER("Event_timed::compute_next_execution_time"); DBUG_ENTER("Event_timed::compute_next_execution_time");
DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu", DBUG_PRINT("enter", ("starts=%llu ends=%llu last_executed=%llu this=%p",
TIME_to_ulonglong_datetime(&starts), TIME_to_ulonglong_datetime(&starts),
TIME_to_ulonglong_datetime(&ends), TIME_to_ulonglong_datetime(&ends),
TIME_to_ulonglong_datetime(&last_executed))); TIME_to_ulonglong_datetime(&last_executed), this));
if (status == Event_timed::DISABLED) if (status == Event_timed::DISABLED)
{ {
...@@ -1168,7 +1171,8 @@ Event_timed::compute_next_execution_time() ...@@ -1168,7 +1171,8 @@ Event_timed::compute_next_execution_time()
goto ret; goto ret;
} }
ret: ret:
DBUG_PRINT("info", ("ret=0")); DBUG_PRINT("info", ("ret=0 execute_at=%llu",
TIME_to_ulonglong_datetime(&execute_at)));
DBUG_RETURN(false); DBUG_RETURN(false);
err: err:
DBUG_PRINT("info", ("ret=1")); DBUG_PRINT("info", ("ret=1"));
...@@ -1392,6 +1396,7 @@ Event_timed::get_create_event(THD *thd, String *buf) ...@@ -1392,6 +1396,7 @@ Event_timed::get_create_event(THD *thd, String *buf)
int int
Event_timed::execute(THD *thd, MEM_ROOT *mem_root) Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
{ {
Security_context *save_ctx;
/* this one is local and not needed after exec */ /* this one is local and not needed after exec */
Security_context security_ctx; Security_context security_ctx;
int ret= 0; int ret= 0;
...@@ -1400,14 +1405,8 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) ...@@ -1400,14 +1405,8 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]", DBUG_PRINT("info", (" EVEX EXECUTING event %s.%s [EXPR:%d]",
dbname.str, name.str, (int) expression)); dbname.str, name.str, (int) expression));
VOID(pthread_mutex_lock(&this->LOCK_running)); thd->change_security_context(definer_user, definer_host, dbname,
if (running) &security_ctx, &save_ctx);
{
VOID(pthread_mutex_unlock(&this->LOCK_running));
DBUG_RETURN(-100);
}
running= true;
VOID(pthread_mutex_unlock(&this->LOCK_running));
if (!sphead && (ret= compile(thd, mem_root))) if (!sphead && (ret= compile(thd, mem_root)))
goto done; goto done;
...@@ -1434,14 +1433,11 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root) ...@@ -1434,14 +1433,11 @@ Event_timed::execute(THD *thd, MEM_ROOT *mem_root)
definer_host.str, dbname.str)); definer_host.str, dbname.str));
ret= -99; ret= -99;
} }
VOID(pthread_mutex_lock(&this->LOCK_running));
running= false;
/* Will compile every time a new sp_head on different root */ /* Will compile every time a new sp_head on different root */
free_sp(); free_sp();
VOID(pthread_mutex_unlock(&this->LOCK_running));
done: done:
thd->restore_security_context(save_ctx);
/* /*
1. Don't cache sphead if allocated on another mem_root 1. Don't cache sphead if allocated on another mem_root
2. Don't call security_ctx.destroy() because this will free our dbname.str 2. Don't call security_ctx.destroy() because this will free our dbname.str
...@@ -1807,3 +1803,4 @@ event_timed_identifier_equal(LEX_STRING db, LEX_STRING name, Event_timed *b) ...@@ -1807,3 +1803,4 @@ event_timed_identifier_equal(LEX_STRING db, LEX_STRING name, Event_timed *b)
return !sortcmp_lex_string(name, b->name, system_charset_info) && return !sortcmp_lex_string(name, b->name, system_charset_info) &&
!sortcmp_lex_string(db, b->dbname, system_charset_info); !sortcmp_lex_string(db, b->dbname, system_charset_info);
} }
...@@ -73,7 +73,10 @@ class Event_timed ...@@ -73,7 +73,10 @@ class Event_timed
bool status_changed; bool status_changed;
bool last_executed_changed; bool last_executed_changed;
MEM_ROOT mem_root;
public: public:
THD *thd;
enum enum_status enum enum_status
{ {
ENABLED = 1, ENABLED = 1,
...@@ -147,7 +150,7 @@ class Event_timed ...@@ -147,7 +150,7 @@ class Event_timed
deinit_mutexes(); deinit_mutexes();
int int
load_from_row(MEM_ROOT *mem_root, TABLE *table); load_from_row(TABLE *table);
bool bool
compute_next_execution_time(); compute_next_execution_time();
...@@ -264,9 +267,33 @@ class Event_parse_data : public Sql_alloc ...@@ -264,9 +267,33 @@ class Event_parse_data : public Sql_alloc
}; };
class Event_queue_element : public Event_timed class Event_job_data
{ {
public:
LEX_STRING dbname;
LEX_STRING name;
sp_head *sphead;
LEX_STRING definer;
LEX_STRING body;
ulong sql_mode;
}; THD *thd;
Event_job_data(){}
~Event_job_data(){}
int
execute();
private:
int
load_from_disk();
int
compile();
Event_job_data(const Event_job_data &); /* Prevent use of these */
void operator=(Event_job_data &);
};
#endif /* _EVENT_DATA_OBJECTS_H_ */ #endif /* _EVENT_DATA_OBJECTS_H_ */
...@@ -130,135 +130,9 @@ TABLE_FIELD_W_TYPE event_table_fields[ET_FIELD_COUNT] = { ...@@ -130,135 +130,9 @@ TABLE_FIELD_W_TYPE event_table_fields[ET_FIELD_COUNT] = {
SYNOPSIS SYNOPSIS
evex_fill_row() evex_fill_row()
thd THD thd THD
table the row to fill out table The row to fill out
et Event's data
RETURN VALUE
0 - OK
EVEX_GENERAL_ERROR - bad data
EVEX_GET_FIELD_FAILED - field count does not match. table corrupted?
DESCRIPTION
Used both when an event is created and when it is altered.
*/
static int
evex_fill_row(THD *thd, TABLE *table, Event_timed *et, my_bool is_update)
{
CHARSET_INFO *scs= system_charset_info;
enum enum_events_table_field field_num;
DBUG_ENTER("evex_fill_row");
DBUG_PRINT("info", ("dbname=[%s]", et->dbname.str));
DBUG_PRINT("info", ("name =[%s]", et->name.str));
DBUG_PRINT("info", ("body =[%s]", et->body.str));
if (table->field[field_num= ET_FIELD_DEFINER]->
store(et->definer.str, et->definer.length, scs))
goto err_truncate;
if (table->field[field_num= ET_FIELD_DB]->
store(et->dbname.str, et->dbname.length, scs))
goto err_truncate;
if (table->field[field_num= ET_FIELD_NAME]->
store(et->name.str, et->name.length, scs))
goto err_truncate;
/* both ON_COMPLETION and STATUS are NOT NULL thus not calling set_notnull()*/
table->field[ET_FIELD_ON_COMPLETION]->
store((longlong)et->on_completion, true);
table->field[ET_FIELD_STATUS]->store((longlong)et->status, true);
/*
Change the SQL_MODE only if body was present in an ALTER EVENT and of course
always during CREATE EVENT.
*/
if (et->body.str)
{
table->field[ET_FIELD_SQL_MODE]->
store((longlong)thd->variables.sql_mode, true);
if (table->field[field_num= ET_FIELD_BODY]->
store(et->body.str, et->body.length, scs))
goto err_truncate;
}
if (et->expression)
{
table->field[ET_FIELD_INTERVAL_EXPR]->set_notnull();
table->field[ET_FIELD_INTERVAL_EXPR]->store((longlong)et->expression, true);
table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_notnull();
/*
In the enum (C) intervals start from 0 but in mysql enum valid values start
from 1. Thus +1 offset is needed!
*/
table->field[ET_FIELD_TRANSIENT_INTERVAL]->
store((longlong)et->interval+1, true);
table->field[ET_FIELD_EXECUTE_AT]->set_null();
if (!et->starts_null)
{
table->field[ET_FIELD_STARTS]->set_notnull();
table->field[ET_FIELD_STARTS]->
store_time(&et->starts, MYSQL_TIMESTAMP_DATETIME);
}
if (!et->ends_null)
{
table->field[ET_FIELD_ENDS]->set_notnull();
table->field[ET_FIELD_ENDS]->
store_time(&et->ends, MYSQL_TIMESTAMP_DATETIME);
}
}
else if (et->execute_at.year)
{
table->field[ET_FIELD_INTERVAL_EXPR]->set_null();
table->field[ET_FIELD_TRANSIENT_INTERVAL]->set_null();
table->field[ET_FIELD_STARTS]->set_null();
table->field[ET_FIELD_ENDS]->set_null();
table->field[ET_FIELD_EXECUTE_AT]->set_notnull();
table->field[ET_FIELD_EXECUTE_AT]->
store_time(&et->execute_at, MYSQL_TIMESTAMP_DATETIME);
}
else
{
DBUG_ASSERT(is_update);
/*
it is normal to be here when the action is update
this is an error if the action is create. something is borked
*/
}
((Field_timestamp *)table->field[ET_FIELD_MODIFIED])->set_time();
if (et->comment.str)
{
if (table->field[field_num= ET_FIELD_COMMENT]->
store(et->comment.str, et->comment.length, scs))
goto err_truncate;
}
DBUG_RETURN(0);
err_truncate:
my_error(ER_EVENT_DATA_TOO_LONG, MYF(0), table->field[field_num]->field_name);
DBUG_RETURN(EVEX_GENERAL_ERROR);
}
/*
Puts some data common to CREATE and ALTER EVENT into a row.
SYNOPSIS
evex_fill_row()
thd THD
table the row to fill out
et Event's data et Event's data
is_update CREATE EVENT or ALTER EVENT
RETURN VALUE RETURN VALUE
0 - OK 0 - OK
...@@ -596,7 +470,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, ...@@ -596,7 +470,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
TABLE *table; TABLE *table;
int ret; int ret;
Event_timed *et= NULL; Event_timed *et= NULL;
DBUG_ENTER("db_find_event"); DBUG_ENTER("Event_db_repository::find_event");
DBUG_PRINT("enter", ("name: %*s", name.length, name.str)); DBUG_PRINT("enter", ("name: %*s", name.length, name.str));
if (tbl) if (tbl)
...@@ -621,7 +495,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name, ...@@ -621,7 +495,7 @@ Event_db_repository::find_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
2)::load_from_row() is silent on error therefore we emit error msg here 2)::load_from_row() is silent on error therefore we emit error msg here
*/ */
if ((ret= et->load_from_row(root, table))) if ((ret= et->load_from_row(table)))
{ {
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event"); my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0), "event");
goto done; goto done;
...@@ -722,7 +596,7 @@ evex_check_params(THD *thd, Event_parse_data *parse_data) ...@@ -722,7 +596,7 @@ evex_check_params(THD *thd, Event_parse_data *parse_data)
const char *pos= NULL; const char *pos= NULL;
Item *bad_item; Item *bad_item;
DBUG_ENTER("evex_check_timing_params"); DBUG_ENTER("evex_check_params");
DBUG_PRINT("info", ("execute_at=0x%d expr=0x%d starts=0x%d ends=0x%d", DBUG_PRINT("info", ("execute_at=0x%d expr=0x%d starts=0x%d ends=0x%d",
parse_data->item_execute_at, parse_data->item_execute_at,
parse_data->item_expression, parse_data->item_expression,
...@@ -1212,7 +1086,7 @@ Event_db_repository::drop_events_by_field(THD *thd, ...@@ -1212,7 +1086,7 @@ Event_db_repository::drop_events_by_field(THD *thd,
TABLE *table; TABLE *table;
Open_tables_state backup; Open_tables_state backup;
READ_RECORD read_record_info; READ_RECORD read_record_info;
DBUG_ENTER("drop_events_from_table_by_field"); DBUG_ENTER("Event_db_repository::drop_events_by_field");
DBUG_PRINT("enter", ("field=%d field_value=%s", field, field_value.str)); DBUG_PRINT("enter", ("field=%d field_value=%s", field, field_value.str));
if (open_event_table(thd, TL_WRITE, &table)) if (open_event_table(thd, TL_WRITE, &table))
...@@ -1270,7 +1144,7 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na ...@@ -1270,7 +1144,7 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
Event_timed *et_loaded= NULL; Event_timed *et_loaded= NULL;
Open_tables_state backup; Open_tables_state backup;
DBUG_ENTER("Event_scheduler::load_and_compile_event"); DBUG_ENTER("Event_db_repository::load_named_event");
DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str)); DBUG_PRINT("enter",("thd=%p name:%*s",thd, name.length, name.str));
thd->reset_n_backup_open_tables_state(&backup); thd->reset_n_backup_open_tables_state(&backup);
...@@ -1297,4 +1171,3 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na ...@@ -1297,4 +1171,3 @@ Event_db_repository::load_named_event(THD *thd, LEX_STRING dbname, LEX_STRING na
DBUG_RETURN(OP_OK); DBUG_RETURN(OP_OK);
} }
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "mysql_priv.h" #include "mysql_priv.h"
#include "events.h" #include "events.h"
#include "event_scheduler.h" #include "event_scheduler_ng.h"
#include "event_queue.h" #include "event_queue.h"
#include "event_data_objects.h" #include "event_data_objects.h"
#include "event_db_repository.h" #include "event_db_repository.h"
...@@ -35,10 +35,6 @@ ...@@ -35,10 +35,6 @@
#define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__) #define UNLOCK_QUEUE_DATA() unlock_data(SCHED_FUNC, __LINE__)
Event_scheduler*
Event_queue::singleton= NULL;
/* /*
Compares the execute_at members of 2 Event_timed instances. Compares the execute_at members of 2 Event_timed instances.
Used as callback for the prioritized queue when shifting Used as callback for the prioritized queue when shifting
...@@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) ...@@ -111,10 +107,10 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
goto end; goto end;
} }
/* We need to load the event on scheduler_root */
if (!(res= db_repository-> if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new))) load_named_event(thd, et->dbname, et->name, &et_new)))
{ {
DBUG_PRINT("info", ("new event in the queue %p", et_new));
queue_insert_safe(&queue, (byte *) et_new); queue_insert_safe(&queue, (byte *) et_new);
on_queue_change(); on_queue_change();
} }
...@@ -130,7 +126,7 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence) ...@@ -130,7 +126,7 @@ Event_queue::create_event(THD *thd, Event_parse_data *et, bool check_existence)
Updates an event from the scheduler queue Updates an event from the scheduler queue
SYNOPSIS SYNOPSIS
Event_scheduler::update_event() Event_queue::update_event()
thd Thread thd Thread
et The event to replace(add) into the queue et The event to replace(add) into the queue
new_schema New schema new_schema New schema
...@@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, ...@@ -172,15 +168,11 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
et->dbname= *new_schema; et->dbname= *new_schema;
et->name= *new_name; et->name= *new_name;
} }
/*
We need to load the event (it's strings but on the object itself)
on scheduler_root. et_new could be NULL :
1. Error occured
2. If the replace is DISABLED, we don't load it into the queue.
*/
if (!(res= db_repository-> if (!(res= db_repository->
load_named_event(thd, et->dbname, et->name, &et_new))) load_named_event(thd, et->dbname, et->name, &et_new)))
{ {
DBUG_PRINT("info", ("new event in the queue %p old %p", et_new, et_old));
queue_insert_safe(&queue, (byte *) et_new); queue_insert_safe(&queue, (byte *) et_new);
on_queue_change(); on_queue_change();
} }
...@@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et, ...@@ -240,7 +232,7 @@ Event_queue::update_event(THD *thd, Event_parse_data *et,
/* /*
Drops an event from the scheduler queue Drops an event from the queue
SYNOPSIS SYNOPSIS
Event_queue::drop_event() Event_queue::drop_event()
...@@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name) ...@@ -303,10 +295,8 @@ Event_queue::drop_event(THD *thd, sp_name *name)
} }
/* /*
Searches for an event in the scheduler queue Searches for an event in the queue
SYNOPSIS SYNOPSIS
Event_queue::find_event() Event_queue::find_event()
...@@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q) ...@@ -358,7 +348,6 @@ Event_queue::find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q)
comparator The function to use for comparing comparator The function to use for comparing
RETURN VALUE RETURN VALUE
-1 Scheduler not working
>=0 Number of dropped events >=0 Number of dropped events
NOTE NOTE
...@@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern, ...@@ -426,7 +415,6 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
db The schema name db The schema name
RETURN VALUE RETURN VALUE
-1 Scheduler not working
>=0 Number of dropped events >=0 Number of dropped events
*/ */
...@@ -459,8 +447,7 @@ void ...@@ -459,8 +447,7 @@ void
Event_queue::lock_data(const char *func, uint line) Event_queue::lock_data(const char *func, uint line)
{ {
DBUG_ENTER("Event_queue::lock_mutex"); DBUG_ENTER("Event_queue::lock_mutex");
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", DBUG_PRINT("enter", ("func=%s line=%u", func, line));
&LOCK_event_queue, func, line));
pthread_mutex_lock(&LOCK_event_queue); pthread_mutex_lock(&LOCK_event_queue);
mutex_last_locked_in_func= func; mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line; mutex_last_locked_at_line= line;
...@@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line) ...@@ -481,9 +468,8 @@ Event_queue::lock_data(const char *func, uint line)
void void
Event_queue::unlock_data(const char *func, uint line) Event_queue::unlock_data(const char *func, uint line)
{ {
DBUG_ENTER("Event_queue::UNLOCK_mutex"); DBUG_ENTER("Event_queue::unlock_mutex");
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", DBUG_PRINT("enter", ("func=%s line=%u", func, line));
&LOCK_event_queue, func, line));
mutex_last_unlocked_at_line= line; mutex_last_unlocked_at_line= line;
mutex_queue_data_locked= FALSE; mutex_queue_data_locked= FALSE;
mutex_last_unlocked_in_func= func; mutex_last_unlocked_in_func= func;
...@@ -510,7 +496,7 @@ Event_queue::events_count() ...@@ -510,7 +496,7 @@ Event_queue::events_count()
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
n= queue.elements; n= queue.elements;
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("n=%u", n));
DBUG_RETURN(n); DBUG_RETURN(n);
} }
...@@ -529,7 +515,7 @@ uint ...@@ -529,7 +515,7 @@ uint
Event_queue::events_count_no_lock() Event_queue::events_count_no_lock()
{ {
uint n; uint n;
DBUG_ENTER("Event_scheduler::events_count_no_lock"); DBUG_ENTER("Event_queue::events_count_no_lock");
n= queue.elements; n= queue.elements;
...@@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd) ...@@ -590,7 +576,7 @@ Event_queue::load_events_from_db(THD *thd)
} }
DBUG_PRINT("info", ("Loading event from row.")); DBUG_PRINT("info", ("Loading event from row."));
if ((ret= et->load_from_row(&scheduler_root, table))) if ((ret= et->load_from_row(table)))
{ {
clean_the_queue= TRUE; clean_the_queue= TRUE;
sql_print_error("SCHEDULER: Error while loading from mysql.event. " sql_print_error("SCHEDULER: Error while loading from mysql.event. "
...@@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd) ...@@ -735,7 +721,7 @@ Event_queue::check_system_tables(THD *thd)
void void
Event_queue::init_mutexes() Event_queue::init_mutexes()
{ {
pthread_mutex_init(&singleton->LOCK_event_queue, MY_MUTEX_INIT_FAST); pthread_mutex_init(&LOCK_event_queue, MY_MUTEX_INIT_FAST);
} }
...@@ -743,13 +729,13 @@ Event_queue::init_mutexes() ...@@ -743,13 +729,13 @@ Event_queue::init_mutexes()
Destroys mutexes. Destroys mutexes.
SYNOPSIS SYNOPSIS
Event_queue::destroy_mutexes() Event_queue::deinit_mutexes()
*/ */
void void
Event_queue::destroy_mutexes() Event_queue::deinit_mutexes()
{ {
pthread_mutex_destroy(&singleton->LOCK_event_queue); pthread_mutex_destroy(&LOCK_event_queue);
} }
...@@ -765,8 +751,8 @@ void ...@@ -765,8 +751,8 @@ void
Event_queue::on_queue_change() Event_queue::on_queue_change()
{ {
DBUG_ENTER("Event_queue::on_queue_change"); DBUG_ENTER("Event_queue::on_queue_change");
DBUG_PRINT("info", ("Sending COND_new_work")); DBUG_PRINT("info", ("Signalling change of the queue"));
singleton->queue_changed(); scheduler->queue_changed();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo) ...@@ -787,13 +773,11 @@ Event_queue::init(Event_db_repository *db_repo)
{ {
int i= 0; int i= 0;
bool ret= FALSE; bool ret= FALSE;
DBUG_ENTER("Event_scheduler::init"); DBUG_ENTER("Event_queue::init");
DBUG_PRINT("enter", ("this=%p", this)); DBUG_PRINT("enter", ("this=%p", this));
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
db_repository= db_repo; db_repository= db_repo;
/* init memory root */
init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/, if (init_queue_ex(&queue, 30 /*num_el*/, 0 /*offset*/, 0 /*smallest_on_top*/,
event_timed_compare_q, NULL, 30 /*auto_extent*/)) event_timed_compare_q, NULL, 30 /*auto_extent*/))
...@@ -824,8 +808,8 @@ Event_queue::deinit() ...@@ -824,8 +808,8 @@ Event_queue::deinit()
DBUG_ENTER("Event_queue::deinit"); DBUG_ENTER("Event_queue::deinit");
LOCK_QUEUE_DATA(); LOCK_QUEUE_DATA();
empty_queue();
delete_queue(&queue); delete_queue(&queue);
free_root(&scheduler_root, MYF(0));
UNLOCK_QUEUE_DATA(); UNLOCK_QUEUE_DATA();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -835,7 +819,7 @@ Event_queue::deinit() ...@@ -835,7 +819,7 @@ Event_queue::deinit()
void void
Event_queue::recalculate_queue(THD *thd) Event_queue::recalculate_queue(THD *thd)
{ {
int i; uint i;
for (i= 0; i < queue.elements; i++) for (i= 0; i < queue.elements; i++)
{ {
((Event_timed*)queue_element(&queue, i))->compute_next_execution_time(); ((Event_timed*)queue_element(&queue, i))->compute_next_execution_time();
...@@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd) ...@@ -848,13 +832,118 @@ Event_queue::recalculate_queue(THD *thd)
void void
Event_queue::empty_queue() Event_queue::empty_queue()
{ {
int i; uint i;
/* empty the queue */ /* empty the queue */
for (i= 0; i < events_count_no_lock(); ++i) for (i= 0; i < events_count_no_lock(); ++i)
{ {
Event_timed *et= (Event_timed *) queue_element(&queue, i); Event_timed *et= (Event_timed *) queue_element(&queue, i);
et->free_sp();
delete et; delete et;
} }
resize_queue(&queue, 0); resize_queue(&queue, 0);
} }
Event_timed*
Event_queue::get_top()
{
return (Event_timed *)queue_top(&queue);
}
void
Event_queue::remove_top()
{
queue_remove(&queue, 0);// 0 is top, internally 1
}
void
Event_queue::top_changed()
{
queue_replaced(&queue);
}
Event_timed *
Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
struct timespec *abstime)
{
struct timespec top_time;
Event_timed *et_new= NULL;
DBUG_ENTER("Event_queue::get_top_for_execution_if_time");
DBUG_PRINT("enter", ("thd=%p now=%d", thd, now));
abstime->tv_nsec= 0;
LOCK_QUEUE_DATA();
do {
int res;
Event_timed *et= NULL;
if (!queue.elements)
{
abstime->tv_sec= 0;
break;
}
int i;
DBUG_PRINT("info", ("Dumping queue . Elements=%u", queue.elements));
for (i = 0; i < queue.elements; i++)
{
et= ((Event_timed*)queue_element(&queue, i));
DBUG_PRINT("info",("et=%p db=%s name=%s",et, et->dbname.str, et->name.str));
DBUG_PRINT("info", ("exec_at=%llu starts=%llu ends=%llu "
" expr=%lld et.exec_at=%d now=%d (et.exec_at - now)=%d if=%d",
TIME_to_ulonglong_datetime(&et->execute_at),
TIME_to_ulonglong_datetime(&et->starts),
TIME_to_ulonglong_datetime(&et->ends),
et->expression, sec_since_epoch_TIME(&et->execute_at), now,
(int)(sec_since_epoch_TIME(&et->execute_at) - now),
sec_since_epoch_TIME(&et->execute_at) <= now));
}
et= ((Event_timed*)queue_element(&queue, 0));
top_time.tv_sec= sec_since_epoch_TIME(&et->execute_at);
if (top_time.tv_sec <= now)
{
DBUG_PRINT("info", ("Ready for execution"));
abstime->tv_sec= 0;
if ((res= db_repository->load_named_event(thd, et->dbname, et->name,
&et_new)))
{
DBUG_ASSERT(0);
break;
}
et->mark_last_executed(thd);
if (et->compute_next_execution_time())
et->status= Event_timed::DISABLED;
DBUG_PRINT("info", ("event's status is %d", et->status));
et->update_fields(thd);
if (((et->execute_at.year && !et->expression) || et->execute_at_null) ||
(et->status == Event_timed::DISABLED))
{
DBUG_PRINT("info", ("removing from the queue"));
if (et->dropped)
et->drop(thd);
delete et;
queue_remove(&queue, 0);
}
else
queue_replaced(&queue);
}
else
{
abstime->tv_sec= top_time.tv_sec;
DBUG_PRINT("info", ("Have to wait %d till %d", abstime->tv_sec - now,
abstime->tv_sec));
}
} while (0);
UNLOCK_QUEUE_DATA();
DBUG_PRINT("info", ("returning. et_new=%p abstime.tv_sec=%d ", et_new,
abstime->tv_sec));
if (et_new)
DBUG_PRINT("info", ("db=%s name=%s definer=%s "
"et_new.execute_at=%lld", et_new->dbname.str, et_new->name.str,
et_new->definer.str,
TIME_to_ulonglong_datetime(&et_new->execute_at)));
DBUG_RETURN(et_new);
}
...@@ -19,22 +19,23 @@ ...@@ -19,22 +19,23 @@
class sp_name; class sp_name;
class Event_timed; class Event_timed;
class Event_db_repository; class Event_db_repository;
class Event_job_data;
class THD; class THD;
typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*); typedef bool * (*event_timed_identifier_comparator)(Event_timed*, Event_timed*);
class Event_scheduler; class Event_scheduler_ng;
class Event_queue class Event_queue
{ {
public: public:
Event_queue(); Event_queue();
static void void
init_mutexes(); init_mutexes();
static void void
destroy_mutexes(); deinit_mutexes();
bool bool
init(Event_db_repository *db_repo); init(Event_db_repository *db_repo);
...@@ -76,6 +77,18 @@ class Event_queue ...@@ -76,6 +77,18 @@ class Event_queue
void void
empty_queue(); empty_queue();
Event_timed *
get_top_for_execution_if_time(THD *thd, time_t now, struct timespec *abstime);
Event_timed*
get_top();
void
remove_top();
void
top_changed();
///////////////protected ///////////////protected
Event_timed * Event_timed *
find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q); find_event(LEX_STRING db, LEX_STRING name, bool remove_from_q);
...@@ -92,9 +105,6 @@ class Event_queue ...@@ -92,9 +105,6 @@ class Event_queue
Event_db_repository *db_repository; Event_db_repository *db_repository;
/* The MEM_ROOT of the object */
MEM_ROOT scheduler_root;
/* The sorted queue with the Event_timed objects */ /* The sorted queue with the Event_timed objects */
QUEUE queue; QUEUE queue;
...@@ -111,11 +121,11 @@ class Event_queue ...@@ -111,11 +121,11 @@ class Event_queue
void void
unlock_data(const char *func, uint line); unlock_data(const char *func, uint line);
static void void
on_queue_change(); on_queue_change();
Event_scheduler_ng *scheduler;
protected: protected:
/* Singleton instance */
static Event_scheduler *singleton;
}; };
......
...@@ -20,223 +20,8 @@ ...@@ -20,223 +20,8 @@
#include "event_scheduler.h" #include "event_scheduler.h"
#include "event_db_repository.h" #include "event_db_repository.h"
#include "sp_head.h" #include "sp_head.h"
#include "event_queue.h"
/*
ToDo:
1. Talk to Alik to get a check for configure.in for my_time_t and time_t
2. Look at guardian.h|cc to see its life cycle, has similarities.
*/
/*
The scheduler is implemented as class Event_scheduler. Only one instance is
kept during the runtime of the server, by implementing the Singleton DP.
Object instance is always there because the memory is allocated statically
and initialized when the OS loader loads mysqld. This initialization is
bare. Extended initialization is done during the call to
Event_scheduler::init() in Events::init(). The reason for that late initialization
is that some subsystems needed to boot the Scheduler are not available at
earlier stages of the mysqld boot procedure. Events::init() is called in
mysqld.cc . If the mysqld is started with --event-scheduler=0 then
no initialization takes place and the scheduler is unavailable during this
server run. The server should be started with --event-scheduler=1 to have
the scheduler initialized and able to execute jobs. This starting alwa
s implies that the jobs execution will start immediately. If the server
is started with --event-scheduler=2 then the scheduler is started in suspended
state. Default state, if --event-scheduler is not specified is 2.
The scheduler only manages execution of the events. Their creation,
alteration and deletion is delegated to other routines found in event.cc .
These routines interact with the scheduler :
- CREATE EVENT -> Event_scheduler::create_event()
- ALTER EVENT -> Event_scheduler::update_event()
- DROP EVENT -> Event_scheduler::drop_event()
There is one mutex in the single Event_scheduler object which controls
the simultaneous access to the objects invariants. Using one lock makes
it easy to follow the workflow. This mutex is LOCK_scheduler_data. It is
initialized in Event_scheduler::init(). Which in turn is called by the
Facade class Events in event.cc, coming from init_thread_environment() from
mysqld.cc -> no concurrency at this point. It's destroyed in
Events::destroy_mutexes() called from clean_up_mutexes() in mysqld.cc .
The full initialization is done in Event_scheduler::init() called from
Events::init(). It's done before any requests coming in, so this is a
guarantee for not having concurrency.
The scheduler is started with Event_scheduler::start() and stopped with
Event_scheduler::stop(). When the scheduler starts it loads all events
from mysql.event table. Unfortunately, there is a race condition between
the event disk management functions and the scheduler ones
(add/replace/drop_event & load_events_from_db()), because the operations
do not happen under one global lock but the disk operations are guarded
by the MYISAM lock on mysql.event. In the same time, the queue operations
are guarded by LOCK_scheduler_data. If the scheduler is start()-ed during
server startup and stopped()-ed during server shutdown (in Events::shutdown()
called by kill_server() in mysqld.cc) these races does not exist.
Since the user may want to temporarily inhibit execution of events the
scheduler can be suspended and then it can be forced to resume its
operations. The API call to perform these is
Event_scheduler::suspend_or_resume(enum enum_suspend_or_resume) .
When the scheduler is suspended the main scheduler thread, which ATM
happens to have thread_id 1, locks on a condition COND_suspend_or_resume.
When this is signal is sent for the reverse operation the main scheduler
loops continues to roll and execute events.
When the scheduler is suspended all add/replace/drop_event() operations
work as expected and the modify the queue but no events execution takes
place.
In contrast to the previous scheduler implementation, found in
event_executor.cc, the start, shutdown, suspend and resume are synchronous
operations. As a whole all operations are synchronized and no busy waits
are used except in stop_all_running_events(), which waits until all
running event worker threads have finished. It would have been nice to
use a conditional on which this method will wait and the last thread to
finish would signal it but this implies subclassing THD.
The scheduler does not keep a counter of how many event worker threads are
running, at any specific moment, because this will copy functionality
already existing in the server. Namely, all THDs are registered in the
global `threads` array. THD has member variable system_thread which
identifies the type of thread. Connection threads being NON_SYSTEM_THREAD,
all other have their enum value. Important for the scheduler are
SYSTEM_THREAD_EVENT_SCHEDULER and SYSTEM_THREAD_EVENT_WORKER.
Class THD subclasses class ilink, which is the linked list of all threads.
When a THD instance is destroyed it's being removed from threads, thus
no manual intervention is needed. On the contrary registering is manual
with threads.append() . Traversing the threads array every time a subclass
of THD, for instance if we would have had THD_scheduler_worker to see
how many events we have and whether the scheduler is shutting down will
take much time and lead to a deadlock. stop_all_running_events() is called
under LOCK_scheduler_data. If the THD_scheduler_worker was aware of
the single Event_scheduler instance it will try to check
Event_scheduler::state but for this it would need to acquire
LOCK_scheduler_data => deadlock. Thus stop_all_running_events() uses a
busy wait.
DROP DATABASE DDL should drop all events defined in a specific schema.
DROP USER also should drop all events who has as definer the user being
dropped (this one is not addressed at the moment but a hook exists). For
this specific needs Event_scheduler::drop_matching_events() is
implemented. Which expects a callback to be applied on every object in
the queue. Thus events that match specific schema or user, will be
removed from the queue. The exposed interface is :
- Event_scheduler::drop_schema_events()
- Event_scheduler::drop_user_events()
This bulk dropping happens under LOCK_scheduler_data, thus no two or
more threads can execute it in parallel. However, DROP DATABASE is also
synchronized, currently, in the server thus this does not impact the
overall performance. In addition, DROP DATABASE is not that often
executed DDL.
Though the interface to the scheduler is only through the public methods
of class Event_scheduler, there are currently few functions which are
used during its operations. Namely :
- static evex_print_warnings()
After every event execution all errors/warnings are dumped, so the user
can see in case of a problem what the problem was.
- static init_event_thread()
This function is both used by event_scheduler_thread() and
event_worker_thread(). It initializes the THD structure. The
initialization looks pretty similar to the one in slave.cc done for the
replication threads. However, though the similarities it cannot be
factored out to have one routine.
- static event_scheduler_thread()
Because our way to register functions to be used by the threading library
does not allow usage of static methods this function is used to start the
scheduler in it. It does THD initialization and then calls
Event_scheduler::run().
- static event_worker_thread()
With already stated the reason for not being able to use methods, this
function executes the worker threads.
The execution of events is, to some extent, synchronized to inhibit race
conditions when Event_timed::thread_id is being updated with the thread_id of
the THD in which the event is being executed. The thread_id is in the
Event_timed object because we need to be able to kill quickly a specific
event during ALTER/DROP EVENT without traversing the global `threads` array.
However, this makes the scheduler's code more complicated. The event worker
thread is started by Event_timed::spawn_now(), which in turn calls
pthread_create(). The thread_id which will be associated in init_event_thread
is not known in advance thus the registering takes place in
event_worker_thread(). This registering has to be synchronized under
LOCK_scheduler_data, so no kill_event() on a object in
replace_event/drop_event/drop_matching_events() could take place.
This synchronization is done through class Worker_thread_param that is
local to this file. Event_scheduler::execute_top() is called under
LOCK_scheduler_data. This method :
1. Creates an instance of Worker_thread_param on the stack
2. Locks Worker_thread_param::LOCK_started
3. Calls Event_timed::spawn_now() which in turn creates a new thread.
4. Locks on Worker_thread_param::COND_started_or_stopped and waits till the
worker thread send signal. The code is spurious wake-up safe because
Worker_thread_param::started is checked.
5. The worker thread initializes its THD, then sets Event_timed::thread_id,
sets Worker_thread_param::started to TRUE and sends back
Worker_thread_param::COND_started. From this moment on, the event
is being executed and could be killed by using Event_timed::thread_id.
When Event_timed::spawn_thread_finish() is called in the worker thread,
it sets thread_id to 0. From this moment on, the worker thread should not
touch the Event_timed instance.
The life-cycle of the server is a FSA.
enum enum_state Event_scheduler::state keeps the state of the scheduler.
The states are:
|---UNINITIALIZED
|
| |------------------> IN_SHUTDOWN
--> INITIALIZED -> COMMENCING ---> RUNNING ----------|
^ ^ | | ^ |
| |- CANTSTART <--| | |- SUSPENDED <-|
|______________________________|
- UNINITIALIZED :The object is created and only the mutex is initialized
- INITIALIZED :All member variables are initialized
- COMMENCING :The scheduler is starting, no other attempt to start
should succeed before the state is back to INITIALIZED.
- CANTSTART :Set by the ::run() method in case it can't start for some
reason. In this case the connection thread that tries to
start the scheduler sees that some error has occurred and
returns an error to the user. Finally, the connection
thread sets the state to INITIALIZED, so further attempts
to start the scheduler could be made.
- RUNNING :The scheduler is running. New events could be added,
dropped, altered. The scheduler could be stopped.
- SUSPENDED :Like RUNNING but execution of events does not take place.
Operations on the memory queue are possible.
- IN_SHUTDOWN :The scheduler is shutting down, due to request by setting
the global event_scheduler to 0/FALSE, or because of a
KILL command sent by a user to the master thread.
In every method the macros LOCK_SCHEDULER_DATA() and UNLOCK_SCHEDULER_DATA()
are used for (un)locking purposes. They are used to save the programmer
from typing everytime
lock_data(__FUNCTION__, __LINE__);
All locking goes through Event_scheduler::lock_data() and ::unlock_data().
These two functions then record in variables where for last time
LOCK_scheduler_data was locked and unlocked (two different variables). In
multithreaded environment, in some cases they make no sense but are useful for
inspecting deadlocks without having the server debug log turned on and the
server is still running.
The same strategy is used for conditional variables.
Event_scheduler::cond_wait() is invoked from all places with parameter
an enum enum_cond_vars. In this manner, it's possible to inspect the last
on which condition the last call to cond_wait() was waiting. If the server
was started with debug trace switched on, the trace file also holds information
about conditional variables used.
*/
#ifdef __GNUC__ #ifdef __GNUC__
#if __GNUC__ >= 2 #if __GNUC__ >= 2
...@@ -250,6 +35,10 @@ ...@@ -250,6 +35,10 @@
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__) #define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
Event_scheduler*
Event_scheduler::singleton= NULL;
#ifndef DBUG_OFF #ifndef DBUG_OFF
static static
LEX_STRING states_names[] = LEX_STRING states_names[] =
...@@ -462,7 +251,7 @@ event_scheduler_thread(void *arg) ...@@ -462,7 +251,7 @@ event_scheduler_thread(void *arg)
thd->security_ctx->set_user((char*)"event_scheduler"); thd->security_ctx->set_user((char*)"event_scheduler");
sql_print_information("SCHEDULER: Manager thread booting"); sql_print_information("SCHEDULER: Manager thread booting");
if (Event_scheduler::check_system_tables(thd)) if (Event_scheduler::get_instance()->event_queue->check_system_tables(thd))
scheduler->report_error_during_start(); scheduler->report_error_during_start();
else else
scheduler->run(thd); scheduler->run(thd);
...@@ -625,13 +414,13 @@ event_worker_thread(void *arg) ...@@ -625,13 +414,13 @@ event_worker_thread(void *arg)
Event_scheduler::Event_scheduler() Event_scheduler::Event_scheduler()
{ {
thread_id= 0; thread_id= 0;
mutex_last_unlocked_at_line_nr= mutex_last_locked_at_line_nr= 0; mutex_last_unlocked_at_line= mutex_last_locked_at_line= 0;
mutex_last_unlocked_in_func_name= mutex_last_locked_in_func_name= ""; mutex_last_unlocked_in_func= mutex_last_locked_in_func= "";
cond_waiting_on= COND_NONE; cond_waiting_on= COND_NONE;
mutex_scheduler_data_locked= FALSE; mutex_scheduler_data_locked= FALSE;
state= UNINITIALIZED; state= UNINITIALIZED;
start_scheduler_suspended= FALSE; start_scheduler_suspended= FALSE;
LOCK_scheduler_data= &LOCK_event_queue; LOCK_scheduler_data= &LOCK_data;
} }
...@@ -647,9 +436,10 @@ Event_scheduler::Event_scheduler() ...@@ -647,9 +436,10 @@ Event_scheduler::Event_scheduler()
*/ */
void void
Event_scheduler::create_instance() Event_scheduler::create_instance(Event_queue *queue)
{ {
singleton= new Event_scheduler(); singleton= new Event_scheduler();
singleton->event_queue= queue;
} }
/* /*
...@@ -689,8 +479,8 @@ Event_scheduler::init(Event_db_repository *db_repo) ...@@ -689,8 +479,8 @@ Event_scheduler::init(Event_db_repository *db_repo)
DBUG_ENTER("Event_scheduler::init"); DBUG_ENTER("Event_scheduler::init");
DBUG_PRINT("enter", ("this=%p", this)); DBUG_PRINT("enter", ("this=%p", this));
Event_queue::init(db_repo);
LOCK_SCHEDULER_DATA(); LOCK_SCHEDULER_DATA();
init_alloc_root(&scheduler_root, MEM_ROOT_BLOCK_SIZE, MEM_ROOT_PREALLOC);
for (;i < COND_LAST; i++) for (;i < COND_LAST; i++)
if (pthread_cond_init(&cond_vars[i], NULL)) if (pthread_cond_init(&cond_vars[i], NULL))
{ {
...@@ -720,7 +510,6 @@ void ...@@ -720,7 +510,6 @@ void
Event_scheduler::destroy() Event_scheduler::destroy()
{ {
DBUG_ENTER("Event_scheduler"); DBUG_ENTER("Event_scheduler");
Event_queue::deinit();
LOCK_SCHEDULER_DATA(); LOCK_SCHEDULER_DATA();
switch (state) { switch (state) {
case UNINITIALIZED: case UNINITIALIZED:
...@@ -879,7 +668,7 @@ Event_scheduler::run(THD *thd) ...@@ -879,7 +668,7 @@ Event_scheduler::run(THD *thd)
DBUG_PRINT("enter", ("thd=%p", thd)); DBUG_PRINT("enter", ("thd=%p", thd));
LOCK_SCHEDULER_DATA(); LOCK_SCHEDULER_DATA();
ret= load_events_from_db(thd); ret= event_queue->load_events_from_db(thd);
if (!ret) if (!ret)
{ {
...@@ -923,7 +712,8 @@ Event_scheduler::run(THD *thd) ...@@ -923,7 +712,8 @@ Event_scheduler::run(THD *thd)
} }
DBUG_ASSERT(state == RUNNING); DBUG_ASSERT(state == RUNNING);
et= (Event_timed *)queue_top(&queue); // et= (Event_timed *)queue_top(&event_queue->queue);
et= event_queue->get_top();
/* Skip disabled events */ /* Skip disabled events */
if (et->status != Event_timed::ENABLED) if (et->status != Event_timed::ENABLED)
...@@ -935,7 +725,7 @@ Event_scheduler::run(THD *thd) ...@@ -935,7 +725,7 @@ Event_scheduler::run(THD *thd)
sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue", sql_print_information("SCHEDULER: Found a disabled event %*s.%*s in the queue",
et->dbname.length, et->dbname.str, et->name.length, et->dbname.length, et->dbname.str, et->name.length,
et->name.str); et->name.str);
queue_remove(&queue, 0); queue_remove(&event_queue->queue, 0);
/* ToDo: check this again */ /* ToDo: check this again */
if (et->dropped) if (et->dropped)
et->drop(thd); et->drop(thd);
...@@ -1095,16 +885,16 @@ Event_scheduler::execute_top(THD *thd, Event_timed *et) ...@@ -1095,16 +885,16 @@ Event_scheduler::execute_top(THD *thd, Event_timed *et)
sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.", sql_print_information("SCHEDULER: %s.%s in execution. Skip this time.",
et->dbname.str, et->name.str); et->dbname.str, et->name.str);
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
queue_remove(&queue, 0);// 0 is top, internally 1 event_queue->remove_top();
else else
queue_replaced(&queue); event_queue->top_changed();
break; break;
default: default:
DBUG_ASSERT(!spawn_ret_code); DBUG_ASSERT(!spawn_ret_code);
if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED) if ((et->flags & EVENT_EXEC_NO_MORE) || et->status == Event_timed::DISABLED)
queue_remove(&queue, 0);// 0 is top, internally 1 event_queue->remove_top();
else else
queue_replaced(&queue); event_queue->top_changed();
/* /*
We don't lock LOCK_scheduler_data here because it's a pre-requisite We don't lock LOCK_scheduler_data here because it's a pre-requisite
for calling the current_method. for calling the current_method.
...@@ -1152,7 +942,7 @@ Event_scheduler::clean_memory(THD *thd) ...@@ -1152,7 +942,7 @@ Event_scheduler::clean_memory(THD *thd)
sql_print_information("SCHEDULER: Emptying the queue"); sql_print_information("SCHEDULER: Emptying the queue");
empty_queue(); event_queue->empty_queue();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1432,7 +1222,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd) ...@@ -1432,7 +1222,7 @@ Event_scheduler::check_n_suspend_if_needed(THD *thd)
} }
if (was_suspended) if (was_suspended)
{ {
recalculate_queue(thd); event_queue->recalculate_queue(thd);
/* This will implicitly unlock LOCK_scheduler_data */ /* This will implicitly unlock LOCK_scheduler_data */
thd->exit_cond(""); thd->exit_cond("");
} }
...@@ -1461,14 +1251,14 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) ...@@ -1461,14 +1251,14 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
bool slept= FALSE; bool slept= FALSE;
DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue"); DBUG_ENTER("Event_scheduler::check_n_wait_for_non_empty_queue");
DBUG_PRINT("enter", ("q.elements=%lu state=%s", DBUG_PRINT("enter", ("q.elements=%lu state=%s",
events_count_no_lock(), states_names[state])); event_queue->events_count_no_lock(), states_names[state]));
if (!events_count_no_lock()) if (!event_queue->events_count_no_lock())
thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data, thd->enter_cond(&cond_vars[COND_new_work], LOCK_scheduler_data,
"Empty queue, sleeping"); "Empty queue, sleeping");
/* Wait in a loop protecting against catching spurious signals */ /* Wait in a loop protecting against catching spurious signals */
while (!events_count_no_lock() && state == RUNNING) while (!event_queue->events_count_no_lock() && state == RUNNING)
{ {
slept= TRUE; slept= TRUE;
DBUG_PRINT("info", ("Entering condition because of empty queue")); DBUG_PRINT("info", ("Entering condition because of empty queue"));
...@@ -1485,7 +1275,7 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd) ...@@ -1485,7 +1275,7 @@ Event_scheduler::check_n_wait_for_non_empty_queue(THD *thd)
thd->exit_cond(""); thd->exit_cond("");
DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d", DBUG_PRINT("exit", ("q.elements=%lu state=%s thd->killed=%d",
events_count_no_lock(), states_names[state], thd->killed)); event_queue->events_count_no_lock(), states_names[state], thd->killed));
DBUG_RETURN(slept); DBUG_RETURN(slept);
} }
...@@ -1627,7 +1417,7 @@ Event_scheduler::dump_internal_status(THD *thd) ...@@ -1627,7 +1417,7 @@ Event_scheduler::dump_internal_status(THD *thd)
/* queue.elements */ /* queue.elements */
protocol->prepare_for_resend(); protocol->prepare_for_resend();
protocol->store(STRING_WITH_LEN("queue.elements"), scs); protocol->store(STRING_WITH_LEN("queue.elements"), scs);
int_string.set((longlong) scheduler->events_count_no_lock(), scs); int_string.set((longlong) scheduler->event_queue->events_count_no_lock(), scs);
protocol->store(&int_string); protocol->store(&int_string);
ret= protocol->write(); ret= protocol->write();
...@@ -1663,8 +1453,8 @@ Event_scheduler::lock_data(const char *func, uint line) ...@@ -1663,8 +1453,8 @@ Event_scheduler::lock_data(const char *func, uint line)
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u", DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
&LOCK_scheduler_data, func, line)); &LOCK_scheduler_data, func, line));
pthread_mutex_lock(LOCK_scheduler_data); pthread_mutex_lock(LOCK_scheduler_data);
mutex_last_locked_in_func_name= func; mutex_last_locked_in_func= func;
mutex_last_locked_at_line_nr= line; mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE; mutex_scheduler_data_locked= TRUE;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1685,9 +1475,9 @@ Event_scheduler::unlock_data(const char *func, uint line) ...@@ -1685,9 +1475,9 @@ Event_scheduler::unlock_data(const char *func, uint line)
DBUG_ENTER("Event_scheduler::UNLOCK_mutex"); DBUG_ENTER("Event_scheduler::UNLOCK_mutex");
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u", DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
LOCK_scheduler_data, func, line)); LOCK_scheduler_data, func, line));
mutex_last_unlocked_at_line_nr= line; mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE; mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func_name= func; mutex_last_unlocked_in_func= func;
pthread_mutex_unlock(LOCK_scheduler_data); pthread_mutex_unlock(LOCK_scheduler_data);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1733,3 +1523,31 @@ Event_scheduler::queue_changed() ...@@ -1733,3 +1523,31 @@ Event_scheduler::queue_changed()
pthread_cond_signal(&cond_vars[COND_new_work]); pthread_cond_signal(&cond_vars[COND_new_work]);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/*
Inits mutexes.
SYNOPSIS
Event_scheduler::init_mutexes()
*/
void
Event_scheduler::init_mutexes()
{
pthread_mutex_init(singleton->LOCK_scheduler_data, MY_MUTEX_INIT_FAST);
}
/*
Destroys mutexes.
SYNOPSIS
Event_queue::destroy_mutexes()
*/
void
Event_scheduler::destroy_mutexes()
{
pthread_mutex_destroy(singleton->LOCK_scheduler_data);
}
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
class sp_name; class sp_name;
class Event_timed; class Event_timed;
class Event_db_repository; class Event_db_repository;
class Event_queue;
class THD; class THD;
...@@ -31,7 +32,7 @@ events_shutdown(); ...@@ -31,7 +32,7 @@ events_shutdown();
#include "event_queue.h" #include "event_queue.h"
#include "event_scheduler.h" #include "event_scheduler.h"
class Event_scheduler : public Event_queue class Event_scheduler
{ {
public: public:
enum enum_state enum enum_state
...@@ -56,7 +57,13 @@ class Event_scheduler : public Event_queue ...@@ -56,7 +57,13 @@ class Event_scheduler : public Event_queue
static void static void
create_instance(); create_instance(Event_queue *queue);
static void
init_mutexes();
static void
destroy_mutexes();
/* Singleton access */ /* Singleton access */
static Event_scheduler* static Event_scheduler*
...@@ -122,6 +129,8 @@ class Event_scheduler : public Event_queue ...@@ -122,6 +129,8 @@ class Event_scheduler : public Event_queue
void void
queue_changed(); queue_changed();
Event_queue *event_queue;
protected: protected:
uint uint
...@@ -147,9 +156,11 @@ class Event_scheduler : public Event_queue ...@@ -147,9 +156,11 @@ class Event_scheduler : public Event_queue
/* Singleton DP is used */ /* Singleton DP is used */
Event_scheduler(); Event_scheduler();
pthread_mutex_t LOCK_data;
pthread_mutex_t *LOCK_scheduler_data; pthread_mutex_t *LOCK_scheduler_data;
/* The MEM_ROOT of the object */
MEM_ROOT scheduler_root;
/* Set to start the scheduler in suspended state */ /* Set to start the scheduler in suspended state */
bool start_scheduler_suspended; bool start_scheduler_suspended;
...@@ -172,18 +183,20 @@ class Event_scheduler : public Event_queue ...@@ -172,18 +183,20 @@ class Event_scheduler : public Event_queue
COND_LAST COND_LAST
}; };
uint mutex_last_locked_at_line_nr; uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line_nr; uint mutex_last_unlocked_at_line;
const char* mutex_last_locked_in_func_name; const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func_name; const char* mutex_last_unlocked_in_func;
int cond_waiting_on; int cond_waiting_on;
bool mutex_scheduler_data_locked; bool mutex_scheduler_data_locked;
static const char * const cond_vars_names[COND_LAST]; static const char * const cond_vars_names[COND_LAST];
pthread_cond_t cond_vars[COND_LAST]; pthread_cond_t cond_vars[COND_LAST];
/* Singleton instance */
static Event_scheduler *singleton;
private: private:
/* Prevent use of these */ /* Prevent use of these */
Event_scheduler(const Event_scheduler &); Event_scheduler(const Event_scheduler &);
......
/* Copyright (C) 2004-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "mysql_priv.h"
#include "events.h"
#include "event_data_objects.h"
#include "event_scheduler_ng.h"
#include "event_queue.h"
#ifdef __GNUC__
#if __GNUC__ >= 2
#define SCHED_FUNC __FUNCTION__
#endif
#else
#define SCHED_FUNC "<unknown>"
#endif
#define LOCK_SCHEDULER_DATA() lock_data(SCHED_FUNC, __LINE__)
#define UNLOCK_SCHEDULER_DATA() unlock_data(SCHED_FUNC, __LINE__)
extern pthread_attr_t connection_attrib;
struct scheduler_param
{
THD *thd;
Event_scheduler_ng *scheduler;
};
struct scheduler_param scheduler_param_value;
static
LEX_STRING scheduler_states_names[] =
{
{ C_STRING_WITH_LEN("INITIALIZED")},
{ C_STRING_WITH_LEN("RUNNING")},
{ C_STRING_WITH_LEN("STOPPING")}
};
class Worker_thread_param
{
public:
Event_timed *et;
pthread_mutex_t LOCK_started;
pthread_cond_t COND_started;
bool started;
Worker_thread_param(Event_timed *etn):et(etn), started(FALSE)
{
pthread_mutex_init(&LOCK_started, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_started, NULL);
}
~Worker_thread_param()
{
pthread_mutex_destroy(&LOCK_started);
pthread_cond_destroy(&COND_started);
}
};
/*
Prints the stack of infos, warnings, errors from thd to
the console so it can be fetched by the logs-into-tables and
checked later.
SYNOPSIS
evex_print_warnings
thd - thread used during the execution of the event
et - the event itself
*/
static void
evex_print_warnings(THD *thd, Event_timed *et)
{
MYSQL_ERROR *err;
DBUG_ENTER("evex_print_warnings");
if (!thd->warn_list.elements)
DBUG_VOID_RETURN;
char msg_buf[10 * STRING_BUFFER_USUAL_SIZE];
char prefix_buf[5 * STRING_BUFFER_USUAL_SIZE];
String prefix(prefix_buf, sizeof(prefix_buf), system_charset_info);
prefix.length(0);
prefix.append("SCHEDULER: [");
append_identifier(thd, &prefix, et->definer.str, et->definer.length);
prefix.append("][", 2);
append_identifier(thd,&prefix, et->dbname.str, et->dbname.length);
prefix.append('.');
append_identifier(thd,&prefix, et->name.str, et->name.length);
prefix.append("] ", 2);
List_iterator_fast<MYSQL_ERROR> it(thd->warn_list);
while ((err= it++))
{
String err_msg(msg_buf, sizeof(msg_buf), system_charset_info);
/* set it to 0 or we start adding at the end. That's the trick ;) */
err_msg.length(0);
err_msg.append(prefix);
err_msg.append(err->msg, strlen(err->msg), system_charset_info);
err_msg.append("]");
DBUG_ASSERT(err->level < 3);
(sql_print_message_handlers[err->level])("%*s", err_msg.length(),
err_msg.c_ptr());
}
DBUG_VOID_RETURN;
}
/*
Inits an scheduler thread handler, both the main and a worker
SYNOPSIS
init_event_thread()
thd - the THD of the thread. Has to be allocated by the caller.
NOTES
1. The host of the thead is my_localhost
2. thd->net is initted with NULL - no communication.
RETURN VALUE
0 OK
-1 Error
*/
static int
init_scheduler_thread(THD* thd)
{
DBUG_ENTER("init_event_thread");
thd->client_capabilities= 0;
thd->security_ctx->master_access= 0;
thd->security_ctx->db_access= 0;
thd->security_ctx->host_or_ip= (char*)my_localhost;
thd->security_ctx->set_user((char*)"event_scheduler");
my_net_init(&thd->net, 0);
thd->net.read_timeout= slave_net_timeout;
thd->slave_thread= 0;
thd->options|= OPTION_AUTO_IS_NULL;
thd->client_capabilities|= CLIENT_MULTI_RESULTS;
VOID(pthread_mutex_lock(&LOCK_thread_count));
thd->thread_id= thread_id++;
threads.append(thd);
thread_count++;
thread_running++;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
/*
Guarantees that we will see the thread in SHOW PROCESSLIST though its
vio is NULL.
*/
thd->system_thread= SYSTEM_THREAD_EVENT_SCHEDULER;
thd->proc_info= "Initialized";
thd->version= refresh_version;
thd->set_time();
DBUG_RETURN(0);
}
pthread_handler_t
event_scheduler_ng_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd= (THD *)(*(struct scheduler_param *) arg).thd;
thd->thread_stack= (char *)&thd; // remember where our stack is
DBUG_ENTER("event_scheduler_ng_thread");
my_thread_init();
pthread_detach_this_thread();
thd->real_id=pthread_self();
if (init_thr_lock() || thd->store_globals())
{
thd->cleanup();
goto end;
}
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK,&set,&thd->block_signals));
#endif
((struct scheduler_param *) arg)->scheduler->run(thd);
end:
thd->proc_info= "Clearing";
DBUG_ASSERT(thd->net.buff != 0);
net_end(&thd->net);
DBUG_PRINT("exit", ("Scheduler thread finishing"));
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete thd;
pthread_mutex_unlock(&LOCK_thread_count);
my_thread_end();
}
/*
Function that executes an event in a child thread. Setups the
environment for the event execution and cleans after that.
SYNOPSIS
event_worker_ng_thread()
arg The Event_timed object to be processed
RETURN VALUE
0 OK
*/
pthread_handler_t
event_worker_ng_thread(void *arg)
{
/* needs to be first for thread_stack */
THD *thd;
Event_timed *event= (Event_timed *)arg;
int ret;
thd= event->thd;
thd->thread_stack= (char *) &thd;
DBUG_ENTER("event_worker_thread");
DBUG_PRINT("enter", ("event=[%s.%s]", event->dbname.str, event->name.str));
my_thread_init();
pthread_detach_this_thread();
thd->real_id=pthread_self();
if (init_thr_lock() || thd->store_globals())
{
thd->cleanup();
goto end;
}
#if !defined(__WIN__) && !defined(OS2) && !defined(__NETWARE__)
sigset_t set;
VOID(sigemptyset(&set)); // Get mask in use
VOID(pthread_sigmask(SIG_UNBLOCK, &set, &thd->block_signals));
#endif
sql_print_information("SCHEDULER: [%s.%s of %s] executing in thread %lu",
event->dbname.str, event->name.str,
event->definer.str, thd->thread_id);
thd->init_for_queries();
thd->enable_slow_log= TRUE;
ret= event->execute(thd, thd->mem_root);
evex_print_warnings(thd, event);
sql_print_information("SCHEDULER: [%s.%s of %s] executed. RetCode=%d",
event->dbname.str, event->name.str,
event->definer.str, ret);
if (ret == EVEX_COMPILE_ERROR)
sql_print_information("SCHEDULER: COMPILE ERROR for event %s.%s of %s",
event->dbname.str, event->name.str,
event->definer.str);
else if (ret == EVEX_MICROSECOND_UNSUP)
sql_print_information("SCHEDULER: MICROSECOND is not supported");
DBUG_PRINT("info", ("master_access=%d db_access=%d",
thd->security_ctx->master_access, thd->security_ctx->db_access));
end:
thd->proc_info= "Clearing";
DBUG_ASSERT(thd->net.buff != 0);
/*
Free it here because net.vio is NULL for us => THD::~THD will check it
and won't call net_end(&net); See also replication code.
*/
net_end(&thd->net);
DBUG_PRINT("info", ("Worker thread %lu exiting", thd->thread_id));
VOID(pthread_mutex_lock(&LOCK_thread_count));
thread_count--;
thread_running--;
delete thd;
VOID(pthread_mutex_unlock(&LOCK_thread_count));
delete event;
my_thread_end();
}
bool
Event_scheduler_ng::init(Event_queue *q)
{
thread_id= 0;
state= INITIALIZED;
/* init memory root */
queue= q;
return FALSE;
}
void
Event_scheduler_ng::deinit()
{
}
void
Event_scheduler_ng::init_mutexes()
{
pthread_mutex_init(&LOCK_scheduler_state, MY_MUTEX_INIT_FAST);
pthread_cond_init(&COND_state, NULL);
}
void
Event_scheduler_ng::deinit_mutexes()
{
pthread_mutex_destroy(&LOCK_scheduler_state);
pthread_cond_destroy(&COND_state);
}
bool
Event_scheduler_ng::start()
{
THD *new_thd= NULL;
bool ret= FALSE;
pthread_t th;
DBUG_ENTER("Event_scheduler_ng::start");
LOCK_SCHEDULER_DATA();
if (state > INITIALIZED)
goto end;
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
{
sql_print_error("SCHEDULER: Cannot init manager event thread.");
ret= TRUE;
goto end;
}
scheduler_param_value.thd= new_thd;
scheduler_param_value.scheduler= this;
if (pthread_create(&th, &connection_attrib, event_scheduler_ng_thread,
(void*)&scheduler_param_value))
{
DBUG_PRINT("error", ("cannot create a new thread"));
state= INITIALIZED;
ret= TRUE;
}
state= RUNNING;
end:
UNLOCK_SCHEDULER_DATA();
if (ret && new_thd)
{
new_thd->proc_info= "Clearing";
DBUG_ASSERT(new_thd->net.buff != 0);
net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
DBUG_RETURN(ret);
}
bool
Event_scheduler_ng::stop()
{
THD *thd= current_thd;
DBUG_ENTER("Event_scheduler_ng::stop");
DBUG_PRINT("enter", ("thd=%p", current_thd));
LOCK_SCHEDULER_DATA();
if (state != RUNNING)
goto end;
state= STOPPING;
DBUG_PRINT("info", ("Manager thread has id %d", thread_id));
sql_print_information("SCHEDULER: Killing manager thread %lu", thread_id);
pthread_cond_signal(&COND_state);
/* Guarantee we don't catch spurious signals */
sql_print_information("SCHEDULER: Waiting the manager thread to reply");
do {
DBUG_PRINT("info", ("Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %s . "
"workers count=%d", scheduler_states_names[state].str,
workers_count()));
/* thd could be 0x0, when shutting down */
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
} while (state == STOPPING);
DBUG_PRINT("info", ("Manager thread has cleaned up. Set state to INIT"));
end:
UNLOCK_SCHEDULER_DATA();
DBUG_RETURN(FALSE);
}
bool
Event_scheduler_ng::run(THD *thd)
{
struct timespec abstime;
Event_timed *job_data;
LOCK_SCHEDULER_DATA();
thread_id= thd->thread_id;
sql_print_information("SCHEDULER: Manager thread started with id %lu",
thread_id);
while (state == RUNNING)
{
thd->end_time();
/* Gets a minimized version */
job_data= queue->get_top_for_execution_if_time(thd, thd->query_start(),
&abstime);
DBUG_PRINT("info", ("get_top returned job_data=%p now=%d abs_time.tv_sec=%d",
job_data, thd->query_start(), abstime.tv_sec));
if (!job_data && !abstime.tv_sec)
{
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
"Waiting on empty queue");
pthread_cond_wait(&COND_state, &LOCK_scheduler_state);
thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_state"));
LOCK_SCHEDULER_DATA();
}
else if (abstime.tv_sec)
{
thd->enter_cond(&COND_state, &LOCK_scheduler_state,
"Waiting for next activation");
pthread_cond_timedwait(&COND_state, &LOCK_scheduler_state, &abstime);
/*
If we get signal we should recalculate the whether it's the right time
because there could be :
1. Spurious wake-up
2. The top of the queue was changed (new one becase of create/update)
*/
/* This will do implicit UNLOCK_SCHEDULER_DATA() */
thd->exit_cond("");
DBUG_PRINT("info", ("Woke up. Got COND_stat or time for execution."));
LOCK_SCHEDULER_DATA();
}
else
{
int res;
UNLOCK_SCHEDULER_DATA();
res= execute_top(thd, job_data);
LOCK_SCHEDULER_DATA();
if (res)
break;
}
DBUG_PRINT("info", ("state=%s", scheduler_states_names[state].str));
}
DBUG_PRINT("info", ("Signalling back to the stopper COND_state"));
pthread_cond_signal(&COND_state);
error:
state= INITIALIZED;
stop_all_running_events(thd);
UNLOCK_SCHEDULER_DATA();
sql_print_information("SCHEDULER: Stopped");
return FALSE;
}
bool
Event_scheduler_ng::execute_top(THD *thd, Event_timed *job_data)
{
THD *new_thd;
pthread_t th;
DBUG_ENTER("Event_scheduler_ng::execute_top");
if (!(new_thd= new THD) || init_scheduler_thread(new_thd))
goto error;
/* Major failure */
job_data->thd= new_thd;
DBUG_PRINT("info", ("Starting new thread for %s@%s",
job_data->dbname.str, job_data->name.str));
if (pthread_create(&th, &connection_attrib, event_worker_ng_thread, job_data))
goto error;
DBUG_RETURN(FALSE);
error:
if (new_thd)
{
new_thd->proc_info= "Clearing";
DBUG_ASSERT(new_thd->net.buff != 0);
net_end(&new_thd->net);
pthread_mutex_lock(&LOCK_thread_count);
thread_count--;
thread_running--;
delete new_thd;
pthread_mutex_unlock(&LOCK_thread_count);
}
DBUG_RETURN(TRUE);
}
enum Event_scheduler_ng::enum_state
Event_scheduler_ng::get_state()
{
enum Event_scheduler_ng::enum_state ret;
LOCK_SCHEDULER_DATA();
ret= state;
UNLOCK_SCHEDULER_DATA();
return ret;
}
int
Event_scheduler_ng::dump_internal_status(THD *thd)
{
return 1;
}
uint
Event_scheduler_ng::workers_count()
{
THD *tmp;
uint count= 0;
DBUG_ENTER("Event_scheduler_ng::workers_count");
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (tmp->command == COM_DAEMON)
continue;
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
++count;
}
VOID(pthread_mutex_unlock(&LOCK_thread_count));
DBUG_PRINT("exit", ("%d", count));
DBUG_RETURN(count);
}
/*
Stops all running events
SYNOPSIS
Event_scheduler::stop_all_running_events()
thd Thread
NOTE
LOCK_scheduler data must be acquired prior to call to this method
*/
void
Event_scheduler_ng::stop_all_running_events(THD *thd)
{
CHARSET_INFO *scs= system_charset_info;
uint i;
DYNAMIC_ARRAY running_threads;
THD *tmp;
DBUG_ENTER("Event_scheduler::stop_all_running_events");
DBUG_PRINT("enter", ("workers_count=%d", workers_count()));
my_init_dynamic_array(&running_threads, sizeof(ulong), 10, 10);
bool had_super= FALSE;
VOID(pthread_mutex_lock(&LOCK_thread_count)); // For unlink from list
I_List_iterator<THD> it(threads);
while ((tmp=it++))
{
if (tmp->command == COM_DAEMON)
continue;
if (tmp->system_thread == SYSTEM_THREAD_EVENT_WORKER)
push_dynamic(&running_threads, (gptr) &tmp->thread_id);
}
VOID(pthread_mutex_unlock(&LOCK_thread_count));
/* We need temporarily SUPER_ACL to be able to kill our offsprings */
if (!(thd->security_ctx->master_access & SUPER_ACL))
thd->security_ctx->master_access|= SUPER_ACL;
else
had_super= TRUE;
char tmp_buff[10*STRING_BUFFER_USUAL_SIZE];
char int_buff[STRING_BUFFER_USUAL_SIZE];
String tmp_string(tmp_buff, sizeof(tmp_buff), scs);
String int_string(int_buff, sizeof(int_buff), scs);
tmp_string.length(0);
for (i= 0; i < running_threads.elements; ++i)
{
int ret;
ulong thd_id= *dynamic_element(&running_threads, i, ulong*);
int_string.set((longlong) thd_id,scs);
tmp_string.append(int_string);
if (i < running_threads.elements - 1)
tmp_string.append(' ');
if ((ret= kill_one_thread(thd, thd_id, FALSE)))
{
sql_print_error("SCHEDULER: Error killing %lu code=%d", thd_id, ret);
break;
}
}
if (running_threads.elements)
sql_print_information("SCHEDULER: Killing workers :%s", tmp_string.c_ptr());
if (!had_super)
thd->security_ctx->master_access &= ~SUPER_ACL;
delete_dynamic(&running_threads);
sql_print_information("SCHEDULER: Waiting for worker threads to finish");
while (workers_count())
my_sleep(100000);
DBUG_VOID_RETURN;
}
/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
Event_scheduler::queue_changed()
*/
void
Event_scheduler_ng::queue_changed()
{
DBUG_ENTER("Event_scheduler::queue_changed");
DBUG_PRINT("info", ("Sending COND_state"));
pthread_cond_signal(&COND_state);
DBUG_VOID_RETURN;
}
void
Event_scheduler_ng::lock_data(const char *func, uint line)
{
DBUG_ENTER("Event_scheduler_ng::lock_mutex");
DBUG_PRINT("enter", ("mutex_lock=%p func=%s line=%u",
&LOCK_scheduler_state, func, line));
pthread_mutex_lock(&LOCK_scheduler_state);
mutex_last_locked_in_func= func;
mutex_last_locked_at_line= line;
mutex_scheduler_data_locked= TRUE;
DBUG_VOID_RETURN;
}
void
Event_scheduler_ng::unlock_data(const char *func, uint line)
{
DBUG_ENTER("Event_scheduler_ng::UNLOCK_mutex");
DBUG_PRINT("enter", ("mutex_unlock=%p func=%s line=%u",
&LOCK_scheduler_state, func, line));
mutex_last_unlocked_at_line= line;
mutex_scheduler_data_locked= FALSE;
mutex_last_unlocked_in_func= func;
pthread_mutex_unlock(&LOCK_scheduler_state);
DBUG_VOID_RETURN;
}
#ifndef _EVENT_SCHEDULER_NG_H_
#define _EVENT_SCHEDULER_NG_H_
/* Copyright (C) 2004-2006 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
class Event_timed;
class Event_queue;
class Event_scheduler_ng
{
public:
Event_scheduler_ng(){}
~Event_scheduler_ng(){}
enum enum_state
{
INITIALIZED = 0,
RUNNING,
STOPPING
};
/* State changing methods follow */
bool
start();
bool
stop();
/*
Need to be public because has to be called from the function
passed to pthread_create.
*/
bool
run(THD *thd);
bool
init(Event_queue *queue);
void
deinit();
void
init_mutexes();
void
deinit_mutexes();
/* Information retrieving methods follow */
enum enum_state
get_state();
void
queue_changed();
static int
dump_internal_status(THD *thd);
private:
uint
workers_count();
/* helper functions */
bool
execute_top(THD *thd, Event_timed *job_data);
void
stop_all_running_events(THD *thd);
/* helper functions for working with mutexes & conditionals */
void
lock_data(const char *func, uint line);
void
unlock_data(const char *func, uint line);
pthread_mutex_t LOCK_scheduler_state;
/* This is the current status of the life-cycle of the scheduler. */
enum enum_state state;
/*
Holds the thread id of the executor thread or 0 if the scheduler is not
running. It is used by ::shutdown() to know which thread to kill with
kill_one_thread(). The latter wake ups a thread if it is waiting on a
conditional variable and sets thd->killed to non-zero.
*/
ulong thread_id;
pthread_cond_t COND_state;
Event_queue *queue;
Event_db_repository *db_repository;
uint mutex_last_locked_at_line;
uint mutex_last_unlocked_at_line;
const char* mutex_last_locked_in_func;
const char* mutex_last_unlocked_in_func;
bool mutex_scheduler_data_locked;
private:
/* Prevent use of these */
Event_scheduler_ng(const Event_scheduler_ng &);
void operator=(Event_scheduler_ng &);
};
#endif /* _EVENT_SCHEDULER_NG_H_ */
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "event_scheduler.h" #include "event_scheduler.h"
#include "event_db_repository.h" #include "event_db_repository.h"
#include "sp_head.h" #include "sp_head.h"
#include "event_scheduler_ng.h"
/* /*
TODO list : TODO list :
...@@ -293,9 +294,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options ...@@ -293,9 +294,7 @@ Events::create_event(THD *thd, Event_parse_data *parse_data, uint create_options
create_options & HA_LEX_CREATE_IF_NOT_EXISTS, create_options & HA_LEX_CREATE_IF_NOT_EXISTS,
rows_affected))) rows_affected)))
{ {
Event_scheduler *scheduler= Event_scheduler::get_instance(); if ((ret= event_queue->create_event(thd, parse_data, true)))
if (scheduler->initialized() &&
(ret= scheduler->create_event(thd, parse_data, true)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
} }
/* No need to close the table, it will be closed in sql_parse::do_command */ /* No need to close the table, it will be closed in sql_parse::do_command */
...@@ -336,9 +335,7 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name, ...@@ -336,9 +335,7 @@ Events::update_event(THD *thd, Event_parse_data *parse_data, sp_name *new_name,
*/ */
if (!(ret= db_repository->update_event(thd, parse_data, new_name))) if (!(ret= db_repository->update_event(thd, parse_data, new_name)))
{ {
Event_scheduler *scheduler= Event_scheduler::get_instance(); if ((ret= event_queue->update_event(thd, parse_data,
if (scheduler->initialized() &&
(ret= scheduler->update_event(thd, parse_data,
new_name? &new_name->m_db: NULL, new_name? &new_name->m_db: NULL,
new_name? &new_name->m_name: NULL))) new_name? &new_name->m_name: NULL)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
...@@ -373,8 +370,7 @@ Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists, ...@@ -373,8 +370,7 @@ Events::drop_event(THD *thd, sp_name *name, bool drop_if_exists,
if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name, if (!(ret= db_repository->drop_event(thd, name->m_db, name->m_name,
drop_if_exists, rows_affected))) drop_if_exists, rows_affected)))
{ {
Event_scheduler *scheduler= Event_scheduler::get_instance(); if ((ret= event_queue->drop_event(thd, name)))
if (scheduler->initialized() && (ret= scheduler->drop_event(thd, name)))
my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret); my_error(ER_EVENT_MODIFY_QUEUE_ERROR, MYF(0), ret);
} }
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -476,8 +472,7 @@ Events::drop_schema_events(THD *thd, char *db) ...@@ -476,8 +472,7 @@ Events::drop_schema_events(THD *thd, char *db)
DBUG_ENTER("evex_drop_db_events"); DBUG_ENTER("evex_drop_db_events");
DBUG_PRINT("enter", ("dropping events from %s", db)); DBUG_PRINT("enter", ("dropping events from %s", db));
Event_scheduler *scheduler= Event_scheduler::get_instance(); ret= event_queue->drop_schema_events(thd, db_lex);
ret= scheduler->drop_schema_events(thd, db_lex);
ret= db_repository->drop_schema_events(thd, db_lex); ret= db_repository->drop_schema_events(thd, db_lex);
DBUG_RETURN(ret); DBUG_RETURN(ret);
...@@ -505,16 +500,18 @@ Events::init() ...@@ -505,16 +500,18 @@ Events::init()
Event_db_repository *db_repo; Event_db_repository *db_repo;
DBUG_ENTER("Events::init"); DBUG_ENTER("Events::init");
db_repository->init_repository(); db_repository->init_repository();
event_queue->init(db_repository);
event_queue->scheduler= scheduler_ng;
scheduler_ng->init(event_queue);
/* it should be an assignment! */ /* it should be an assignment! */
if (opt_event_scheduler) if (opt_event_scheduler)
{ {
Event_scheduler *scheduler= Event_scheduler::get_instance();
DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2); DBUG_ASSERT(opt_event_scheduler == 1 || opt_event_scheduler == 2);
DBUG_RETURN(scheduler->init(db_repository) || if (opt_event_scheduler == 1)
(opt_event_scheduler == 1? scheduler->start(): DBUG_RETURN(scheduler_ng->start());
scheduler->start_suspended()));
} }
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -534,13 +531,9 @@ Events::deinit() ...@@ -534,13 +531,9 @@ Events::deinit()
{ {
DBUG_ENTER("Events::deinit"); DBUG_ENTER("Events::deinit");
Event_scheduler *scheduler= Event_scheduler::get_instance(); scheduler_ng->stop();
if (scheduler->initialized()) scheduler_ng->deinit();
{ event_queue->deinit();
scheduler->stop();
scheduler->destroy();
}
db_repository->deinit_repository(); db_repository->deinit_repository();
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
...@@ -559,8 +552,12 @@ void ...@@ -559,8 +552,12 @@ void
Events::init_mutexes() Events::init_mutexes()
{ {
db_repository= new Event_db_repository; db_repository= new Event_db_repository;
Event_scheduler::create_instance();
Event_scheduler::init_mutexes(); event_queue= new Event_queue;
event_queue->init_mutexes();
scheduler_ng= new Event_scheduler_ng();
scheduler_ng->init_mutexes();
} }
...@@ -574,9 +571,11 @@ Events::init_mutexes() ...@@ -574,9 +571,11 @@ Events::init_mutexes()
void void
Events::destroy_mutexes() Events::destroy_mutexes()
{ {
Event_scheduler::destroy_mutexes(); event_queue->deinit_mutexes();
scheduler_ng->deinit_mutexes();
delete scheduler_ng;
delete db_repository; delete db_repository;
db_repository= NULL;
} }
...@@ -595,7 +594,7 @@ Events::destroy_mutexes() ...@@ -595,7 +594,7 @@ Events::destroy_mutexes()
int int
Events::dump_internal_status(THD *thd) Events::dump_internal_status(THD *thd)
{ {
return Event_scheduler::dump_internal_status(thd); return Event_scheduler_ng::dump_internal_status(thd);
} }
...@@ -633,3 +632,26 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */) ...@@ -633,3 +632,26 @@ Events::fill_schema_events(THD *thd, TABLE_LIST *tables, COND * /* cond */)
} }
DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db)); DBUG_RETURN(get_instance()->db_repository->fill_schema_events(thd, tables, db));
} }
bool
Events::start_execution_of_events()
{
DBUG_ENTER("Events::start_execution_of_events");
DBUG_RETURN(scheduler_ng->start());
}
bool
Events::stop_execution_of_events()
{
DBUG_ENTER("Events::stop_execution_of_events");
DBUG_RETURN(scheduler_ng->stop());
}
bool
Events::is_started()
{
DBUG_ENTER("Events::is_started");
DBUG_RETURN(scheduler_ng->get_state() == Event_scheduler_ng::RUNNING);
}
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
class sp_name; class sp_name;
class Event_parse_data; class Event_parse_data;
class Event_db_repository; class Event_db_repository;
class Event_queue;
class Event_scheduler_ng;
/* Return codes */ /* Return codes */
enum enum_events_error_code enum enum_events_error_code
...@@ -60,6 +62,15 @@ class Events ...@@ -60,6 +62,15 @@ class Events
void void
destroy_mutexes(); destroy_mutexes();
bool
start_execution_of_events();
bool
stop_execution_of_events();
bool
is_started();
static Events* static Events*
get_instance(); get_instance();
...@@ -95,6 +106,8 @@ class Events ...@@ -95,6 +106,8 @@ class Events
dump_internal_status(THD *thd); dump_internal_status(THD *thd);
Event_db_repository *db_repository; Event_db_repository *db_repository;
Event_queue *event_queue;
Event_scheduler_ng *scheduler_ng;
private: private:
/* Singleton DP is used */ /* Singleton DP is used */
......
...@@ -864,7 +864,7 @@ static void close_connections(void) ...@@ -864,7 +864,7 @@ static void close_connections(void)
DBUG_PRINT("quit",("Informing thread %ld that it's time to die", DBUG_PRINT("quit",("Informing thread %ld that it's time to die",
tmp->thread_id)); tmp->thread_id));
/* We skip slave threads & scheduler on this first loop through. */ /* We skip slave threads & scheduler on this first loop through. */
if (tmp->slave_thread || tmp->system_thread == SYSTEM_THREAD_EVENT_SCHEDULER) if (tmp->slave_thread)
continue; continue;
tmp->killed= THD::KILL_CONNECTION; tmp->killed= THD::KILL_CONNECTION;
......
...@@ -58,6 +58,7 @@ ...@@ -58,6 +58,7 @@
#include <my_dir.h> #include <my_dir.h>
#include "event_scheduler.h" #include "event_scheduler.h"
#include "events.h"
/* WITH_BERKELEY_STORAGE_ENGINE */ /* WITH_BERKELEY_STORAGE_ENGINE */
extern bool berkeley_shared_data; extern bool berkeley_shared_data;
...@@ -3896,26 +3897,29 @@ sys_var_event_scheduler::update(THD *thd, set_var *var) ...@@ -3896,26 +3897,29 @@ sys_var_event_scheduler::update(THD *thd, set_var *var)
Event_scheduler *scheduler= Event_scheduler::get_instance(); Event_scheduler *scheduler= Event_scheduler::get_instance();
/* here start the thread if not running. */ /* here start the thread if not running. */
DBUG_ENTER("sys_var_event_scheduler::update"); DBUG_ENTER("sys_var_event_scheduler::update");
if (Events::opt_event_scheduler == 0)
DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value));
if (!scheduler->initialized())
{ {
my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--event-scheduler=0"); my_error(ER_OPTION_PREVENTS_STATEMENT, MYF(0), "--event-scheduler=0");
DBUG_RETURN(true); DBUG_RETURN(TRUE);
} }
DBUG_PRINT("new_value", ("%lu", (bool)var->save_result.ulong_value));
if (var->save_result.ulonglong_value < 1 || if (var->save_result.ulonglong_value < 1 ||
var->save_result.ulonglong_value > 2) var->save_result.ulonglong_value > 2)
{ {
char buf[64]; char buf[64];
my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "event_scheduler", my_error(ER_WRONG_VALUE_FOR_VAR, MYF(0), "event_scheduler",
llstr(var->save_result.ulonglong_value, buf)); llstr(var->save_result.ulonglong_value, buf));
DBUG_RETURN(true); DBUG_RETURN(TRUE);
} }
if ((res= scheduler->suspend_or_resume(var->save_result.ulonglong_value == 1? if (var->save_result.ulonglong_value == 1)
Event_scheduler::RESUME : res= Events::get_instance()->start_execution_of_events();
Event_scheduler::SUSPEND))) else
my_error(ER_EVENT_SET_VAR_ERROR, MYF(0), (uint) res); res= Events::get_instance()->stop_execution_of_events();
if (res)
my_error(ER_EVENT_SET_VAR_ERROR, MYF(0));
DBUG_RETURN((bool) res); DBUG_RETURN((bool) res);
} }
...@@ -3925,9 +3929,9 @@ byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type, ...@@ -3925,9 +3929,9 @@ byte *sys_var_event_scheduler::value_ptr(THD *thd, enum_var_type type,
{ {
Event_scheduler *scheduler= Event_scheduler::get_instance(); Event_scheduler *scheduler= Event_scheduler::get_instance();
if (!scheduler->initialized()) if (Events::opt_event_scheduler == 0)
thd->sys_var_tmp.long_value= 0; thd->sys_var_tmp.long_value= 0;
else if (scheduler->get_state() == Event_scheduler::RUNNING) else if (Events::get_instance()->is_started())
thd->sys_var_tmp.long_value= 1; thd->sys_var_tmp.long_value= 1;
else else
thd->sys_var_tmp.long_value= 2; thd->sys_var_tmp.long_value= 2;
......
...@@ -5831,7 +5831,7 @@ ER_DUP_ENTRY_AUTOINCREMENT_CASE ...@@ -5831,7 +5831,7 @@ ER_DUP_ENTRY_AUTOINCREMENT_CASE
ER_EVENT_MODIFY_QUEUE_ERROR ER_EVENT_MODIFY_QUEUE_ERROR
eng "Internal scheduler error %d" eng "Internal scheduler error %d"
ER_EVENT_SET_VAR_ERROR ER_EVENT_SET_VAR_ERROR
eng "Error during starting/stopping of the scheduler. Error code %u" eng "Error during starting/stopping of the scheduler."
ER_PARTITION_MERGE_ERROR ER_PARTITION_MERGE_ERROR
eng "%s handler cannot be used in partitioned tables" eng "%s handler cannot be used in partitioned tables"
swe "%s kan inte anvndas i en partitionerad tabell" swe "%s kan inte anvndas i en partitionerad tabell"
......
...@@ -4177,7 +4177,7 @@ copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table) ...@@ -4177,7 +4177,7 @@ copy_event_to_schema_table(THD *thd, TABLE *sch_table, TABLE *event_table)
restore_record(sch_table, s->default_values); restore_record(sch_table, s->default_values);
if (et.load_from_row(thd->mem_root, event_table)) if (et.load_from_row(event_table))
{ {
my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0)); my_error(ER_CANNOT_LOAD_FROM_TABLE, MYF(0));
DBUG_RETURN(1); DBUG_RETURN(1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment