Commit 35face2a authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

closes #5283, delete workqueue code, it is now unused

git-svn-id: file:///svn/toku/tokudb@46801 c7de825b-a66e-492c-adef-691d508d4ae1
parent c451f662
...@@ -71,7 +71,6 @@ set(FT_SOURCES ...@@ -71,7 +71,6 @@ set(FT_SOURCES
txn txn
txn_manager txn_manager
ule ule
workqueue
x1764 x1764
xids xids
ybt ybt
......
...@@ -10,7 +10,6 @@ ...@@ -10,7 +10,6 @@
#include <fcntl.h> #include <fcntl.h>
#include "fttypes.h" #include "fttypes.h"
#include "minicron.h" #include "minicron.h"
#include "workqueue.h"
// Maintain a cache mapping from cachekeys to values (void*) // Maintain a cache mapping from cachekeys to values (void*)
......
...@@ -12,6 +12,7 @@ ...@@ -12,6 +12,7 @@
#include "compress.h" #include "compress.h"
#include "sub_block.h" #include "sub_block.h"
#include "threadpool.h"
static uint8_t static uint8_t
get_uint8_at_offset(void *vp, size_t offset) { get_uint8_at_offset(void *vp, size_t offset) {
......
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#include <toku_portability.h>
#include "test.h"
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <toku_pthread.h>
#include "memory.h"
#include "workqueue.h"
#include "threadpool.h"
static WORKITEM
new_workitem (void) {
WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
return wi;
}
static void
destroy_workitem(WORKITEM wi) {
toku_free(wi);
}
// test simple create and destroy
static void
test_create_destroy (void) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
workqueue_init(wq);
assert(workqueue_empty(wq));
workqueue_destroy(wq);
}
// verify that the wq implements FIFO ordering
static void
test_simple_enq_deq (int n) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
int r;
workqueue_init(wq);
assert(workqueue_empty(wq));
WORKITEM work[n];
int i;
for (i=0; i<n; i++) {
work[i] = new_workitem();
workqueue_enq(wq, work[i], 1);
assert(!workqueue_empty(wq));
}
for (i=0; i<n; i++) {
WORKITEM wi = 0;
r = workqueue_deq(wq, &wi, 1);
assert(r == 0 && wi == work[i]);
destroy_workitem(wi);
}
assert(workqueue_empty(wq));
workqueue_destroy(wq);
}
// setting the wq closed should cause deq to return EINVAL
static void
test_set_closed (void) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
workqueue_init(wq);
WORKITEM wi = 0;
workqueue_set_closed(wq, 1);
int r = workqueue_deq(wq, &wi, 1);
assert(r == EINVAL && wi == 0);
workqueue_destroy(wq);
}
// closing a wq with a blocked reader thread should cause the reader to get EINVAL
static void *
test_set_closed_waiter(void *arg) {
struct workqueue *CAST_FROM_VOIDP(wq, arg);
int r;
WORKITEM wi = 0;
r = workqueue_deq(wq, &wi, 1);
assert(r == EINVAL && wi == 0);
return arg;
}
static void
test_set_closed_thread (void) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
int r;
workqueue_init(wq);
toku_pthread_t tid;
r = toku_pthread_create(&tid, 0, test_set_closed_waiter, wq); assert(r == 0);
sleep(1);
workqueue_set_closed(wq, 1);
void *ret;
r = toku_pthread_join(tid, &ret);
assert(r == 0 && ret == wq);
workqueue_destroy(wq);
}
// verify writer reader flow control
// the write (main) thread writes as fast as possible until the wq is full. then it
// waits.
// the read thread reads from the wq slowly using a random delay. it wakes up any
// writers when the wq size <= 1/2 of the wq limit
struct rwfc {
struct workqueue workqueue;
int current, limit;
};
static void rwfc_init (struct rwfc *rwfc, int limit) {
workqueue_init(&rwfc->workqueue);
rwfc->current = 0; rwfc->limit = limit;
}
static void
rwfc_destroy (struct rwfc *rwfc) {
workqueue_destroy(&rwfc->workqueue);
}
static void
rwfc_do_read (WORKITEM wi) {
struct rwfc *rwfc = (struct rwfc *) workitem_arg(wi);
workqueue_lock(&rwfc->workqueue);
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
workqueue_wakeup_write(&rwfc->workqueue, 0);
}
workqueue_unlock(&rwfc->workqueue);
destroy_workitem(wi);
}
static void *
rwfc_worker (void *arg) {
struct workqueue *CAST_FROM_VOIDP(wq, arg);
while (1) {
WORKITEM wi = 0;
int r = workqueue_deq(wq, &wi, 1);
if (r == EINVAL) {
assert(wi == 0);
break;
}
usleep(random() % 100);
wi->f(wi);
}
return arg;
}
static void
test_flow_control (int limit, int n, int maxthreads) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct rwfc my_rwfc, *rwfc = &my_rwfc;
THREADPOOL tp;
int i;
rwfc_init(rwfc, limit);
toku_thread_pool_create(&tp, maxthreads);
int T = maxthreads;
toku_thread_pool_run(tp, 0, &T, rwfc_worker, &rwfc->workqueue);
assert(T == maxthreads);
sleep(1); // this is here to block the reader on the first deq
for (i=0; i<n; i++) {
WORKITEM wi = new_workitem();
workitem_init(wi, rwfc_do_read, rwfc);
workqueue_lock(&rwfc->workqueue);
workqueue_enq(&rwfc->workqueue, wi, 0);
rwfc->current++;
while (rwfc->current >= rwfc->limit) {
// printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
workqueue_wait_write(&rwfc->workqueue, 0);
}
workqueue_unlock(&rwfc->workqueue);
// toku_os_usleep(random() % 1);
}
workqueue_set_closed(&rwfc->workqueue, 1);
toku_thread_pool_destroy(&tp);
rwfc_destroy(rwfc);
}
int
test_main (int argc, const char *argv[]) {
int i;
for (i=1; i<argc; i++) {
const char *arg = argv[i];
if (strcmp(arg, "-v") == 0)
verbose++;
}
test_create_destroy();
test_simple_enq_deq(0);
test_simple_enq_deq(42);
test_set_closed();
test_set_closed_thread();
test_flow_control(8, 10000, 1);
test_flow_control(8, 10000, 2);
test_flow_control(8, 10000, 17);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <stdio.h>
#include <errno.h>
#include <toku_portability.h>
#include "toku_assert.h"
#include "toku_os.h"
#include <toku_pthread.h>
#include "workqueue.h"
#include "threadpool.h"
// Create fixed number of worker threads, all waiting on a single queue
// of work items (WORKQUEUE).
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr, int fraction) {
workqueue_init(wq);
assert(fraction > 0);
int nprocs = toku_os_get_number_active_processors();
int nthreads = (nprocs*2)/fraction;
if (nthreads == 0) nthreads = 1;
toku_thread_pool_create(tpptr, nthreads);
toku_thread_pool_run(*tpptr, 0, &nthreads, toku_worker, wq);
}
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
workqueue_set_closed(wq, 1); // close the work queue and [see "A" in toku_worker()]
toku_thread_pool_destroy(tpptr); // wait for all of the worker threads to exit
workqueue_destroy(wq);
}
void *toku_worker(void *arg) {
WORKQUEUE CAST_FROM_VOIDP(wq, arg);
int r;
while (1) {
WORKITEM wi = 0;
r = workqueue_deq(wq, &wi, 1); // get work from the queue, block if empty
if (r != 0) // shut down worker threads when work queue is closed
break; // [see "A" in toku_destroy_workers() ]
wi->f(wi); // call the work handler function
}
return arg;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
#ifndef _TOKU_WORKQUEUE_H
#define _TOKU_WORKQUEUE_H
#ident "$Id$"
#ident "Copyright (c) 2007-2012 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <errno.h>
#include "toku_assert.h"
#include <toku_pthread.h>
struct workitem;
// A work function is called by a worker thread when the workitem (see below) is being handled
// by a worker thread.
typedef void (*WORKFUNC)(struct workitem *wi);
// A workitem contains the function that is called by a worker thread in a threadpool.
// A workitem is queued in a workqueue.
typedef struct workitem *WORKITEM;
struct workitem {
WORKFUNC f;
void *arg;
struct workitem *next;
};
// Initialize a workitem with a function and argument
static inline void workitem_init(WORKITEM wi, WORKFUNC f, void *arg) {
wi->f = f;
wi->arg = arg;
wi->next = 0;
}
// Access the workitem function
static inline WORKFUNC workitem_func(WORKITEM wi) {
return wi->f;
}
// Access the workitem argument
static inline void *workitem_arg(WORKITEM wi) {
return wi->arg;
}
// A workqueue is currently a fifo of workitems that feeds a thread pool. We may
// divide the workqueue into per worker thread queues.
typedef struct workqueue *WORKQUEUE;
struct workqueue {
WORKITEM head, tail; // list of workitems
toku_mutex_t lock;
toku_cond_t wait_read; // wait for read
int want_read; // number of threads waiting to read
toku_cond_t wait_write; // wait for write
int want_write; // number of threads waiting to write
char closed; // kicks waiting threads off of the write queue
int n_in_queue; // count of how many workitems are in the queue.
};
// Get a pointer to the workqueue lock. This is used by workqueue client software
// that wants to control the workqueue locking.
static inline toku_mutex_t *workqueue_lock_ref(WORKQUEUE wq) {
return &wq->lock;
}
// Lock the workqueue
static inline void workqueue_lock(WORKQUEUE wq) {
toku_mutex_lock(&wq->lock);
}
// Unlock the workqueue
static inline void workqueue_unlock(WORKQUEUE wq) {
toku_mutex_unlock(&wq->lock);
}
// Initialize a workqueue
// Expects: the workqueue is not initialized
// Effects: the workqueue is set to empty and the condition variable is initialized
__attribute__((unused))
static void workqueue_init(WORKQUEUE wq) {
toku_mutex_init(&wq->lock, 0);
wq->head = wq->tail = 0;
toku_cond_init(&wq->wait_read, 0);
wq->want_read = 0;
toku_cond_init(&wq->wait_write, 0);
wq->want_write = 0;
wq->closed = 0;
wq->n_in_queue = 0;
}
// Destroy a work queue
// Expects: the work queue must be initialized and empty
__attribute__((unused))
static void workqueue_destroy(WORKQUEUE wq) {
workqueue_lock(wq); // shutup helgrind
assert(wq->head == 0 && wq->tail == 0);
workqueue_unlock(wq);
toku_cond_destroy(&wq->wait_read);
toku_cond_destroy(&wq->wait_write);
toku_mutex_destroy(&wq->lock);
}
// Close the work queue
// Effects: signal any threads blocked in the work queue
__attribute__((unused))
static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
if (dolock) workqueue_lock(wq);
wq->closed = 1;
toku_cond_broadcast(&wq->wait_read);
toku_cond_broadcast(&wq->wait_write);
if (dolock) workqueue_unlock(wq);
}
// Determine whether or not the work queue is empty
// Returns: 1 if the work queue is empty, otherwise 0
static inline int workqueue_empty(WORKQUEUE wq) {
return wq->head == 0;
}
// Put a work item at the tail of the work queue
// Effects: append the work item to the end of the work queue and signal
// any work queue readers.
// Dolock controls whether or not the work queue lock should be taken.
__attribute__((unused))
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
if (dolock) workqueue_lock(wq);
wq->n_in_queue++;
wi->next = 0;
if (wq->tail)
wq->tail->next = wi;
else
wq->head = wi;
wq->tail = wi;
if (wq->want_read) {
toku_cond_signal(&wq->wait_read);
}
if (dolock) workqueue_unlock(wq);
}
// Get a work item from the head of the work queue
// Effects: wait until the workqueue is not empty, remove the first workitem from the
// queue and return it.
// Dolock controls whether or not the work queue lock should be taken.
// Success: returns 0 and set the wiptr
// Failure: returns non-zero
__attribute__((unused))
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
if (dolock) workqueue_lock(wq);
assert(wq->n_in_queue >= 0);
while (workqueue_empty(wq)) {
if (wq->closed) {
if (dolock) workqueue_unlock(wq);
return EINVAL;
}
wq->want_read++;
toku_cond_wait(&wq->wait_read, &wq->lock);
wq->want_read--;
}
wq->n_in_queue--;
WORKITEM wi = wq->head;
wq->head = wi->next;
if (wq->head == 0)
wq->tail = 0;
wi->next = 0;
if (dolock) workqueue_unlock(wq);
*wiptr = wi;
return 0;
}
// Suspend the caller (thread that is currently attempting to put more work items into the work queue)
__attribute__((unused))
static void workqueue_wait_write(WORKQUEUE wq, int dolock) {
if (dolock) workqueue_lock(wq);
wq->want_write++;
toku_cond_wait(&wq->wait_write, &wq->lock);
wq->want_write--;
if (dolock) workqueue_unlock(wq);
}
// Wakeup all threads that are currently attempting to put more work items into the work queue
__attribute__((unused))
static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
if (wq->want_write) {
if (dolock) workqueue_lock(wq);
if (wq->want_write) {
toku_cond_broadcast(&wq->wait_write);
}
if (dolock) workqueue_unlock(wq);
}
}
__attribute__((unused))
static int workqueue_n_in_queue (WORKQUEUE wq, int dolock) {
if (dolock) workqueue_lock(wq);
int r = wq->n_in_queue;
if (dolock) workqueue_unlock(wq);
return r;
}
#include "threadpool.h"
// initialize the work queue and worker
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr, int fraction);
void toku_init_workers_with_num_threads(WORKQUEUE wq, THREADPOOL *tpptr, int num_threads);
// destroy the work queue and worker
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment