Commit 66946183 authored by osku's avatar osku

Add a work queue implementation.

parent c89455c1
...@@ -26,57 +26,65 @@ SUBDIRS = os ut btr buf data dict dyn eval fil fsp fut \ ...@@ -26,57 +26,65 @@ SUBDIRS = os ut btr buf data dict dyn eval fil fsp fut \
ha ibuf lock log mach mem mtr page \ ha ibuf lock log mach mem mtr page \
pars que read rem row srv sync thr trx usr pars que read rem row srv sync thr trx usr
EXTRA_DIST = include/btr0btr.h include/btr0btr.ic include/btr0cur.h include/btr0cur.ic \ EXTRA_DIST = include/btr0btr.h include/btr0btr.ic include/btr0cur.h \
include/btr0pcur.h include/btr0pcur.ic include/btr0sea.h include/btr0sea.ic \ include/btr0cur.ic include/btr0pcur.h include/btr0pcur.ic \
include/btr0types.h \ include/btr0sea.h include/btr0sea.ic include/btr0types.h \
include/buf0buf.h include/buf0buf.ic include/buf0flu.h include/buf0flu.ic \ include/buf0buf.h include/buf0buf.ic include/buf0flu.h \
include/buf0lru.h include/buf0lru.ic include/buf0rea.h include/buf0types.h \ include/buf0flu.ic include/buf0lru.h include/buf0lru.ic \
include/data0data.h include/data0data.ic include/data0type.h include/data0type.ic \ include/buf0rea.h include/buf0types.h include/data0data.h \
include/data0types.h include/db0err.h \ include/data0data.ic include/data0type.h include/data0type.ic \
include/dict0boot.h include/dict0boot.ic include/dict0crea.h include/dict0crea.ic \ include/data0types.h include/db0err.h include/dict0boot.h \
include/dict0dict.h include/dict0dict.ic include/dict0load.h include/dict0load.ic \ include/dict0boot.ic include/dict0crea.h include/dict0crea.ic \
include/dict0mem.h include/dict0mem.ic include/dict0types.h \ include/dict0dict.h include/dict0dict.ic include/dict0load.h \
include/dyn0dyn.h include/dyn0dyn.ic \ include/dict0load.ic include/dict0mem.h include/dict0mem.ic \
include/eval0eval.h include/eval0eval.ic include/eval0proc.h include/eval0proc.ic \ include/dict0types.h include/dyn0dyn.h include/dyn0dyn.ic \
include/fil0fil.h include/fsp0fsp.h include/fsp0fsp.ic \ include/eval0eval.h include/eval0eval.ic include/eval0proc.h \
include/fut0fut.h include/fut0fut.ic include/fut0lst.h include/fut0lst.ic \ include/eval0proc.ic include/fil0fil.h include/fsp0fsp.h \
include/ha0ha.h include/ha0ha.ic include/hash0hash.h include/hash0hash.ic \ include/fsp0fsp.ic include/fut0fut.h include/fut0fut.ic \
include/fut0lst.h include/fut0lst.ic include/ha0ha.h \
include/ha0ha.ic include/hash0hash.h include/hash0hash.ic \
include/ibuf0ibuf.h include/ibuf0ibuf.ic include/ibuf0types.h \ include/ibuf0ibuf.h include/ibuf0ibuf.ic include/ibuf0types.h \
include/lock0lock.h include/lock0lock.ic include/lock0types.h \ include/lock0lock.h include/lock0lock.ic include/lock0types.h \
include/log0log.h include/log0log.ic include/log0recv.h include/log0recv.ic \ include/log0log.h include/log0log.ic include/log0recv.h \
include/mach0data.h include/mach0data.ic include/mem0dbg.h include/mem0dbg.ic \ include/log0recv.ic include/mach0data.h include/mach0data.ic \
include/mem0mem.h include/mem0mem.ic include/mem0pool.h include/mem0pool.ic \ include/mem0dbg.h include/mem0dbg.ic include/mem0mem.h \
include/mtr0log.h include/mtr0log.ic include/mtr0mtr.h include/mtr0mtr.ic \ include/mem0mem.ic include/mem0pool.h include/mem0pool.ic \
include/mtr0types.h include/os0file.h \ include/mtr0log.h include/mtr0log.ic include/mtr0mtr.h \
include/os0proc.h include/os0proc.ic include/os0sync.h include/os0sync.ic \ include/mtr0mtr.ic include/mtr0types.h include/os0file.h \
include/os0thread.h include/os0thread.ic \ include/os0proc.h include/os0proc.ic include/os0sync.h \
include/page0cur.h include/page0cur.ic include/page0page.h include/page0page.ic \ include/os0sync.ic include/os0thread.h include/os0thread.ic \
include/page0types.h \ include/page0cur.h include/page0cur.ic include/page0page.h \
include/pars0grm.h include/pars0opt.h include/pars0opt.ic \ include/page0page.ic include/page0types.h include/pars0grm.h \
include/pars0pars.h include/pars0pars.ic include/pars0sym.h include/pars0sym.ic \ include/pars0opt.h include/pars0opt.ic include/pars0pars.h \
include/pars0types.h \ include/pars0pars.ic include/pars0sym.h include/pars0sym.ic \
include/que0que.h include/que0que.ic include/que0types.h \ include/pars0types.h include/que0que.h include/que0que.ic \
include/read0read.h include/read0read.ic include/read0types.h \ include/que0types.h include/read0read.h include/read0read.ic \
include/rem0cmp.h include/rem0cmp.ic include/rem0rec.h include/rem0rec.ic \ include/read0types.h include/rem0cmp.h include/rem0cmp.ic \
include/rem0types.h \ include/rem0rec.h include/rem0rec.ic include/rem0types.h \
include/row0ins.h include/row0ins.ic include/row0mysql.h include/row0mysql.ic \ include/row0ins.h include/row0ins.ic include/row0mysql.h \
include/row0purge.h include/row0purge.ic include/row0row.h include/row0row.ic \ include/row0mysql.ic include/row0purge.h include/row0purge.ic \
include/row0sel.h include/row0sel.ic include/row0types.h \ include/row0row.h include/row0row.ic include/row0sel.h \
include/row0uins.h include/row0uins.ic include/row0umod.h include/row0umod.ic \ include/row0sel.ic include/row0types.h include/row0uins.h \
include/row0undo.h include/row0undo.ic include/row0upd.h include/row0upd.ic \ include/row0uins.ic include/row0umod.h include/row0umod.ic \
include/row0vers.h include/row0vers.ic \ include/row0undo.h include/row0undo.ic include/row0upd.h \
include/srv0que.h include/srv0srv.h include/srv0srv.ic include/srv0start.h \ include/row0upd.ic include/row0vers.h include/row0vers.ic \
include/sync0arr.h include/sync0arr.ic include/sync0rw.h include/sync0rw.ic \ include/srv0que.h include/srv0srv.h include/srv0srv.ic \
include/sync0sync.h include/sync0sync.ic include/sync0types.h \ include/srv0start.h include/sync0arr.h include/sync0arr.ic \
include/thr0loc.h include/thr0loc.ic \ include/sync0rw.h include/sync0rw.ic include/sync0sync.h \
include/trx0purge.h include/trx0purge.ic include/trx0rec.h include/trx0rec.ic \ include/sync0sync.ic include/sync0types.h include/thr0loc.h \
include/trx0roll.h include/trx0roll.ic include/trx0rseg.h include/trx0rseg.ic \ include/thr0loc.ic include/trx0purge.h include/trx0purge.ic \
include/trx0sys.h include/trx0sys.ic include/trx0trx.h include/trx0trx.ic \ include/trx0rec.h include/trx0rec.ic include/trx0roll.h \
include/trx0types.h include/trx0undo.h include/trx0undo.ic include/trx0xa.h \ include/trx0roll.ic include/trx0rseg.h include/trx0rseg.ic \
include/univ.i include/usr0sess.h include/usr0sess.ic include/usr0types.h \ include/trx0sys.h include/trx0sys.ic include/trx0trx.h \
include/ut0byte.h include/ut0byte.ic include/ut0dbg.h include/ut0lst.h \ include/trx0trx.ic include/trx0types.h include/trx0undo.h \
include/ut0mem.h include/ut0mem.ic include/ut0rnd.h include/ut0rnd.ic \ include/trx0undo.ic include/trx0xa.h include/univ.i \
include/ut0sort.h include/ut0ut.h include/ut0ut.ic include/ut0vec.h include/ut0vec.ic include/ut0list.h include/ut0list.ic cmakelists.txt include/usr0sess.h include/usr0sess.ic include/usr0types.h \
include/ut0byte.h include/ut0byte.ic include/ut0dbg.h \
include/ut0lst.h include/ut0mem.h include/ut0mem.ic \
include/ut0rnd.h include/ut0rnd.ic include/ut0sort.h \
include/ut0ut.h include/ut0ut.ic include/ut0vec.h \
include/ut0vec.ic include/ut0list.h include/ut0list.ic \
include/ut0wqueue.h cmakelists.txt
# Don't update the files from bitkeeper # Don't update the files from bitkeeper
%::SCCS/s.% %::SCCS/s.%
...@@ -32,4 +32,4 @@ ADD_LIBRARY(innobase btr/btr0btr.c btr/btr0cur.c btr/btr0pcur.c btr/btr0sea.c ...@@ -32,4 +32,4 @@ ADD_LIBRARY(innobase btr/btr0btr.c btr/btr0cur.c btr/btr0pcur.c btr/btr0sea.c
thr/thr0loc.c thr/thr0loc.c
trx/trx0purge.c trx/trx0rec.c trx/trx0roll.c trx/trx0rseg.c trx/trx0sys.c trx/trx0trx.c trx/trx0undo.c trx/trx0purge.c trx/trx0rec.c trx/trx0roll.c trx/trx0rseg.c trx/trx0sys.c trx/trx0trx.c trx/trx0undo.c
usr/usr0sess.c usr/usr0sess.c
ut/ut0byte.c ut/ut0dbg.c ut/ut0mem.c ut/ut0rnd.c ut/ut0ut.c ut/ut0vec.c ut/ut0list.c) ut/ut0byte.c ut/ut0dbg.c ut/ut0mem.c ut/ut0rnd.c ut/ut0ut.c ut/ut0vec.c ut/ut0list.c ut/ut0wqueue.c)
...@@ -426,6 +426,7 @@ or row lock! */ ...@@ -426,6 +426,7 @@ or row lock! */
#define SYNC_TRX_SYS_HEADER 290 #define SYNC_TRX_SYS_HEADER 290
#define SYNC_LOG 170 #define SYNC_LOG 170
#define SYNC_RECV 168 #define SYNC_RECV 168
#define SYNC_WORK_QUEUE 161
#define SYNC_SEARCH_SYS 160 /* NOTE that if we have a memory #define SYNC_SEARCH_SYS 160 /* NOTE that if we have a memory
heap that can be extended to the heap that can be extended to the
buffer pool, its logical level is buffer pool, its logical level is
......
/***********************************************************************
A Work queue. Threads can add work items to the queue and other threads can
wait for work items to be available and take them off the queue for
processing.
************************************************************************/
#ifndef IB_WORK_QUEUE_H
#define IB_WORK_QUEUE_H
#include "ut0list.h"
#include "mem0mem.h"
#include "os0sync.h"
#include "sync0types.h"
typedef struct ib_wqueue_struct ib_wqueue_t;
/********************************************************************
Create a new work queue. */
ib_wqueue_t*
ib_wqueue_create(void);
/*===================*/
/* out: work queue */
/********************************************************************
Free a work queue. */
void
ib_wqueue_free(
/*===========*/
ib_wqueue_t* wq); /* in: work queue */
/********************************************************************
Add a work item to the queue. */
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /* in: work queue */
void* item, /* in: work item */
mem_heap_t* heap); /* in: memory heap to use for allocating the
list node */
/********************************************************************
Wait for a work item to appear in the queue. */
void*
ib_wqueue_wait(
/* out: work item */
ib_wqueue_t* wq); /* in: work queue */
/* Work queue. */
struct ib_wqueue_struct {
mutex_t mutex; /* mutex protecting everything */
ib_list_t* items; /* work item list */
os_event_t event; /* event we use to signal additions to list */
};
#endif
...@@ -1050,6 +1050,9 @@ sync_thread_add_level( ...@@ -1050,6 +1050,9 @@ sync_thread_add_level(
case SYNC_RECV: case SYNC_RECV:
ut_a(sync_thread_levels_g(array, SYNC_RECV)); ut_a(sync_thread_levels_g(array, SYNC_RECV));
break; break;
case SYNC_WORK_QUEUE:
ut_a(sync_thread_levels_g(array, SYNC_WORK_QUEUE));
break;
case SYNC_LOG: case SYNC_LOG:
ut_a(sync_thread_levels_g(array, SYNC_LOG)); ut_a(sync_thread_levels_g(array, SYNC_LOG));
break; break;
......
...@@ -19,6 +19,6 @@ include ../include/Makefile.i ...@@ -19,6 +19,6 @@ include ../include/Makefile.i
noinst_LIBRARIES = libut.a noinst_LIBRARIES = libut.a
libut_a_SOURCES = ut0byte.c ut0dbg.c ut0mem.c ut0rnd.c ut0ut.c ut0vec.c ut0list.c libut_a_SOURCES = ut0byte.c ut0dbg.c ut0mem.c ut0rnd.c ut0ut.c ut0vec.c ut0list.c ut0wqueue.c
EXTRA_PROGRAMS = EXTRA_PROGRAMS =
#include "ut0wqueue.h"
/********************************************************************
Create a new work queue. */
ib_wqueue_t*
ib_wqueue_create(void)
/*===================*/
/* out: work queue */
{
ib_wqueue_t* wq = mem_alloc(sizeof(ib_wqueue_t));
mutex_create(&wq->mutex);
mutex_set_level(&wq->mutex, SYNC_WORK_QUEUE);
wq->items = ib_list_create();
wq->event = os_event_create(NULL);
return(wq);
}
/********************************************************************
Free a work queue. */
void
ib_wqueue_free(
/*===========*/
ib_wqueue_t* wq) /* in: work queue */
{
ut_a(!ib_list_get_first(wq->items));
mutex_free(&wq->mutex);
ib_list_free(wq->items);
os_event_free(wq->event);
mem_free(wq);
}
/********************************************************************
Add a work item to the queue. */
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /* in: work queue */
void* item, /* in: work item */
mem_heap_t* heap) /* in: memory heap to use for allocating the
list node */
{
mutex_enter(&wq->mutex);
ib_list_add_last(wq->items, item, heap);
os_event_set(wq->event);
mutex_exit(&wq->mutex);
}
/********************************************************************
Wait for a work item to appear in the queue. */
void*
ib_wqueue_wait(
/* out: work item */
ib_wqueue_t* wq) /* in: work queue */
{
ib_list_node_t* node;
for (;;) {
os_event_wait(wq->event);
mutex_enter(&wq->mutex);
node = ib_list_get_first(wq->items);
if (node) {
ib_list_remove(wq->items, node);
if (!ib_list_get_first(wq->items)) {
/* We must reset the event when the list
gets emptied. */
os_event_reset(wq->event);
}
break;
}
mutex_exit(&wq->mutex);
}
mutex_exit(&wq->mutex);
return(node->data);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment