Commit cae21c52 authored by Jan Lindström's avatar Jan Lindström

Fix timing on queues, this could clearly lead to starvation.

parent 9c614665
...@@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com ...@@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#define MT_COMP_WATER_MARK 50 #define MT_COMP_WATER_MARK 50
/** Time to wait for a message. */
#define MT_WAIT_IN_USECS 5000000
/* Work item status */ /* Work item status */
typedef enum wrk_status { typedef enum wrk_status {
...@@ -272,10 +274,9 @@ mtflush_service_io( ...@@ -272,10 +274,9 @@ mtflush_service_io(
{ {
wrk_t *work_item = NULL; wrk_t *work_item = NULL;
ulint n_flushed=0; ulint n_flushed=0;
ib_time_t max_wait_usecs = 50000;
mtflush_io->wt_status = WTHR_SIG_WAITING; mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs); work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; mtflush_io->wt_status = WTHR_RUNNING;
...@@ -411,7 +412,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -411,7 +412,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
os_thread_sleep(500000); os_thread_sleep(MT_WAIT_IN_USECS * 2);
} }
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
...@@ -420,7 +421,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -420,7 +421,7 @@ buf_mtflu_io_thread_exit(void)
for (i=0; i < srv_mtflush_threads;) { for (i=0; i < srv_mtflush_threads;) {
wrk_t* work_item; wrk_t* work_item;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000); work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);
/* If we receive reply to work item and it's status is exit, /* If we receive reply to work item and it's status is exit,
thead has processed this message and existed */ thead has processed this message and existed */
...@@ -550,7 +551,7 @@ buf_mtflu_flush_work_items( ...@@ -550,7 +551,7 @@ buf_mtflu_flush_work_items(
/* wait on the completion to arrive */ /* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) { for(i=0; i< buf_pool_inst;) {
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000); done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, MT_WAIT_IN_USECS);
if (done_wi != NULL) { if (done_wi != NULL) {
if(done_wi->n_flushed == 0) { if(done_wi->n_flushed == 0) {
......
...@@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com ...@@ -51,6 +51,8 @@ Modified 06/02/2014 Jan Lindström jan.lindstrom@skysql.com
#include "fil0pagecompress.h" #include "fil0pagecompress.h"
#define MT_COMP_WATER_MARK 50 #define MT_COMP_WATER_MARK 50
/** Time to wait for a message. */
#define MT_WAIT_IN_USECS 5000000
/* Work item status */ /* Work item status */
typedef enum wrk_status { typedef enum wrk_status {
...@@ -278,10 +280,9 @@ mtflush_service_io( ...@@ -278,10 +280,9 @@ mtflush_service_io(
{ {
wrk_t *work_item = NULL; wrk_t *work_item = NULL;
ulint n_flushed=0; ulint n_flushed=0;
ib_time_t max_wait_usecs = 50000;
mtflush_io->wt_status = WTHR_SIG_WAITING; mtflush_io->wt_status = WTHR_SIG_WAITING;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, max_wait_usecs); work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wq, MT_WAIT_IN_USECS);
if (work_item) { if (work_item) {
mtflush_io->wt_status = WTHR_RUNNING; mtflush_io->wt_status = WTHR_RUNNING;
...@@ -417,7 +418,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -417,7 +418,7 @@ buf_mtflu_io_thread_exit(void)
/* Wait until all work items on a work queue are processed */ /* Wait until all work items on a work queue are processed */
while(!ib_wqueue_is_empty(mtflush_io->wq)) { while(!ib_wqueue_is_empty(mtflush_io->wq)) {
/* Wait */ /* Wait */
os_thread_sleep(500000); os_thread_sleep(MT_WAIT_IN_USECS * 2);
} }
ut_a(ib_wqueue_is_empty(mtflush_io->wq)); ut_a(ib_wqueue_is_empty(mtflush_io->wq));
...@@ -426,7 +427,7 @@ buf_mtflu_io_thread_exit(void) ...@@ -426,7 +427,7 @@ buf_mtflu_io_thread_exit(void)
for (i=0; i < srv_mtflush_threads;) { for (i=0; i < srv_mtflush_threads;) {
wrk_t* work_item; wrk_t* work_item;
work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, 50000); work_item = (wrk_t *)ib_wqueue_timedwait(mtflush_io->wr_cq, MT_WAIT_IN_USECS);
/* If we receive reply to work item and it's status is exit, /* If we receive reply to work item and it's status is exit,
thead has processed this message and existed */ thead has processed this message and existed */
...@@ -556,7 +557,7 @@ buf_mtflu_flush_work_items( ...@@ -556,7 +557,7 @@ buf_mtflu_flush_work_items(
/* wait on the completion to arrive */ /* wait on the completion to arrive */
for(i=0; i< buf_pool_inst;) { for(i=0; i< buf_pool_inst;) {
done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, 50000); done_wi = (wrk_t *)ib_wqueue_timedwait(mtflush_ctx->wr_cq, MT_WAIT_IN_USECS);
if (done_wi != NULL) { if (done_wi != NULL) {
if(done_wi->n_flushed == 0) { if(done_wi->n_flushed == 0) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment