Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
f13daea1
Commit
f13daea1
authored
Aug 31, 2006
by
andrey@fifo.vaih.whnetz
Browse files
Options
Browse Files
Download
Plain Diff
Merge ahristov@bk-internal.mysql.com:/home/bk/mysql-5.1-wl3337
into fifo.vaih.whnetz:/work/mysql-5.1-wl3337-tree2
parents
eedee130
7b5916a2
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
227 additions
and
187 deletions
+227
-187
sql/event_queue.cc
sql/event_queue.cc
+124
-49
sql/event_queue.h
sql/event_queue.h
+10
-6
sql/event_scheduler.cc
sql/event_scheduler.cc
+79
-109
sql/event_scheduler.h
sql/event_scheduler.h
+12
-21
sql/events.cc
sql/events.cc
+2
-2
No files found.
sql/event_queue.cc
View file @
f13daea1
...
@@ -18,7 +18,6 @@
...
@@ -18,7 +18,6 @@
#include "event_queue.h"
#include "event_queue.h"
#include "event_data_objects.h"
#include "event_data_objects.h"
#include "event_db_repository.h"
#include "event_db_repository.h"
#include "event_scheduler.h"
#define EVENT_QUEUE_INITIAL_SIZE 30
#define EVENT_QUEUE_INITIAL_SIZE 30
...
@@ -87,6 +86,7 @@ Event_queue::Event_queue()
...
@@ -87,6 +86,7 @@ Event_queue::Event_queue()
{
{
mutex_last_unlocked_in_func
=
mutex_last_locked_in_func
=
mutex_last_unlocked_in_func
=
mutex_last_locked_in_func
=
mutex_last_attempted_lock_in_func
=
""
;
mutex_last_attempted_lock_in_func
=
""
;
set_zero_time
(
&
next_activation_at
,
MYSQL_TIMESTAMP_DATETIME
);
}
}
...
@@ -135,8 +135,7 @@ Event_queue::deinit_mutexes()
...
@@ -135,8 +135,7 @@ Event_queue::deinit_mutexes()
*/
*/
bool
bool
Event_queue
::
init_queue
(
THD
*
thd
,
Event_db_repository
*
db_repo
,
Event_queue
::
init_queue
(
THD
*
thd
,
Event_db_repository
*
db_repo
)
Event_scheduler
*
sched
)
{
{
pthread_t
th
;
pthread_t
th
;
bool
res
;
bool
res
;
...
@@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
...
@@ -147,7 +146,6 @@ Event_queue::init_queue(THD *thd, Event_db_repository *db_repo,
LOCK_QUEUE_DATA
();
LOCK_QUEUE_DATA
();
db_repository
=
db_repo
;
db_repository
=
db_repo
;
scheduler
=
sched
;
if
(
init_queue_ex
(
&
queue
,
EVENT_QUEUE_INITIAL_SIZE
,
0
/*offset*/
,
if
(
init_queue_ex
(
&
queue
,
EVENT_QUEUE_INITIAL_SIZE
,
0
/*offset*/
,
0
/*max_on_top*/
,
event_queue_element_compare_q
,
0
/*max_on_top*/
,
event_queue_element_compare_q
,
...
@@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
...
@@ -233,9 +231,8 @@ Event_queue::create_event(THD *thd, LEX_STRING dbname, LEX_STRING name)
DBUG_PRINT
(
"info"
,
(
"new event in the queue 0x%lx"
,
new_element
));
DBUG_PRINT
(
"info"
,
(
"new event in the queue 0x%lx"
,
new_element
));
queue_insert_safe
(
&
queue
,
(
byte
*
)
new_element
);
queue_insert_safe
(
&
queue
,
(
byte
*
)
new_element
);
dbug_dump_queue
(
thd
->
query_start
());
dbug_dump_queue
(
thd
->
query_start
());
pthread_cond_broadcast
(
&
COND_queue_state
);
UNLOCK_QUEUE_DATA
();
UNLOCK_QUEUE_DATA
();
notify_observers
();
}
}
DBUG_RETURN
(
res
);
DBUG_RETURN
(
res
);
...
@@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
...
@@ -298,13 +295,12 @@ Event_queue::update_event(THD *thd, LEX_STRING dbname, LEX_STRING name,
{
{
DBUG_PRINT
(
"info"
,
(
"new event in the Q 0x%lx"
,
new_element
));
DBUG_PRINT
(
"info"
,
(
"new event in the Q 0x%lx"
,
new_element
));
queue_insert_safe
(
&
queue
,
(
byte
*
)
new_element
);
queue_insert_safe
(
&
queue
,
(
byte
*
)
new_element
);
pthread_cond_broadcast
(
&
COND_queue_state
);
}
}
dbug_dump_queue
(
thd
->
query_start
());
dbug_dump_queue
(
thd
->
query_start
());
UNLOCK_QUEUE_DATA
();
UNLOCK_QUEUE_DATA
();
if
(
new_element
)
notify_observers
();
end:
end:
DBUG_PRINT
(
"info"
,
(
"res=%d"
,
res
));
DBUG_PRINT
(
"info"
,
(
"res=%d"
,
res
));
DBUG_RETURN
(
res
);
DBUG_RETURN
(
res
);
...
@@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
...
@@ -386,7 +382,8 @@ Event_queue::drop_matching_events(THD *thd, LEX_STRING pattern,
i
++
;
i
++
;
}
}
/*
/*
We don't call notify_observers() . If we remove the top event:
We don't call pthread_cond_broadcast(&COND_queue_state);
If we remove the top event:
1. The queue is empty. The scheduler will wake up at some time and
1. The queue is empty. The scheduler will wake up at some time and
realize that the queue is empty. If create_event() comes inbetween
realize that the queue is empty. If create_event() comes inbetween
it will signal the scheduler
it will signal the scheduler
...
@@ -421,24 +418,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
...
@@ -421,24 +418,6 @@ Event_queue::drop_schema_events(THD *thd, LEX_STRING schema)
}
}
/*
Signals the observers (the main scheduler thread) that the
state of the queue has been changed.
SYNOPSIS
Event_queue::notify_observers()
*/
void
Event_queue
::
notify_observers
()
{
DBUG_ENTER
(
"Event_queue::notify_observers"
);
DBUG_PRINT
(
"info"
,
(
"Signalling change of the queue"
));
scheduler
->
queue_changed
();
DBUG_VOID_RETURN
;
}
/*
/*
Searches for an event in the queue
Searches for an event in the queue
...
@@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now)
...
@@ -701,6 +680,8 @@ Event_queue::dbug_dump_queue(time_t now)
#endif
#endif
}
}
static
const
char
*
queue_empty_msg
=
"Waiting on empty queue"
;
static
const
char
*
queue_wait_msg
=
"Waiting for next activation"
;
/*
/*
Checks whether the top of the queue is elligible for execution and
Checks whether the top of the queue is elligible for execution and
...
@@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now)
...
@@ -725,39 +706,62 @@ Event_queue::dbug_dump_queue(time_t now)
*/
*/
bool
bool
Event_queue
::
get_top_for_execution_if_time
(
THD
*
thd
,
time_t
now
,
Event_queue
::
get_top_for_execution_if_time
(
THD
*
thd
,
Event_job_data
**
job_data
)
Event_job_data
**
job_data
,
struct
timespec
*
abstime
)
{
{
bool
ret
=
FALSE
;
bool
ret
=
FALSE
;
struct
timespec
top_time
;
struct
timespec
top_time
;
struct
timespec
*
abstime
;
*
job_data
=
NULL
;
*
job_data
=
NULL
;
DBUG_ENTER
(
"Event_queue::get_top_for_execution_if_time"
);
DBUG_ENTER
(
"Event_queue::get_top_for_execution_if_time"
);
DBUG_PRINT
(
"enter"
,
(
"thd=0x%lx now=%d"
,
thd
,
now
));
abstime
->
tv_nsec
=
0
;
top_time
.
tv_nsec
=
0
;
LOCK_QUEUE_DATA
();
LOCK_QUEUE_DATA
();
do
{
for
(;;)
{
int
res
;
int
res
;
if
(
!
queue
.
elements
)
Event_queue_element
*
top
=
NULL
;
{
abstime
->
tv_sec
=
0
;
break
;
}
Event_queue_element
*
top
=
((
Event_queue_element
*
)
queue_element
(
&
queue
,
0
));
thd
->
end_time
();
time_t
now
=
thd
->
query_start
();
abstime
=
NULL
;
top_time
.
tv_sec
=
sec_since_epoch_TIME
(
&
top
->
execute_at
);
if
(
queue
.
elements
)
{
top
=
((
Event_queue_element
*
)
queue_element
(
&
queue
,
0
));
top_time
.
tv_sec
=
sec_since_epoch_TIME
(
&
top
->
execute_at
);
if
(
top_time
.
tv_sec
>
now
)
abstime
=
&
top_time
;
}
if
(
!
abstime
||
abstime
->
tv_sec
>
now
)
{
{
abstime
->
tv_sec
=
top_time
.
tv_sec
;
const
char
*
msg
;
DBUG_PRINT
(
"info"
,
(
"Have to wait %d till %d"
,
abstime
->
tv_sec
-
now
,
if
(
abstime
)
abstime
->
tv_sec
));
{
break
;
next_activation_at
=
top
->
execute_at
;
msg
=
queue_wait_msg
;
}
else
{
set_zero_time
(
&
next_activation_at
,
MYSQL_TIMESTAMP_DATETIME
);
msg
=
queue_wait_msg
;
}
cond_wait
(
thd
,
abstime
,
msg
,
SCHED_FUNC
,
__LINE__
);
if
(
thd
->
killed
)
{
DBUG_PRINT
(
"info"
,
(
"thd->killed=%d"
,
thd
->
killed
));
goto
end
;
}
/*
The queue could have been emptied. Therefore it's safe to start from
the beginning. Moreover, this way we will get also the new top, if
the element at the top has been changed.
*/
continue
;
}
}
DBUG_PRINT
(
"info"
,
(
"Ready for execution"
));
DBUG_PRINT
(
"info"
,
(
"Ready for execution"
));
abstime
->
tv_sec
=
0
;
if
(
!
(
*
job_data
=
new
Event_job_data
()))
if
(
!
(
*
job_data
=
new
Event_job_data
()))
{
{
ret
=
TRUE
;
ret
=
TRUE
;
...
@@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
...
@@ -766,6 +770,7 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
if
((
res
=
db_repository
->
load_named_event
(
thd
,
top
->
dbname
,
top
->
name
,
if
((
res
=
db_repository
->
load_named_event
(
thd
,
top
->
dbname
,
top
->
name
,
*
job_data
)))
*
job_data
)))
{
{
DBUG_PRINT
(
"error"
,
(
"Got %d from load_named_event"
,
res
));
delete
*
job_data
;
delete
*
job_data
;
*
job_data
=
NULL
;
*
job_data
=
NULL
;
ret
=
TRUE
;
ret
=
TRUE
;
...
@@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
...
@@ -796,11 +801,13 @@ Event_queue::get_top_for_execution_if_time(THD *thd, time_t now,
queue_replaced
(
&
queue
);
queue_replaced
(
&
queue
);
dbug_dump_queue
(
now
);
dbug_dump_queue
(
now
);
}
while
(
0
);
break
;
}
end:
UNLOCK_QUEUE_DATA
();
UNLOCK_QUEUE_DATA
();
DBUG_PRINT
(
"info"
,
(
"returning %d. et_new=0x%lx abstime.tv_sec=%d "
,
DBUG_PRINT
(
"info"
,
(
"returning %d. et_new=0x%lx abstime.tv_sec=%d "
,
ret
,
*
job_data
,
abstime
->
tv_sec
));
ret
,
*
job_data
,
abstime
?
abstime
->
tv_sec
:
0
));
if
(
*
job_data
)
if
(
*
job_data
)
DBUG_PRINT
(
"info"
,
(
"db=%s name=%s definer=%s"
,
(
*
job_data
)
->
dbname
.
str
,
DBUG_PRINT
(
"info"
,
(
"db=%s name=%s definer=%s"
,
(
*
job_data
)
->
dbname
.
str
,
...
@@ -864,6 +871,52 @@ Event_queue::unlock_data(const char *func, uint line)
...
@@ -864,6 +871,52 @@ Event_queue::unlock_data(const char *func, uint line)
}
}
/*
Wrapper for pthread_cond_wait/timedwait
SYNOPSIS
Event_queue::cond_wait()
thd Thread (Could be NULL during shutdown procedure)
msg Message for thd->proc_info
abstime If not null then call pthread_cond_timedwait()
func Which function is requesting cond_wait
line On which line cond_wait is requested
*/
void
Event_queue
::
cond_wait
(
THD
*
thd
,
struct
timespec
*
abstime
,
const
char
*
msg
,
const
char
*
func
,
uint
line
)
{
DBUG_ENTER
(
"Event_queue::cond_wait"
);
waiting_on_cond
=
TRUE
;
mutex_last_unlocked_at_line
=
line
;
mutex_queue_data_locked
=
FALSE
;
mutex_last_unlocked_in_func
=
func
;
thd
->
enter_cond
(
&
COND_queue_state
,
&
LOCK_event_queue
,
msg
);
DBUG_PRINT
(
"info"
,
(
"pthread_cond_%swait"
,
abstime
?
"timed"
:
""
));
if
(
!
abstime
)
pthread_cond_wait
(
&
COND_queue_state
,
&
LOCK_event_queue
);
else
pthread_cond_timedwait
(
&
COND_queue_state
,
&
LOCK_event_queue
,
abstime
);
mutex_last_locked_in_func
=
func
;
mutex_last_locked_at_line
=
line
;
mutex_queue_data_locked
=
TRUE
;
waiting_on_cond
=
FALSE
;
/*
This will free the lock so we need to relock. Not the best thing to
do but we need to obey cond_wait()
*/
thd
->
exit_cond
(
""
);
LOCK_QUEUE_DATA
();
DBUG_VOID_RETURN
;
}
/*
/*
Dumps the internal status of the queue
Dumps the internal status of the queue
...
@@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd)
...
@@ -943,6 +996,28 @@ Event_queue::dump_internal_status(THD *thd)
protocol
->
store
(
&
tmp_string
);
protocol
->
store
(
&
tmp_string
);
ret
=
protocol
->
write
();
ret
=
protocol
->
write
();
/* waiting on */
protocol
->
prepare_for_resend
();
protocol
->
store
(
STRING_WITH_LEN
(
"queue waiting on condition"
),
scs
);
int_string
.
set
((
longlong
)
waiting_on_cond
,
scs
);
protocol
->
store
(
&
int_string
);
ret
=
protocol
->
write
();
protocol
->
prepare_for_resend
();
protocol
->
store
(
STRING_WITH_LEN
(
"next activation at"
),
scs
);
tmp_string
.
length
(
scs
->
cset
->
snprintf
(
scs
,
(
char
*
)
tmp_string
.
ptr
(),
tmp_string
.
alloced_length
(),
"%4d-%02d-%02d %02d:%02d:%02d"
,
next_activation_at
.
year
,
next_activation_at
.
month
,
next_activation_at
.
day
,
next_activation_at
.
hour
,
next_activation_at
.
minute
,
next_activation_at
.
second
));
protocol
->
store
(
&
tmp_string
);
ret
=
protocol
->
write
();
#endif
#endif
DBUG_RETURN
(
FALSE
);
DBUG_RETURN
(
FALSE
);
}
}
sql/event_queue.h
View file @
f13daea1
...
@@ -36,7 +36,7 @@ class Event_queue
...
@@ -36,7 +36,7 @@ class Event_queue
deinit_mutexes
();
deinit_mutexes
();
bool
bool
init_queue
(
THD
*
thd
,
Event_db_repository
*
db_repo
,
Event_scheduler
*
sched
);
init_queue
(
THD
*
thd
,
Event_db_repository
*
db_repo
);
void
void
deinit_queue
();
deinit_queue
();
...
@@ -60,8 +60,7 @@ class Event_queue
...
@@ -60,8 +60,7 @@ class Event_queue
recalculate_activation_times
(
THD
*
thd
);
recalculate_activation_times
(
THD
*
thd
);
bool
bool
get_top_for_execution_if_time
(
THD
*
thd
,
time_t
now
,
Event_job_data
**
job_data
,
get_top_for_execution_if_time
(
THD
*
thd
,
Event_job_data
**
job_data
);
struct
timespec
*
abstime
);
bool
bool
dump_internal_status
(
THD
*
thd
);
dump_internal_status
(
THD
*
thd
);
...
@@ -80,14 +79,12 @@ class Event_queue
...
@@ -80,14 +79,12 @@ class Event_queue
void
void
empty_queue
();
empty_queue
();
void
notify_observers
();
void
void
dbug_dump_queue
(
time_t
now
);
dbug_dump_queue
(
time_t
now
);
/* LOCK_event_queue is the mutex which protects the access to the queue. */
/* LOCK_event_queue is the mutex which protects the access to the queue. */
pthread_mutex_t
LOCK_event_queue
;
pthread_mutex_t
LOCK_event_queue
;
pthread_cond_t
COND_queue_state
;
Event_db_repository
*
db_repository
;
Event_db_repository
*
db_repository
;
...
@@ -96,6 +93,8 @@ class Event_queue
...
@@ -96,6 +93,8 @@ class Event_queue
/* The sorted queue with the Event_job_data objects */
/* The sorted queue with the Event_job_data objects */
QUEUE
queue
;
QUEUE
queue
;
TIME
next_activation_at
;
uint
mutex_last_locked_at_line
;
uint
mutex_last_locked_at_line
;
uint
mutex_last_unlocked_at_line
;
uint
mutex_last_unlocked_at_line
;
uint
mutex_last_attempted_lock_at_line
;
uint
mutex_last_attempted_lock_at_line
;
...
@@ -104,6 +103,7 @@ class Event_queue
...
@@ -104,6 +103,7 @@ class Event_queue
const
char
*
mutex_last_attempted_lock_in_func
;
const
char
*
mutex_last_attempted_lock_in_func
;
bool
mutex_queue_data_locked
;
bool
mutex_queue_data_locked
;
bool
mutex_queue_data_attempting_lock
;
bool
mutex_queue_data_attempting_lock
;
bool
waiting_on_cond
;
/* helper functions for working with mutexes & conditionals */
/* helper functions for working with mutexes & conditionals */
void
void
...
@@ -111,6 +111,10 @@ class Event_queue
...
@@ -111,6 +111,10 @@ class Event_queue
void
void
unlock_data
(
const
char
*
func
,
uint
line
);
unlock_data
(
const
char
*
func
,
uint
line
);
void
cond_wait
(
THD
*
thd
,
struct
timespec
*
abstime
,
const
char
*
msg
,
const
char
*
func
,
uint
line
);
};
};
#endif
/* _EVENT_QUEUE_H_ */
#endif
/* _EVENT_QUEUE_H_ */
sql/event_scheduler.cc
View file @
f13daea1
...
@@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q)
...
@@ -313,7 +313,7 @@ Event_scheduler::init_scheduler(Event_queue *q)
LOCK_DATA
();
LOCK_DATA
();
queue
=
q
;
queue
=
q
;
started_events
=
0
;
started_events
=
0
;
thread_id
=
0
;
scheduler_thd
=
NULL
;
state
=
INITIALIZED
;
state
=
INITIALIZED
;
UNLOCK_DATA
();
UNLOCK_DATA
();
}
}
...
@@ -397,22 +397,18 @@ Event_scheduler::start()
...
@@ -397,22 +397,18 @@ Event_scheduler::start()
scheduler_param_value
->
thd
=
new_thd
;
scheduler_param_value
->
thd
=
new_thd
;
scheduler_param_value
->
scheduler
=
this
;
scheduler_param_value
->
scheduler
=
this
;
scheduler_thd
=
new_thd
;
DBUG_PRINT
(
"info"
,
(
"Setting state go RUNNING"
));
state
=
RUNNING
;
DBUG_PRINT
(
"info"
,
(
"Forking new thread for scheduduler. THD=0x%lx"
,
new_thd
));
DBUG_PRINT
(
"info"
,
(
"Forking new thread for scheduduler. THD=0x%lx"
,
new_thd
));
if
(
pthread_create
(
&
th
,
&
connection_attrib
,
event_scheduler_thread
,
if
(
pthread_create
(
&
th
,
&
connection_attrib
,
event_scheduler_thread
,
(
void
*
)
scheduler_param_value
))
(
void
*
)
scheduler_param_value
))
{
{
DBUG_PRINT
(
"error"
,
(
"cannot create a new thread"
));
DBUG_PRINT
(
"error"
,
(
"cannot create a new thread"
));
state
=
INITIALIZED
;
state
=
INITIALIZED
;
scheduler_thd
=
NULL
;
ret
=
TRUE
;
ret
=
TRUE
;
}
DBUG_PRINT
(
"info"
,
(
"Setting state go RUNNING"
));
state
=
RUNNING
;
end:
UNLOCK_DATA
();
if
(
ret
&&
new_thd
)
{
DBUG_PRINT
(
"info"
,
(
"There was an error during THD creation. Clean up"
));
new_thd
->
proc_info
=
"Clearing"
;
new_thd
->
proc_info
=
"Clearing"
;
DBUG_ASSERT
(
new_thd
->
net
.
buff
!=
0
);
DBUG_ASSERT
(
new_thd
->
net
.
buff
!=
0
);
net_end
(
&
new_thd
->
net
);
net_end
(
&
new_thd
->
net
);
...
@@ -422,6 +418,9 @@ Event_scheduler::start()
...
@@ -422,6 +418,9 @@ Event_scheduler::start()
delete
new_thd
;
delete
new_thd
;
pthread_mutex_unlock
(
&
LOCK_thread_count
);
pthread_mutex_unlock
(
&
LOCK_thread_count
);
}
}
end:
UNLOCK_DATA
();
DBUG_RETURN
(
ret
);
DBUG_RETURN
(
ret
);
}
}
...
@@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd)
...
@@ -446,66 +445,41 @@ Event_scheduler::run(THD *thd)
Event_job_data
*
job_data
;
Event_job_data
*
job_data
;
DBUG_ENTER
(
"Event_scheduler::run"
);
DBUG_ENTER
(
"Event_scheduler::run"
);
LOCK_DATA
();
thread_id
=
thd
->
thread_id
;
sql_print_information
(
"SCHEDULER: Manager thread started with id %lu"
,
sql_print_information
(
"SCHEDULER: Manager thread started with id %lu"
,
thread_id
);
th
d
->
th
read_id
);
/*
/*
Recalculate the values in the queue because there could have been stops
Recalculate the values in the queue because there could have been stops
in executions of the scheduler and some times could have passed by.
in executions of the scheduler and some times could have passed by.
*/
*/
queue
->
recalculate_activation_times
(
thd
);
queue
->
recalculate_activation_times
(
thd
);
while
(
state
==
RUNNING
)
while
(
is_running
())
{
{
thd
->
end_time
();
/* Gets a minimized version */
/* Gets a minimized version */
if
(
queue
->
get_top_for_execution_if_time
(
thd
,
thd
->
query_start
(),
if
(
queue
->
get_top_for_execution_if_time
(
thd
,
&
job_data
))
&
job_data
,
&
abstime
))
{
{
sql_print_information
(
"SCHEDULER: Serious error during getting next"
sql_print_information
(
"SCHEDULER: Serious error during getting next
"
"
event to execute. Stopping"
);
"event to execute. Stopping"
);
break
;
break
;
}
}
DBUG_PRINT
(
"info"
,
(
"get_top returned job_data=0x%lx now=%d "
DBUG_PRINT
(
"info"
,
(
"get_top returned job_data=0x%lx"
,
job_data
));
"abs_time.tv_sec=%d"
,
if
(
job_data
)
job_data
,
thd
->
query_start
(),
abstime
.
tv_sec
));
if
(
!
job_data
&&
!
abstime
.
tv_sec
)
{
DBUG_PRINT
(
"info"
,
(
"The queue is empty. Going to sleep"
));
COND_STATE_WAIT
(
thd
,
NULL
,
"Waiting on empty queue"
);
DBUG_PRINT
(
"info"
,
(
"Woke up. Got COND_state"
));
}
else
if
(
abstime
.
tv_sec
)
{
{
DBUG_PRINT
(
"info"
,
(
"Have to sleep some time %u s. till %u"
,
if
((
res
=
execute_top
(
thd
,
job_data
)))
abstime
.
tv_sec
-
thd
->
query_start
(),
abstime
.
tv_sec
));
break
;
COND_STATE_WAIT
(
thd
,
&
abstime
,
"Waiting for next activation"
);
/*
If we get signal we should recalculate the whether it's the right time
because there could be :
1. Spurious wake-up
2. The top of the queue was changed (new one becase of create/update)
*/
DBUG_PRINT
(
"info"
,
(
"Woke up. Got COND_stat or time for execution."
));
}
}
else
else
{
{
UNLOCK_DATA
();
DBUG_ASSERT
(
thd
->
killed
);
res
=
execute_top
(
thd
,
job_data
);
DBUG_PRINT
(
"info"
,
(
"job_data is NULL, the thread was killed"
));
LOCK_DATA
();
if
(
res
)
break
;
++
started_events
;
}
}
DBUG_PRINT
(
"info"
,
(
"state=%s"
,
scheduler_states_names
[
state
].
str
));
DBUG_PRINT
(
"info"
,
(
"state=%s"
,
scheduler_states_names
[
state
].
str
));
}
}
LOCK_DATA
();
DBUG_PRINT
(
"info"
,
(
"Signalling back to the stopper COND_state"
));
DBUG_PRINT
(
"info"
,
(
"Signalling back to the stopper COND_state"
));
pthread_cond_signal
(
&
COND_state
);
error:
state
=
INITIALIZED
;
state
=
INITIALIZED
;
pthread_cond_signal
(
&
COND_state
);
UNLOCK_DATA
();
UNLOCK_DATA
();
sql_print_information
(
"SCHEDULER: Stopped"
);
sql_print_information
(
"SCHEDULER: Stopped"
);
...
@@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
...
@@ -546,6 +520,8 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
job_data
)))
job_data
)))
goto
error
;
goto
error
;
++
started_events
;
DBUG_PRINT
(
"info"
,
(
"Launch succeeded. BURAN is in THD=0x%lx"
,
new_thd
));
DBUG_PRINT
(
"info"
,
(
"Launch succeeded. BURAN is in THD=0x%lx"
,
new_thd
));
DBUG_RETURN
(
FALSE
);
DBUG_RETURN
(
FALSE
);
...
@@ -567,6 +543,27 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
...
@@ -567,6 +543,27 @@ Event_scheduler::execute_top(THD *thd, Event_job_data *job_data)
}
}
/*
Checkes whether the state of the scheduler is RUNNING
SYNOPSIS
Event_scheduler::is_running()
RETURN VALUE
TRUE RUNNING
FALSE Not RUNNING
*/
inline
bool
Event_scheduler
::
is_running
()
{
LOCK_DATA
();
bool
ret
=
(
state
==
RUNNING
);
UNLOCK_DATA
();
return
ret
;
}
/*
/*
Stops the scheduler (again). Waits for acknowledgement from the
Stops the scheduler (again). Waits for acknowledgement from the
scheduler that it has stopped - synchronous stopping.
scheduler that it has stopped - synchronous stopping.
...
@@ -591,26 +588,48 @@ Event_scheduler::stop()
...
@@ -591,26 +588,48 @@ Event_scheduler::stop()
if
(
state
!=
RUNNING
)
if
(
state
!=
RUNNING
)
goto
end
;
goto
end
;
state
=
STOPPING
;
DBUG_PRINT
(
"info"
,
(
"Manager thread has id %d"
,
thread_id
));
sql_print_information
(
"SCHEDULER: Killing manager thread %lu"
,
thread_id
);
pthread_cond_signal
(
&
COND_state
);
/* Guarantee we don't catch spurious signals */
/* Guarantee we don't catch spurious signals */
sql_print_information
(
"SCHEDULER: Waiting the manager thread to reply"
);
do
{
do
{
DBUG_PRINT
(
"info"
,
(
"Waiting for COND_started_or_stopped from the manager "
DBUG_PRINT
(
"info"
,
(
"Waiting for COND_started_or_stopped from the manager "
"thread. Current value of state is %s . "
"thread. Current value of state is %s . "
"workers count=%d"
,
scheduler_states_names
[
state
].
str
,
"workers count=%d"
,
scheduler_states_names
[
state
].
str
,
workers_count
()));
workers_count
()));
/*
NOTE: We don't use kill_one_thread() because it can't kill COM_DEAMON
threads. In addition, kill_one_thread() requires THD but during shutdown
current_thd is NULL. Hence, if kill_one_thread should be used it has to
be modified to kill also daemons, by adding a flag, and also we have to
create artificial THD here. To save all this work, we just do what
kill_one_thread() does to kill a thread. See also sql_repl.cc for similar
usage.
*/
state
=
STOPPING
;
DBUG_PRINT
(
"info"
,
(
"Manager thread has id %d"
,
scheduler_thd
->
thread_id
));
/* Lock from delete */
pthread_mutex_lock
(
&
scheduler_thd
->
LOCK_delete
);
/* This will wake up the thread if it waits on Queue's conditional */
sql_print_information
(
"SCHEDULER: Killing manager thread %lu"
,
scheduler_thd
->
thread_id
);
scheduler_thd
->
awake
(
THD
::
KILL_CONNECTION
);
pthread_mutex_unlock
(
&
scheduler_thd
->
LOCK_delete
);
/* thd could be 0x0, when shutting down */
/* thd could be 0x0, when shutting down */
sql_print_information
(
"SCHEDULER: Waiting the manager thread to reply"
);
COND_STATE_WAIT
(
thd
,
NULL
,
"Waiting scheduler to stop"
);
COND_STATE_WAIT
(
thd
,
NULL
,
"Waiting scheduler to stop"
);
}
while
(
state
==
STOPPING
);
}
while
(
state
==
STOPPING
);
DBUG_PRINT
(
"info"
,
(
"Manager thread has cleaned up. Set state to INIT"
));
DBUG_PRINT
(
"info"
,
(
"Manager thread has cleaned up. Set state to INIT"
));
/*
thread_id
=
0
;
The rationale behind setting it to NULL here but not destructing it
beforehand is because the THD will be deinited in event_scheduler_thread().
It's more clear when the post_init and the deinit is done in one function.
Here we just mark that the scheduler doesn't have a THD anymore. Though for
milliseconds the old thread could exist we can't use it anymore. When we
unlock the mutex in this function a little later the state will be
INITIALIZED. Therefore, a connection thread could enter the critical section
and will create a new THD object.
*/
scheduler_thd
=
NULL
;
end:
end:
UNLOCK_DATA
();
UNLOCK_DATA
();
DBUG_RETURN
(
FALSE
);
DBUG_RETURN
(
FALSE
);
...
@@ -634,37 +653,14 @@ Event_scheduler::workers_count()
...
@@ -634,37 +653,14 @@ Event_scheduler::workers_count()
pthread_mutex_lock
(
&
LOCK_thread_count
);
// For unlink from list
pthread_mutex_lock
(
&
LOCK_thread_count
);
// For unlink from list
I_List_iterator
<
THD
>
it
(
threads
);
I_List_iterator
<
THD
>
it
(
threads
);
while
((
tmp
=
it
++
))
while
((
tmp
=
it
++
))
{
if
(
tmp
->
command
==
COM_DAEMON
)
continue
;
if
(
tmp
->
system_thread
==
SYSTEM_THREAD_EVENT_WORKER
)
if
(
tmp
->
system_thread
==
SYSTEM_THREAD_EVENT_WORKER
)
++
count
;
++
count
;
}
pthread_mutex_unlock
(
&
LOCK_thread_count
);
pthread_mutex_unlock
(
&
LOCK_thread_count
);
DBUG_PRINT
(
"exit"
,
(
"%d"
,
count
));
DBUG_PRINT
(
"exit"
,
(
"%d"
,
count
));
DBUG_RETURN
(
count
);
DBUG_RETURN
(
count
);
}
}
/*
Signals the main scheduler thread that the queue has changed
its state.
SYNOPSIS
Event_scheduler::queue_changed()
*/
void
Event_scheduler
::
queue_changed
()
{
DBUG_ENTER
(
"Event_scheduler::queue_changed"
);
DBUG_PRINT
(
"info"
,
(
"Sending COND_state. state (read wo lock)=%s "
,
scheduler_states_names
[
state
].
str
));
pthread_cond_signal
(
&
COND_state
);
DBUG_VOID_RETURN
;
}
/*
/*
Auxiliary function for locking LOCK_scheduler_state. Used
Auxiliary function for locking LOCK_scheduler_state. Used
by the LOCK_DATA macro.
by the LOCK_DATA macro.
...
@@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line)
...
@@ -718,6 +714,7 @@ Event_scheduler::unlock_data(const char *func, uint line)
Event_scheduler::cond_wait()
Event_scheduler::cond_wait()
thd Thread (Could be NULL during shutdown procedure)
thd Thread (Could be NULL during shutdown procedure)
abstime If not null then call pthread_cond_timedwait()
abstime If not null then call pthread_cond_timedwait()
msg Message for thd->proc_info
func Which function is requesting cond_wait
func Which function is requesting cond_wait
line On which line cond_wait is requested
line On which line cond_wait is requested
*/
*/
...
@@ -756,33 +753,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
...
@@ -756,33 +753,6 @@ Event_scheduler::cond_wait(THD *thd, struct timespec *abstime, const char* msg,
}
}
/*
Returns the current state of the scheduler
SYNOPSIS
Event_scheduler::get_state()
RETURN VALUE
The state of the scheduler (INITIALIZED | RUNNING | STOPPING)
*/
enum
Event_scheduler
::
enum_state
Event_scheduler
::
get_state
()
{
enum
Event_scheduler
::
enum_state
ret
;
DBUG_ENTER
(
"Event_scheduler::get_state"
);
LOCK_DATA
();
ret
=
state
;
UNLOCK_DATA
();
DBUG_RETURN
(
ret
);
}
/*
REMOVE THIS COMMENT AFTER PATCH REVIEW. USED TO HELP DIFF
Returns whether the scheduler was initialized.
*/
/*
/*
Dumps the internal status of the scheduler
Dumps the internal status of the scheduler
...
@@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd)
...
@@ -826,7 +796,7 @@ Event_scheduler::dump_internal_status(THD *thd)
protocol
->
store
(
STRING_WITH_LEN
(
"thread_id"
),
scs
);
protocol
->
store
(
STRING_WITH_LEN
(
"thread_id"
),
scs
);
if
(
thread_id
)
if
(
thread_id
)
{
{
int_string
.
set
((
longlong
)
thread_id
,
scs
);
int_string
.
set
((
longlong
)
scheduler_thd
->
thread_id
,
scs
);
protocol
->
store
(
&
int_string
);
protocol
->
store
(
&
int_string
);
}
}
else
else
...
...
sql/event_scheduler.h
View file @
f13daea1
...
@@ -34,14 +34,6 @@ class Event_scheduler
...
@@ -34,14 +34,6 @@ class Event_scheduler
Event_scheduler
()
:
state
(
UNINITIALIZED
){}
Event_scheduler
()
:
state
(
UNINITIALIZED
){}
~
Event_scheduler
(){}
~
Event_scheduler
(){}
enum
enum_state
{
UNINITIALIZED
=
0
,
INITIALIZED
,
RUNNING
,
STOPPING
};
/* State changing methods follow */
/* State changing methods follow */
bool
bool
...
@@ -70,12 +62,8 @@ class Event_scheduler
...
@@ -70,12 +62,8 @@ class Event_scheduler
deinit_mutexes
();
deinit_mutexes
();
/* Information retrieving methods follow */
/* Information retrieving methods follow */
bool
enum
enum_state
is_running
();
get_state
();
void
queue_changed
();
bool
bool
dump_internal_status
(
THD
*
thd
);
dump_internal_status
(
THD
*
thd
);
...
@@ -84,6 +72,7 @@ class Event_scheduler
...
@@ -84,6 +72,7 @@ class Event_scheduler
uint
uint
workers_count
();
workers_count
();
/* helper functions */
/* helper functions */
bool
bool
execute_top
(
THD
*
thd
,
Event_job_data
*
job_data
);
execute_top
(
THD
*
thd
,
Event_job_data
*
job_data
);
...
@@ -101,16 +90,18 @@ class Event_scheduler
...
@@ -101,16 +90,18 @@ class Event_scheduler
pthread_mutex_t
LOCK_scheduler_state
;
pthread_mutex_t
LOCK_scheduler_state
;
enum
enum_state
{
UNINITIALIZED
=
0
,
INITIALIZED
,
RUNNING
,
STOPPING
};
/* This is the current status of the life-cycle of the scheduler. */
/* This is the current status of the life-cycle of the scheduler. */
enum
enum_state
state
;
enum
enum_state
state
;
/*
THD
*
scheduler_thd
;
Holds the thread id of the executor thread or 0 if the scheduler is not
running. It is used by ::shutdown() to know which thread to kill with
kill_one_thread(). The latter wake ups a thread if it is waiting on a
conditional variable and sets thd->killed to non-zero.
*/
ulong
thread_id
;
pthread_cond_t
COND_state
;
pthread_cond_t
COND_state
;
...
...
sql/events.cc
View file @
f13daea1
...
@@ -630,7 +630,7 @@ Events::init()
...
@@ -630,7 +630,7 @@ Events::init()
}
}
check_system_tables_error
=
FALSE
;
check_system_tables_error
=
FALSE
;
if
(
event_queue
->
init_queue
(
thd
,
db_repository
,
scheduler
))
if
(
event_queue
->
init_queue
(
thd
,
db_repository
))
{
{
sql_print_error
(
"SCHEDULER: Error while loading from disk."
);
sql_print_error
(
"SCHEDULER: Error while loading from disk."
);
goto
end
;
goto
end
;
...
@@ -820,7 +820,7 @@ Events::is_execution_of_events_started()
...
@@ -820,7 +820,7 @@ Events::is_execution_of_events_started()
my_error
(
ER_EVENTS_DB_ERROR
,
MYF
(
0
));
my_error
(
ER_EVENTS_DB_ERROR
,
MYF
(
0
));
DBUG_RETURN
(
FALSE
);
DBUG_RETURN
(
FALSE
);
}
}
DBUG_RETURN
(
scheduler
->
get_state
()
==
Event_scheduler
::
RUNNING
);
DBUG_RETURN
(
scheduler
->
is_running
()
);
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment