Commit 8eaa912f authored by Rich Prohaska, committed by Yoni Fogel

merge generalized worker threads from the 1183 branch. addresses #1183

git-svn-id: file:///svn/toku/tokudb@8902 c7de825b-a66e-492c-adef-691d508d4ae1
parent c730d763
@@ -54,6 +54,7 @@ BRT_SOURCES = \
     recover \
     roll \
     threadpool \
+    toku_worker \
     trace_mem \
     x1764 \
     ybt \
......
// When objects are evicted from the cachetable, they are written to storage by a
// thread in a thread pool. The objects are placed onto a write queue that feeds
// the thread pool. The write queue expects that an external mutex is used to
// protect it.

typedef struct writequeue *WRITEQUEUE;
struct writequeue {
    PAIR head, tail;                // head and tail of the linked list of pair's
    toku_pthread_cond_t wait_read;  // wait for read
    int want_read;                  // number of threads waiting to read
    toku_pthread_cond_t wait_write; // wait for write
    int want_write;                 // number of threads waiting to write
    int ninq;                       // number of pairs in the queue
    char closed;                    // kicks waiting threads off of the write queue
};

// initialize a writequeue
// expects: the writequeue is not initialized
// effects: the writequeue is set to empty and the condition variable is initialized
static void writequeue_init(WRITEQUEUE wq) {
    wq->head = wq->tail = 0;
    int r;
    r = toku_pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
    wq->want_read = 0;
    r = toku_pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
    wq->want_write = 0;
    wq->ninq = 0;
    wq->closed = 0;
}

// destroy a writequeue
// expects: the writequeue must be initialized and empty
static void writequeue_destroy(WRITEQUEUE wq) {
    assert(wq->head == 0 && wq->tail == 0);
    int r;
    r = toku_pthread_cond_destroy(&wq->wait_read); assert(r == 0);
    r = toku_pthread_cond_destroy(&wq->wait_write); assert(r == 0);
}

// close the writequeue
// effects: signal any threads blocked in the writequeue
static void writequeue_set_closed(WRITEQUEUE wq) {
    wq->closed = 1;
    int r;
    r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
    r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}

// determine whether or not the write queue is empty
// return: 1 if the write queue is empty, otherwise 0
static int writequeue_empty(WRITEQUEUE wq) {
    return wq->head == 0;
}

// put a pair at the tail of the write queue
// expects: the mutex is locked
// effects: append the pair to the end of the write queue and signal
// any readers.
static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
    pair->next_wq = 0;
    if (wq->tail)
        wq->tail->next_wq = pair;
    else
        wq->head = pair;
    wq->tail = pair;
    wq->ninq++;
    if (wq->want_read) {
        int r = toku_pthread_cond_signal(&wq->wait_read); assert(r == 0);
    }
}

// get a pair from the head of the write queue
// expects: the mutex is locked
// effects: wait until the writequeue is not empty, remove the first pair from the
// write queue and return it
// returns: 0 if success, otherwise an error
static int writequeue_deq(WRITEQUEUE wq, toku_pthread_mutex_t *mutex, PAIR *pairptr) {
    while (writequeue_empty(wq)) {
        if (wq->closed)
            return EINVAL;
        wq->want_read++;
        int r = toku_pthread_cond_wait(&wq->wait_read, mutex); assert(r == 0);
        wq->want_read--;
    }
    PAIR pair = wq->head;
    wq->head = pair->next_wq;
    if (wq->head == 0)
        wq->tail = 0;
    wq->ninq--;
    pair->next_wq = 0;
    *pairptr = pair;
    return 0;
}

// suspend the writer thread
// expects: the mutex is locked
static void writequeue_wait_write(WRITEQUEUE wq, toku_pthread_mutex_t *mutex) {
    wq->want_write++;
    int r = toku_pthread_cond_wait(&wq->wait_write, mutex); assert(r == 0);
    wq->want_write--;
}

// wakeup the writer threads
// expects: the mutex is locked
static void writequeue_wakeup_write(WRITEQUEUE wq) {
    if (wq->want_write) {
        int r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
    }
}
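The writequeue deliberately does not own its mutex: enq and deq are called with an external mutex held, and the condition waits release that mutex while blocking, so queue operations compose with other state updates made under the same lock. The calling pattern can be demonstrated with a small standalone analog (plain pthreads, an int payload standing in for a PAIR; illustrative only, not code from this commit):

#include <assert.h>
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t wait_read = PTHREAD_COND_INITIALIZER;
static int queue[8], qhead = 0, qtail = 0, ninq = 0;

static void enq(int x) {                 // caller holds mutex
    queue[qtail] = x; qtail = (qtail + 1) % 8; ninq++;
    pthread_cond_signal(&wait_read);     // wake one waiting consumer
}

static int deq(void) {                   // caller holds mutex
    while (ninq == 0)                    // wait releases and reacquires mutex
        pthread_cond_wait(&wait_read, &mutex);
    int x = queue[qhead]; qhead = (qhead + 1) % 8; ninq--;
    return x;
}

static void *consumer(void *arg) {
    pthread_mutex_lock(&mutex);
    int x = deq();
    pthread_mutex_unlock(&mutex);
    printf("consumed %d\n", x);
    return arg;
}

int main(void) {
    pthread_t t;
    pthread_create(&t, 0, consumer, 0);
    pthread_mutex_lock(&mutex);          // producer takes the external mutex
    enq(42);
    pthread_mutex_unlock(&mutex);
    pthread_join(t, 0);
    return 0;
}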
 /* -*- mode: C; c-basic-offset: 4 -*- */
 #ident "Copyright (c) 2007, 2008 Tokutek Inc.  All rights reserved."
 
-#include "includes.h"
+#include <stdlib.h>
+#include <string.h>
+#include <malloc.h>
 
-// execute the cachetable callbacks using a writer thread 0->no 1->yes
-#define DO_WRITER_THREAD 1
-
-#if DO_WRITER_THREAD
-static void *cachetable_writer(void *);
+#include "toku_portability.h"
+#include "memory.h"
+#include "workqueue.h"
+#include "threadpool.h"
+#include "cachetable.h"
+#include "cachetable-rwlock.h"
+#include "toku_worker.h"
+
+// use worker threads 0->no 1->yes
+#define DO_WORKER_THREAD 1
+#if DO_WORKER_THREAD
+static void cachetable_writer(WORKITEM);
+static void cachetable_reader(WORKITEM);
 #endif
 
-// we use 4 threads since gunzip is 4 times faster than gzip
-#define MAX_WRITER_THREADS 4
-
 // use cachetable locks 0->no 1->yes
 #define DO_CACHETABLE_LOCK 1
 
-// unlock the cachetable while executing callbacks 0->no 1->yes
-#define DO_CALLBACK_UNLOCK 1
-
 // simulate long latency write operations with usleep. time in milliseconds.
 #define DO_CALLBACK_USLEEP 0
 #define DO_CALLBACK_BUSYWAIT 0
 
-//#define TRACE_CACHETABLE
-#ifdef TRACE_CACHETABLE
+#define TRACE_CACHETABLE 0
+#if TRACE_CACHETABLE
 #define WHEN_TRACE_CT(x) x
 #else
 #define WHEN_TRACE_CT(x) ((void)0)
 #endif
 
+enum ctpair_state {
+    CTPAIR_INVALID = 0, // pair is invalid
+    CTPAIR_IDLE = 1,    // pair is in memory
+    CTPAIR_READING = 2, // pair is being fetched into memory
+    CTPAIR_WRITING = 3, // pair is being flushed from memory
+};
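The explicit state field replaces the old boolean writing flag and also covers the new asynchronous read path. The transitions implied by this patch, as a sketch (not a diagram from the source):

/* (insert for fetch)                   (fetch ok)
 *      ----> CTPAIR_READING --------------------> CTPAIR_IDLE
 *                  |                                 |     ^
 *                  | (fetch fails)     (flush_and_remove)  | (write done, kept)
 *                  v                                 v     |
 *            CTPAIR_INVALID                    CTPAIR_WRITING
 *                                                    | (write done, evicted)
 *                                                    v
 *                                                destroyed
 */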
 typedef struct ctpair *PAIR;
 struct ctpair {
     enum typ_tag tag;
-    char      dirty;
-    char      verify_flag;       // Used in verify_cachetable()
-    char      writing;           // writing back
-    char      write_me;
+    CACHEFILE cachefile;
     CACHEKEY  key;
     void     *value;
     long      size;
-    PAIR      next,prev;         // In LRU list.
-    PAIR      hash_chain;
-    CACHEFILE cachefile;
-    u_int32_t fullhash;
+    enum ctpair_state state;
+    char      dirty;
+    char      verify_flag;       // Used in verify_cachetable()
+    char      write_me;
 
     CACHETABLE_FLUSH_CALLBACK flush_callback;
     CACHETABLE_FETCH_CALLBACK fetch_callback;
     void     *extraargs;
 
+    PAIR      next,prev;         // In LRU list.
+    PAIR      hash_chain;
+
     LSN       modified_lsn;      // What was the LSN when modified (undefined if not dirty)
     LSN       written_lsn;       // What was the LSN when written (we need to get this information when we fetch)
+    u_int32_t fullhash;
 
-    PAIR      next_wq;           // the ctpair's are linked into a write queue when evicted
-    struct ctpair_rwlock rwlock; // reader writer lock used to grant an exclusive lock to the writeback thread
-    struct writequeue *cq;       // writers sometimes return ctpair's using this queue
+    struct ctpair_rwlock rwlock; // multiple get's, single writer
+    struct workqueue *cq;        // writers sometimes return ctpair's using this queue
+    struct workitem asyncwork;   // work item for the worker threads
 };
 
-#include "cachetable-writequeue.h"
+static void * const zero_value = 0;
+static int const zero_size = 0;
 
 static inline void ctpair_destroy(PAIR p) {
     ctpair_rwlock_destroy(&p->rwlock);
@@ -75,34 +91,31 @@ struct cachetable {
     long size_writing;           // the sum of the sizes of the pairs being written
     LSN lsn_of_checkpoint;       // the most recent checkpoint in the log.
     TOKULOGGER logger;
-    toku_pthread_mutex_t mutex;  // coarse lock that protects the cachetable, the cachefiles, and the pair's
-    struct writequeue wq;        // write queue for the writer threads
-    THREADPOOL threadpool;       // pool of writer threads
+    toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's
+    struct workqueue wq;         // async work queue
+    THREADPOOL threadpool;       // pool of worker threads
     char checkpointing;          // checkpoint in progress
 };
 
-// lock the cachetable mutex
+// Lock the cachetable
 static inline void cachetable_lock(CACHETABLE ct __attribute__((unused))) {
 #if DO_CACHETABLE_LOCK
-    int r = toku_pthread_mutex_lock(&ct->mutex); assert(r == 0);
+    int r = toku_pthread_mutex_lock(ct->mutex); assert(r == 0);
 #endif
 }
 
-// unlock the cachetable mutex
+// Unlock the cachetable
 static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
 #if DO_CACHETABLE_LOCK
-    int r = toku_pthread_mutex_unlock(&ct->mutex); assert(r == 0);
+    int r = toku_pthread_mutex_unlock(ct->mutex); assert(r == 0);
 #endif
 }
 
-// wait for writes to complete if the size in the write queue is 1/2 of
+// Wait for writes to complete if the size in the write queue is 1/2 of
 // the cachetable
 static inline void cachetable_wait_write(CACHETABLE ct) {
     while (2*ct->size_writing > ct->size_current) {
-        writequeue_wait_write(&ct->wq, &ct->mutex);
+        workqueue_wait_write(&ct->wq, 0);
     }
 }
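The two thresholds form a hysteresis band: cachetable_wait_write stalls a client while pending writeback bytes exceed half of the cached bytes, and cachetable_complete_write_pair (later in this file) only wakes stalled clients once pending writeback falls to an eighth, so a woken client does not immediately re-stall. A self-contained sketch of the two predicates (illustrative only, not code from this commit):

#include <stdio.h>

// The 1/2 stall threshold and the 1/8 wakeup threshold, written as
// pure predicates over the two byte counters.
static int should_stall(long size_writing, long size_current) {
    return 2 * size_writing > size_current;  // more than 1/2 outstanding
}
static int should_wakeup(long size_writing, long size_current) {
    return 8 * size_writing <= size_current; // at most 1/8 outstanding
}

int main(void) {
    long current = 1000;
    printf("%d %d\n", should_stall(600, current), should_wakeup(600, current)); // 1 0
    printf("%d %d\n", should_stall(200, current), should_wakeup(200, current)); // 0 0 (hysteresis gap)
    printf("%d %d\n", should_stall(100, current), should_wakeup(100, current)); // 0 1
    return 0;
}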
@@ -135,45 +148,34 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
         }
     }
 #endif
-    TAGMALLOC(CACHETABLE, t);
-    if (t == 0) return ENOMEM;
-    t->n_in_table = 0;
-    t->table_size = 4;
-    MALLOC_N(t->table_size, t->table);
-    assert(t->table);
-    t->head = t->tail = 0;
+    TAGMALLOC(CACHETABLE, ct);
+    if (ct == 0) return ENOMEM;
+    ct->n_in_table = 0;
+    ct->table_size = 4;
+    MALLOC_N(ct->table_size, ct->table);
+    assert(ct->table);
+    ct->head = ct->tail = 0;
     u_int32_t i;
-    for (i=0; i<t->table_size; i++) {
-        t->table[i]=0;
+    for (i=0; i<ct->table_size; i++) {
+        ct->table[i]=0;
     }
-    t->cachefiles = 0;
-    t->size_current = 0;
-    t->size_limit = size_limit;
-    t->size_writing = 0;
-    t->lsn_of_checkpoint = initial_lsn;
-    t->logger = logger;
-    t->checkpointing = 0;
-    int r;
-    writequeue_init(&t->wq);
-    r = toku_pthread_mutex_init(&t->mutex, 0); assert(r == 0);
-    // set the max number of writeback threads to min(MAX_WRITER_THREADS,nprocs_online)
-    int nprocs = toku_os_get_number_active_processors();
-    if (nprocs > MAX_WRITER_THREADS) nprocs = MAX_WRITER_THREADS;
-    r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
-#if DO_WRITER_THREAD
-    for (i=0; i < (u_int32_t)nprocs; i++)
-        threadpool_maybe_add(t->threadpool, cachetable_writer, t);
-#endif
-    *result = t;
+    ct->cachefiles = 0;
+    ct->size_current = 0;
+    ct->size_limit = size_limit;
+    ct->size_writing = 0;
+    ct->lsn_of_checkpoint = initial_lsn;
+    ct->logger = logger;
+    ct->checkpointing = 0;
+    toku_init_workers(&ct->wq, &ct->threadpool);
+    ct->mutex = workqueue_lock_ref(&ct->wq);
+    *result = ct;
     return 0;
 }
 
 // What cachefile goes with particular fd?
-int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) {
+int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
     CACHEFILE extant;
-    for (extant = t->cachefiles; extant; extant=extant->next) {
+    for (extant = ct->cachefiles; extant; extant=extant->next) {
         if (extant->filenum.fileid==filenum.fileid) {
             *cf = extant;
             return 0;
@@ -184,15 +186,15 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) {
 static FILENUM next_filenum_to_use={0};
 
-static void cachefile_init_filenum(CACHEFILE newcf, int fd, const char *fname, struct fileid fileid) \
+static void cachefile_init_filenum(CACHEFILE cf, int fd, const char *fname, struct fileid fileid) \
 {
-    newcf->fd = fd;
-    newcf->fileid = fileid;
-    newcf->fname  = fname ? toku_strdup(fname) : 0;
+    cf->fd = fd;
+    cf->fileid = fileid;
+    cf->fname  = fname ? toku_strdup(fname) : 0;
 }
 
 // If something goes wrong, close the fd.  After this, the caller shouldn't close the fd, but instead should close the cachefile.
-int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fname) {
+int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname) {
     int r;
     CACHEFILE extant;
     struct fileid fileid;
@@ -201,17 +203,17 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna
         r=errno; close(fd);
         return r;
     }
-    for (extant = t->cachefiles; extant; extant=extant->next) {
+    for (extant = ct->cachefiles; extant; extant=extant->next) {
         if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
             r = close(fd);
             assert(r == 0);
             extant->refcount++;
-            *cf = extant;
+            *cfptr = extant;
             return 0;
         }
     }
 try_again:
-    for (extant = t->cachefiles; extant; extant=extant->next) {
+    for (extant = ct->cachefiles; extant; extant=extant->next) {
         if (next_filenum_to_use.fileid==extant->filenum.fileid) {
             next_filenum_to_use.fileid++;
             goto try_again;
@@ -219,26 +221,30 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, const char *fna
     }
     {
         CACHEFILE MALLOC(newcf);
-        newcf->cachetable = t;
+        newcf->cachetable = ct;
         newcf->filenum.fileid = next_filenum_to_use.fileid++;
         cachefile_init_filenum(newcf, fd, fname, fileid);
         newcf->refcount = 1;
-        newcf->next = t->cachefiles;
-        t->cachefiles = newcf;
+        newcf->next = ct->cachefiles;
+        ct->cachefiles = newcf;
         newcf->userdata = 0;
         newcf->close_userdata = 0;
         newcf->checkpoint_userdata = 0;
-        *cf = newcf;
+        *cfptr = newcf;
         return 0;
     }
 }
 
-int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) {
+int toku_cachetable_openf (CACHEFILE *cfptr, CACHETABLE ct, const char *fname, int flags, mode_t mode) {
     int fd = open(fname, flags+O_BINARY, mode);
     if (fd<0) return errno;
-    return toku_cachetable_openfd (cf, t, fd, fname);
+    return toku_cachetable_openfd (cfptr, ct, fd, fname);
 }
+
+WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE ct) {
+    return &ct->wq;
+}
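Exporting the work queue lets other components hand work to the cachetable's worker threads. A hedged sketch of a client, using only the workqueue calls that appear elsewhere in this diff (workitem_init, workqueue_enq, workitem_arg); the struct my_work, my_work_func, and the meaning of the final enq flag are assumptions:

// Hypothetical client of the exported work queue.
struct my_work {
    struct workitem wi;  // must stay alive until a worker runs it
    int arg;
};

static void my_work_func(WORKITEM wi) {
    struct my_work *w = workitem_arg(wi);  // recover the enclosing work
    // ... do the work with w->arg ...
}

static void submit(CACHETABLE ct, struct my_work *w) {
    WORKQUEUE wq = toku_cachetable_get_workqueue(ct);
    workitem_init(&w->wi, my_work_func, w); // bind function and argument
    workqueue_enq(wq, &w->wi, 1);           // 1 => take the queue's lock internally (assumption)
}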
 int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
@@ -367,71 +373,71 @@ u_int32_t toku_cachetable_hash (CACHEFILE cachefile, BLOCKNUM key)
 }
 
 #if 0
-static unsigned int hashit (CACHETABLE t, CACHEKEY key, CACHEFILE cachefile) {
-    assert(0==(t->table_size & (t->table_size -1))); // make sure table is power of two
-    return (toku_cachetable_hash(key,cachefile))&(t->table_size-1);
+static unsigned int hashit (CACHETABLE ct, CACHEKEY key, CACHEFILE cachefile) {
+    assert(0==(ct->table_size & (ct->table_size -1))); // make sure table is power of two
+    return (toku_cachetable_hash(key,cachefile))&(ct->table_size-1);
 }
 #endif
 
-static void cachetable_rehash (CACHETABLE t, u_int32_t newtable_size) {
-    // printf("rehash %p %d %d %d\n", t, primeindexdelta, t->n_in_table, t->table_size);
+static void cachetable_rehash (CACHETABLE ct, u_int32_t newtable_size) {
+    // printf("rehash %p %d %d %d\n", t, primeindexdelta, ct->n_in_table, ct->table_size);
 
     assert(newtable_size>=4 && ((newtable_size & (newtable_size-1))==0));
-    PAIR *newtable = toku_calloc(newtable_size, sizeof(*t->table));
+    PAIR *newtable = toku_calloc(newtable_size, sizeof(*ct->table));
     u_int32_t i;
     //printf("%s:%d newtable_size=%d\n", __FILE__, __LINE__, newtable_size);
     assert(newtable!=0);
-    u_int32_t oldtable_size = t->table_size;
-    t->table_size=newtable_size;
+    u_int32_t oldtable_size = ct->table_size;
+    ct->table_size=newtable_size;
     for (i=0; i<newtable_size; i++) newtable[i]=0;
     for (i=0; i<oldtable_size; i++) {
         PAIR p;
-        while ((p=t->table[i])!=0) {
+        while ((p=ct->table[i])!=0) {
             unsigned int h = p->fullhash&(newtable_size-1);
-            t->table[i] = p->hash_chain;
+            ct->table[i] = p->hash_chain;
             p->hash_chain = newtable[h];
             newtable[h] = p;
         }
     }
-    toku_free(t->table);
+    toku_free(ct->table);
     // printf("Freed\n");
-    t->table=newtable;
+    ct->table=newtable;
     //printf("Done growing or shrinking\n");
 }
 
-static void lru_remove (CACHETABLE t, PAIR p) {
+static void lru_remove (CACHETABLE ct, PAIR p) {
     if (p->next) {
         p->next->prev = p->prev;
     } else {
-        assert(t->tail==p);
-        t->tail = p->prev;
+        assert(ct->tail==p);
+        ct->tail = p->prev;
     }
     if (p->prev) {
         p->prev->next = p->next;
     } else {
-        assert(t->head==p);
-        t->head = p->next;
+        assert(ct->head==p);
+        ct->head = p->next;
     }
     p->prev = p->next = 0;
 }
 
-static void lru_add_to_list (CACHETABLE t, PAIR p) {
+static void lru_add_to_list (CACHETABLE ct, PAIR p) {
     // requires that touch_me is not currently in the table.
     assert(p->prev==0);
     p->prev = 0;
-    p->next = t->head;
-    if (t->head) {
-        t->head->prev = p;
+    p->next = ct->head;
+    if (ct->head) {
+        ct->head->prev = p;
     } else {
-        assert(!t->tail);
-        t->tail = p;
+        assert(!ct->tail);
+        ct->tail = p;
     }
-    t->head = p;
+    ct->head = p;
 }
 
-static void lru_touch (CACHETABLE t, PAIR p) {
-    lru_remove(t,p);
-    lru_add_to_list(t,p);
+static void lru_touch (CACHETABLE ct, PAIR p) {
+    lru_remove(ct,p);
+    lru_add_to_list(ct,p);
 }
 
 static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
@@ -445,10 +451,10 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
 // Thus we need to rename it if it is dirty,
 //  if it has been modified within the current checkpoint regime (hence non-strict inequality)
 //  and the last time it was written was in a previous checkpoint regime (strict inequality)
-static BOOL need_to_rename_p (CACHETABLE t, PAIR p) {
+static BOOL need_to_rename_p (CACHETABLE ct, PAIR p) {
     return (BOOL)(p->dirty
-                  && p->modified_lsn.lsn>=t->lsn_of_checkpoint.lsn  // nonstrict
-                  && p->written_lsn.lsn < t->lsn_of_checkpoint.lsn); // strict
+                  && p->modified_lsn.lsn>=ct->lsn_of_checkpoint.lsn  // nonstrict
+                  && p->written_lsn.lsn < ct->lsn_of_checkpoint.lsn); // strict
 }
 
 // Remove a pair from the cachetable
@@ -476,18 +482,48 @@ static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
 static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
     if (ctpair_users(&p->rwlock) == 0) {
         cachetable_remove_pair(ct, p);
-#if DO_CALLBACK_UNLOCK
         cachetable_unlock(ct);
-#endif
         p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, FALSE, FALSE,
                           ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
-#if DO_CALLBACK_UNLOCK
         cachetable_lock(ct);
-#endif
         ctpair_destroy(p);
     }
 }
 
+// Read a pair from a cachefile into memory using the pair's fetch callback
+static int cachetable_fetch_pair(CACHETABLE ct, CACHEFILE cf, PAIR p) {
+    void *toku_value = 0;
+    long size = 0;
+    LSN written_lsn = ZERO_LSN;
+
+    cachetable_unlock(ct);
+
+    WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
+    int r = p->fetch_callback(cf, p->key, p->fullhash, &toku_value, &size, p->extraargs, &written_lsn);
+
+    cachetable_lock(ct);
+
+    if (r) {
+        cachetable_remove_pair(ct, p);
+        p->state = CTPAIR_INVALID;
+        ctpair_write_unlock(&p->rwlock);
+        if (ctpair_users(&p->rwlock) == 0)
+            ctpair_destroy(p);
+        return r;
+    }
+
+    lru_touch(ct, p);
+    p->state = CTPAIR_IDLE;
+    p->value = toku_value;
+    p->written_lsn = written_lsn;
+    p->size = size;
+    ct->size_current += size;
+    ctpair_write_unlock(&p->rwlock);
+    return 0;
+}
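cachetable_fetch_pair drops the coarse cachetable lock around the slow, I/O-bound fetch callback and retakes it afterward; the pair's write lock keeps other threads off the pair in the meantime. A minimal standalone sketch of this unlock-around-callback idiom (illustrative pthread code, not from this commit):

#include <pthread.h>

// Drop a coarse lock around a slow callback so other threads can use the
// shared structure while the I/O is in flight.
static void call_with_lock_dropped(pthread_mutex_t *coarse_lock,
                                   void (*slow_callback)(void *), void *arg) {
    // caller holds coarse_lock on entry
    pthread_mutex_unlock(coarse_lock);  // let others in during the I/O
    slow_callback(arg);                 // must not touch the shared state
    pthread_mutex_lock(coarse_lock);    // re-enter the monitor
    // caller holds coarse_lock again; shared state may have changed meanwhile
}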
 static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove);
 
 // Write a pair to storage
@@ -497,10 +533,10 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov
 // boolean is true, so the pair is not yet evicted from the cachetable.
 
 static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
-    ctpair_write_lock(&p->rwlock, &ct->mutex);
-#if DO_CALLBACK_UNLOCK
+    ctpair_write_lock(&p->rwlock, ct->mutex);
+
     cachetable_unlock(ct);
-#endif
 
     // write callback
     p->flush_callback(p->cachefile, p->key, p->value, p->extraargs, p->size, (BOOL)(p->dirty && p->write_me), TRUE,
                       ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
@@ -519,9 +555,8 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
             break;
     }
 #endif
-#if DO_CALLBACK_UNLOCK
+
     cachetable_lock(ct);
-#endif
 
     // the pair is no longer dirty once written
     if (p->dirty && p->write_me)
@@ -530,7 +565,7 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
     // stuff it into a completion queue for delayed completion if a completion queue exists
     // otherwise complete the write now
     if (p->cq)
-        writequeue_enq(p->cq, p);
+        workqueue_enq(p->cq, &p->asyncwork, 1);
     else
         cachetable_complete_write_pair(ct, p, TRUE);
 }
@@ -541,14 +576,14 @@ static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
 static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove) {
     p->cq = 0;
-    p->writing = 0;
+    p->state = CTPAIR_IDLE;
 
     // maybe wakeup any stalled writers when the pending writes fall below
     // 1/8 of the size of the cachetable
     ct->size_writing -= p->size;
     assert(ct->size_writing >= 0);
     if (8*ct->size_writing <= ct->size_current)
-        writequeue_wakeup_write(&ct->wq);
+        workqueue_wakeup_write(&ct->wq, 0);
 
     ctpair_write_unlock(&p->rwlock);
     if (do_remove)
@@ -559,25 +594,27 @@ static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remov
 // a thread pool.
 
 static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
-    p->writing = 1;
+    p->state = CTPAIR_WRITING;
     ct->size_writing += p->size; assert(ct->size_writing >= 0);
     p->write_me = (char)(write_me?1:0);
-#if DO_WRITER_THREAD
+#if DO_WORKER_THREAD
+    WORKITEM wi = &p->asyncwork;
+    workitem_init(wi, cachetable_writer, p);
     if (!p->dirty || !p->write_me) {
         // evictions without a write can be run in the current thread
         cachetable_write_pair(ct, p);
     } else {
-        writequeue_enq(&ct->wq, p);
+        workqueue_enq(&ct->wq, wi, 0);
     }
 #else
     cachetable_write_pair(ct, p);
 #endif
 }
 
-static int maybe_flush_some (CACHETABLE t, long size) {
+static int maybe_flush_some (CACHETABLE ct, long size) {
     int r = 0;
 again:
-    if (size + t->size_current > t->size_limit + t->size_writing) {
+    if (size + ct->size_current > ct->size_limit + ct->size_writing) {
         {
             //unsigned long rss __attribute__((__unused__)) = check_max_rss();
             //printf("this-size=%.6fMB projected size = %.2fMB limit=%2.fMB rss=%2.fMB\n", size/(1024.0*1024.0), (size+t->size_current)/(1024.0*1024.0), t->size_limit/(1024.0*1024.0), rss/256.0);
@@ -586,9 +623,9 @@ static int maybe_flush_some (CACHETABLE t, long size) {
         }
         /* Try to remove one. */
         PAIR remove_me;
-        for (remove_me = t->tail; remove_me; remove_me = remove_me->prev) {
-            if (!ctpair_users(&remove_me->rwlock) && !remove_me->writing) {
-                flush_and_remove(t, remove_me, 1);
+        for (remove_me = ct->tail; remove_me; remove_me = remove_me->prev) {
+            if (!ctpair_users(&remove_me->rwlock) && remove_me->state == CTPAIR_IDLE) {
+                flush_and_remove(ct, remove_me, 1);
                 goto again;
             }
         }
@@ -597,36 +634,40 @@ static int maybe_flush_some (CACHETABLE t, long size) {
         return 0;  // Don't indicate an error code.  Instead let memory get overfull.
     }
 
-    if ((4 * t->n_in_table < t->table_size) && t->table_size > 4)
-        cachetable_rehash(t, t->table_size/2);
+    if ((4 * ct->n_in_table < ct->table_size) && ct->table_size > 4)
+        cachetable_rehash(ct, ct->table_size/2);
 
     return r;
 }
 
-static int cachetable_insert_at(CACHEFILE cachefile, u_int32_t fullhash, CACHEKEY key, void *value, long size,
-                                CACHETABLE_FLUSH_CALLBACK flush_callback,
-                                CACHETABLE_FETCH_CALLBACK fetch_callback,
-                                void *extraargs, int dirty,
-                                LSN written_lsn) {
+static PAIR cachetable_insert_at(CACHETABLE ct,
+                                 CACHEFILE cachefile, CACHEKEY key, void *value,
+                                 enum ctpair_state state,
+                                 u_int32_t fullhash,
+                                 long size,
+                                 CACHETABLE_FLUSH_CALLBACK flush_callback,
+                                 CACHETABLE_FETCH_CALLBACK fetch_callback,
+                                 void *extraargs,
+                                 int dirty,
+                                 LSN written_lsn) {
     TAGMALLOC(PAIR, p);
+    assert(p);
     memset(p, 0, sizeof *p);
-    ctpair_rwlock_init(&p->rwlock);
+    p->cachefile = cachefile;
+    p->key = key;
+    p->value = value;
     p->fullhash = fullhash;
     p->dirty = (char)(dirty ? 1 : 0); //printf("%s:%d p=%p dirty=%d\n", __FILE__, __LINE__, p, p->dirty);
     p->size = size;
-    p->writing = 0;
-    p->key = key;
-    p->value = value;
-    p->next = p->prev = 0;
-    p->cachefile = cachefile;
+    p->state = state;
     p->flush_callback = flush_callback;
     p->fetch_callback = fetch_callback;
    p->extraargs = extraargs;
     p->modified_lsn.lsn = 0;
     p->written_lsn = written_lsn;
     p->fullhash = fullhash;
-    CACHETABLE ct = cachefile->cachetable;
-    ctpair_read_lock(&p->rwlock, &ct->mutex);
+    p->next = p->prev = 0;
+    ctpair_rwlock_init(&p->rwlock);
     p->cq = 0;
     lru_add_to_list(ct, p);
     u_int32_t h = fullhash & (ct->table_size-1);
@@ -637,7 +678,7 @@ static int cachetable_insert_at(CACHEFILE cachefile, u_int32_t fullhash, CACHEKE
     if (ct->n_in_table > ct->table_size) {
         cachetable_rehash(ct, ct->table_size*2);
     }
-    return 0;
+    return p;
 }
 
 enum { hash_histogram_max = 100 };
@@ -672,7 +713,7 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
             // In practice, the functions better be the same.
             assert(p->flush_callback==flush_callback);
             assert(p->fetch_callback==fetch_callback);
-            ctpair_read_lock(&p->rwlock, &ct->mutex);
+            ctpair_read_lock(&p->rwlock, ct->mutex);
             cachetable_unlock(ct);
             note_hash_count(count);
             return -1; /* Already present. */
@@ -685,28 +726,36 @@ int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, v
         return r;
     }
     // flushing could change the table size, but wont' change the fullhash
-    r = cachetable_insert_at(cachefile, fullhash, key, value, size, flush_callback, fetch_callback, extraargs, 1, ZERO_LSN);
+    PAIR p = cachetable_insert_at(ct, cachefile, key, value, CTPAIR_IDLE, fullhash, size, flush_callback, fetch_callback, extraargs, CACHETABLE_DIRTY, ZERO_LSN);
+    assert(p);
+    ctpair_read_lock(&p->rwlock, ct->mutex);
     cachetable_unlock(ct);
     note_hash_count(count);
-    return r;
+    return 0;
 }
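The put path leaves the new pair pinned with a read lock, so the caller must unpin it when done. A hedged usage sketch of the pin lifecycle, based only on the declarations visible in this diff (cf, key, and the callbacks my_flush/my_fetch are assumed to exist; error handling for the already-present case is omitted):

// Hypothetical caller of the put/unpin pair.
static int cache_node(CACHEFILE cf, CACHEKEY key, void *node, long size) {
    u_int32_t fullhash = toku_cachetable_hash(cf, key);
    int r = toku_cachetable_put(cf, key, fullhash, node, size,
                                my_flush, my_fetch, 0);
    if (r != 0) return r;      // -1 means the key was already present
    // ... use the pinned node ...
    return toku_cachetable_unpin(cf, key, fullhash, CACHETABLE_DIRTY, size);
}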
 int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
                                 CACHETABLE_FLUSH_CALLBACK flush_callback,
                                 CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
-    CACHETABLE t = cachefile->cachetable;
+    CACHETABLE ct = cachefile->cachetable;
     PAIR p;
     int count=0;
-    cachetable_lock(t);
-    cachetable_wait_write(t);
-    for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
+    cachetable_lock(ct);
+    cachetable_wait_write(ct);
+    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         count++;
         if (p->key.b==key.b && p->cachefile==cachefile) {
+            ctpair_read_lock(&p->rwlock, ct->mutex);
+            if (p->state == CTPAIR_INVALID) {
+                if (ctpair_users(&p->rwlock) == 0)
+                    ctpair_destroy(p);
+                cachetable_unlock(ct);
+                return ENODEV;
+            }
+            lru_touch(ct,p);
             *value = p->value;
             if (sizep) *sizep = p->size;
-            ctpair_read_lock(&p->rwlock, &t->mutex);
-            lru_touch(t,p);
-            cachetable_unlock(t);
+            cachetable_unlock(ct);
             note_hash_count(count);
             WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
             return 0;
@@ -716,23 +765,22 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
     int r;
     // Note.  hashit(t,key) may have changed as a result of flushing.  But fullhash won't have changed.
     {
-        void *toku_value;
-        long size = 1; // compat
-        LSN written_lsn;
-        WHEN_TRACE_CT(printf("%s:%d CT: fetch_callback(%lld...)\n", __FILE__, __LINE__, key));
-        if ((r=fetch_callback(cachefile, key, fullhash, &toku_value, &size, extraargs, &written_lsn))) {
-            if (r == DB_BADFORMAT) toku_db_badformat();
-            cachetable_unlock(t);
+        p = cachetable_insert_at(ct, cachefile, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
+        assert(p);
+        ctpair_write_lock(&p->rwlock, ct->mutex);
+        r = cachetable_fetch_pair(ct, cachefile, p);
+        if (r) {
+            cachetable_unlock(ct);
             return r;
         }
-        cachetable_insert_at(cachefile, fullhash, key, toku_value, size, flush_callback, fetch_callback, extraargs, 0, written_lsn);
-        *value = toku_value;
-        if (sizep)
-            *sizep = size;
+        ctpair_read_lock(&p->rwlock, ct->mutex);
+        assert(p->state == CTPAIR_IDLE);
+
+        *value = p->value;
+        if (sizep) *sizep = p->size;
     }
-    r = maybe_flush_some(t, 0);
-    if (r == DB_BADFORMAT) toku_db_badformat();
-    cachetable_unlock(t);
+    r = maybe_flush_some(ct, 0);
+    cachetable_unlock(ct);
     WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
     return r;
 }
@@ -742,74 +790,102 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
 // if it is being written, then allow the writer to evict it.  This prevents writers
 // being suspended on a block that was just selected for eviction.
 int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
-    CACHETABLE t = cachefile->cachetable;
+    CACHETABLE ct = cachefile->cachetable;
     PAIR p;
     int count = 0;
-    cachetable_lock(t);
-    for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
+    cachetable_lock(ct);
+    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         count++;
-        if (p->key.b==key.b && p->cachefile==cachefile && !p->writing) {
+        if (p->key.b==key.b && p->cachefile==cachefile && p->state == CTPAIR_IDLE) {
             *value = p->value;
-            ctpair_read_lock(&p->rwlock, &t->mutex);
-            lru_touch(t,p);
-            cachetable_unlock(t);
+            ctpair_read_lock(&p->rwlock, ct->mutex);
+            lru_touch(ct,p);
+            cachetable_unlock(ct);
             note_hash_count(count);
             //printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
             return 0;
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     note_hash_count(count);
     return -1;
 }
 
 int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, int dirty, long size) {
-    CACHETABLE t = cachefile->cachetable;
+    CACHETABLE ct = cachefile->cachetable;
     PAIR p;
     WHEN_TRACE_CT(printf("%s:%d unpin(%lld)", __FILE__, __LINE__, key));
     //printf("%s:%d is dirty now=%d\n", __FILE__, __LINE__, dirty);
     int count = 0;
     //assert(fullhash == toku_cachetable_hash(cachefile, key));
-    cachetable_lock(t);
-    for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
+    cachetable_lock(ct);
+    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         count++;
         if (p->key.b==key.b && p->cachefile==cachefile) {
             assert(p->rwlock.pinned>0);
             ctpair_read_unlock(&p->rwlock);
             if (dirty) p->dirty = TRUE;
             if (size != 0) {
-                t->size_current -= p->size; if (p->writing) t->size_writing -= p->size;
+                ct->size_current -= p->size; if (p->state == CTPAIR_WRITING) ct->size_writing -= p->size;
                 p->size = size;
-                t->size_current += p->size; if (p->writing) t->size_writing += p->size;
+                ct->size_current += p->size; if (p->state == CTPAIR_WRITING) ct->size_writing += p->size;
             }
             WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
             {
                 int r;
-                if ((r=maybe_flush_some(t, 0))) {
-                    cachetable_unlock(t);
+                if ((r=maybe_flush_some(ct, 0))) {
+                    cachetable_unlock(ct);
                     return r;
                 }
             }
-            cachetable_unlock(t);
+            cachetable_unlock(ct);
             note_hash_count(count);
             return 0;
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     note_hash_count(count);
     return -1;
 }
 
+int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
+                            CACHETABLE_FLUSH_CALLBACK flush_callback,
+                            CACHETABLE_FETCH_CALLBACK fetch_callback,
+                            void *extraargs) {
+    CACHETABLE ct = cf->cachetable;
+    cachetable_lock(ct);
+    // lookup
+    PAIR p;
+    for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain)
+        if (p->key.b==key.b && p->cachefile==cf)
+            break;
+
+    // if not found then create a pair in the READING state and fetch it
+    if (p == 0) {
+        p = cachetable_insert_at(ct, cf, key, zero_value, CTPAIR_READING, fullhash, zero_size, flush_callback, fetch_callback, extraargs, CACHETABLE_CLEAN, ZERO_LSN);
+        assert(p);
+        ctpair_write_lock(&p->rwlock, ct->mutex);
+#if DO_WORKER_THREAD
+        workitem_init(&p->asyncwork, cachetable_reader, p);
+        workqueue_enq(&ct->wq, &p->asyncwork, 0);
+#else
+        cachetable_fetch_pair(ct, cf, p);
+#endif
+    }
+    cachetable_unlock(ct);
+    return 0;
+}
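Prefetch is fire-and-forget: it enqueues the fetch on a worker thread and returns immediately, and a later get_and_pin on the same key blocks on the pair's write lock until the read completes. A hedged usage sketch (cf, key, and the callbacks my_flush/my_fetch are assumed):

// Hypothetical caller: warm the cache for a block we expect to need soon.
static void warm_then_use(CACHEFILE cf, CACHEKEY key) {
    u_int32_t fullhash = toku_cachetable_hash(cf, key);
    toku_cachefile_prefetch(cf, key, fullhash, my_flush, my_fetch, 0);
    // ... other work overlaps with the background read ...
    void *node; long size;
    if (toku_cachetable_get_and_pin(cf, key, fullhash, &node, &size,
                                    my_flush, my_fetch, 0) == 0) {
        // ... use node ...
        toku_cachetable_unpin(cf, key, fullhash, CACHETABLE_CLEAN, 0);
    }
}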
 // effect:   Move an object from one key to another key.
 // requires: The object is pinned in the table
 int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey) {
-    CACHETABLE t = cachefile->cachetable;
+    CACHETABLE ct = cachefile->cachetable;
     PAIR *ptr_to_p,p;
     int count = 0;
     u_int32_t fullhash = toku_cachetable_hash(cachefile, oldkey);
-    cachetable_lock(t);
-    for (ptr_to_p = &t->table[fullhash&(t->table_size-1)],  p = *ptr_to_p;
+    cachetable_lock(ct);
+    for (ptr_to_p = &ct->table[fullhash&(ct->table_size-1)],  p = *ptr_to_p;
          p;
          ptr_to_p = &p->hash_chain,                p = *ptr_to_p) {
         count++;
@@ -818,15 +894,15 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
             *ptr_to_p = p->hash_chain;
             p->key = newkey;
             u_int32_t new_fullhash = toku_cachetable_hash(cachefile, newkey);
-            u_int32_t nh = new_fullhash&(t->table_size-1);
+            u_int32_t nh = new_fullhash&(ct->table_size-1);
             p->fullhash = new_fullhash;
-            p->hash_chain = t->table[nh];
-            t->table[nh] = p;
-            cachetable_unlock(t);
+            p->hash_chain = ct->table[nh];
+            ct->table[nh] = p;
+            cachetable_unlock(ct);
             return 0;
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     note_hash_count(count);
     return -1;
 }
@@ -835,15 +911,15 @@ void toku_cachefile_verify (CACHEFILE cf) {
     toku_cachetable_verify(cf->cachetable);
 }
 
-void toku_cachetable_verify (CACHETABLE t) {
-    cachetable_lock(t);
+void toku_cachetable_verify (CACHETABLE ct) {
+    cachetable_lock(ct);
 
     // First clear all the verify flags by going through the hash chains
     {
         u_int32_t i;
-        for (i=0; i<t->table_size; i++) {
+        for (i=0; i<ct->table_size; i++) {
             PAIR p;
-            for (p=t->table[i]; p; p=p->hash_chain) {
+            for (p=ct->table[i]; p; p=p->hash_chain) {
                 p->verify_flag=0;
             }
         }
@@ -851,12 +927,12 @@ void toku_cachetable_verify (CACHETABLE t) {
     // Now go through the LRU chain, make sure everything in the LRU chain is hashed, and set the verify flag.
     {
         PAIR p;
-        for (p=t->head; p; p=p->next) {
+        for (p=ct->head; p; p=p->next) {
             assert(p->verify_flag==0);
             PAIR p2;
             u_int32_t fullhash = p->fullhash;
             //assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
-            for (p2=t->table[fullhash&(t->table_size-1)]; p2; p2=p2->hash_chain) {
+            for (p2=ct->table[fullhash&(ct->table_size-1)]; p2; p2=p2->hash_chain) {
                 if (p2==p) {
                     /* found it */
                     goto next;
@@ -871,31 +947,31 @@ void toku_cachetable_verify (CACHETABLE t) {
     // Now make sure everything in the hash chains has the verify_flag set to 1.
     {
         u_int32_t i;
-        for (i=0; i<t->table_size; i++) {
+        for (i=0; i<ct->table_size; i++) {
             PAIR p;
-            for (p=t->table[i]; p; p=p->hash_chain) {
+            for (p=ct->table[i]; p; p=p->hash_chain) {
                 assert(p->verify_flag);
             }
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
 }
 
-static void assert_cachefile_is_flushed_and_removed (CACHETABLE t, CACHEFILE cf) {
+static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf) {
     u_int32_t i;
     // Check it two ways
     // First way: Look through all the hash chains
-    for (i=0; i<t->table_size; i++) {
+    for (i=0; i<ct->table_size; i++) {
         PAIR p;
-        for (p=t->table[i]; p; p=p->hash_chain) {
+        for (p=ct->table[i]; p; p=p->hash_chain) {
             assert(p->cachefile!=cf);
         }
     }
     // Second way: Look through the LRU list.
     {
         PAIR p;
-        for (p=t->head; p; p=p->next) {
+        for (p=ct->head; p; p=p->next) {
             assert(p->cachefile!=cf);
         }
     }
@@ -906,8 +982,8 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE t, CACHEFILE cf)
 static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_remove) {
     unsigned nfound = 0;
-    struct writequeue cq;
-    writequeue_init(&cq);
+    struct workqueue cq;
+    workqueue_init(&cq);
     unsigned i;
     for (i=0; i < ct->table_size; i++) {
         PAIR p;
@@ -915,17 +991,20 @@ static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_rem
             if (cf == 0 || p->cachefile==cf) {
                 nfound++;
                 p->cq = &cq;
-                if (!p->writing)
+                if (p->state == CTPAIR_IDLE)
                     flush_and_remove(ct, p, 1);
             }
         }
     }
     for (i=0; i<nfound; i++) {
-        PAIR p = 0;
-        int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
+        cachetable_unlock(ct);
+        WORKITEM wi = 0;
+        int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
+        PAIR p = workitem_arg(wi);
+        cachetable_lock(ct);
         cachetable_complete_write_pair(ct, p, do_remove);
     }
-    writequeue_destroy(&cq);
+    workqueue_destroy(&cq);
     if (do_remove)
         assert_cachefile_is_flushed_and_removed(ct, cf);
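The completion-queue trick appears three times in this file (cachefile_write_maybe_remove, unpin_and_remove, checkpoint): point each pair's cq at a private queue, let the workers push finished pairs onto it, and dequeue exactly nfound items to rendezvous with all outstanding writes. A condensed sketch of the pattern (the helper name wait_for_writes is hypothetical; it assumes the cachetable's internal types and the workqueue.h API and is not compilable on its own):

static void wait_for_writes(CACHETABLE ct, struct workqueue *cq, unsigned nfound) {
    for (unsigned i = 0; i < nfound; i++) {
        cachetable_unlock(ct);             // don't hold the coarse lock while blocking
        WORKITEM wi = 0;
        int r = workqueue_deq(cq, &wi, 1); // block until a worker finishes a pair
        assert(r == 0);
        PAIR p = workitem_arg(wi);
        cachetable_lock(ct);
        cachetable_complete_write_pair(ct, p, TRUE);
    }
}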
@@ -936,60 +1015,60 @@ static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_rem
 }
 
 /* Require that it all be flushed. */
-int toku_cachetable_close (CACHETABLE *tp) {
-    CACHETABLE t=*tp;
+int toku_cachetable_close (CACHETABLE *ctp) {
+    CACHETABLE ct=*ctp;
     int r;
-    cachetable_lock(t);
-    if ((r=cachefile_write_maybe_remove(t, 0, TRUE))) {
-        cachetable_unlock(t);
+    cachetable_lock(ct);
+    if ((r=cachefile_write_maybe_remove(ct, 0, TRUE))) {
+        cachetable_unlock(ct);
         return r;
     }
     u_int32_t i;
-    for (i=0; i<t->table_size; i++) {
-        if (t->table[i]) return -1;
+    for (i=0; i<ct->table_size; i++) {
+        if (ct->table[i]) return -1;
     }
-    assert(t->size_writing == 0);
-    writequeue_set_closed(&t->wq);
-    cachetable_unlock(t);
-    threadpool_destroy(&t->threadpool);
-    writequeue_destroy(&t->wq);
-    r = toku_pthread_mutex_destroy(&t->mutex); assert(r == 0);
-    toku_free(t->table);
-    toku_free(t);
-    *tp = 0;
+    assert(ct->size_writing == 0);
+    cachetable_unlock(ct);
+    toku_destroy_workers(&ct->wq, &ct->threadpool);
+    toku_free(ct->table);
+    toku_free(ct);
+    *ctp = 0;
     return 0;
 }
 
 int toku_cachetable_unpin_and_remove (CACHEFILE cachefile, CACHEKEY key) {
     int r = ENOENT;
     // Removing something already present is OK.
-    CACHETABLE t = cachefile->cachetable;
+    CACHETABLE ct = cachefile->cachetable;
     PAIR p;
     int count = 0;
-    cachetable_lock(t);
+    cachetable_lock(ct);
     u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
-    for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
+    for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
         count++;
         if (p->key.b==key.b && p->cachefile==cachefile) {
             p->dirty = 0; // clear the dirty bit.  We're just supposed to remove it.
             assert(p->rwlock.pinned==1);
             ctpair_read_unlock(&p->rwlock);
-            struct writequeue cq;
-            writequeue_init(&cq);
+            struct workqueue cq;
+            workqueue_init(&cq);
             p->cq = &cq;
-            if (!p->writing)
-                flush_and_remove(t, p, 0);
-            PAIR pp = 0;
-            r = writequeue_deq(&cq, &t->mutex, &pp);
+            if (p->state == CTPAIR_IDLE)
+                flush_and_remove(ct, p, 0);
+            cachetable_unlock(ct);
+            WORKITEM wi = 0;
+            r = workqueue_deq(&cq, &wi, 1);
+            cachetable_lock(ct);
+            PAIR pp = workitem_arg(wi);
             assert(r == 0 && pp == p);
-            cachetable_complete_write_pair(t, p, TRUE);
-            writequeue_destroy(&cq);
+            cachetable_complete_write_pair(ct, p, TRUE);
+            workqueue_destroy(&cq);
             r = 0;
             goto done;
         }
     }
 done:
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     note_hash_count(count);
     return r;
 }
@@ -1003,20 +1082,20 @@ static void flush_and_keep (PAIR flush_me) {
     }
 }
 
-static int cachetable_fsync_pairs (CACHETABLE t, PAIR p) {
+static int cachetable_fsync_pairs (CACHETABLE ct, PAIR p) {
     if (p) {
-        int r = cachetable_fsync_pairs(t, p->hash_chain);
+        int r = cachetable_fsync_pairs(ct, p->hash_chain);
         if (r!=0) return r;
         flush_and_keep(p);
     }
     return 0;
 }
 
-int cachetable_fsync (CACHETABLE t) {
+int cachetable_fsync (CACHETABLE ct) {
     int i;
     int r;
-    for (i=0; i<t->table_size; i++) {
-        r=cachetable_fsync_pairs(t, t->table[i]);
+    for (i=0; i<ct->table_size; i++) {
+        r=cachetable_fsync_pairs(ct, ct->table[i]);
         if (r!=0) return r;
     }
     return 0;
@@ -1051,8 +1130,8 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
     //?? This is a skeleton.  It compiles, but doesn't do anything reasonable yet.
     //??  log_the_checkpoint();
 
-    struct writequeue cq;
-    writequeue_init(&cq);
+    struct workqueue cq;
+    workqueue_init(&cq);
 
     cachetable_lock(ct);
@@ -1069,14 +1148,17 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
             if (1) {
                 nfound++;
                 p->cq = &cq;
-                if (!p->writing)
+                if (p->state == CTPAIR_IDLE)
                     flush_and_remove(ct, p, 1);
             }
         }
     }
     for (i=0; i<nfound; i++) {
-        PAIR p = 0;
-        int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
+        WORKITEM wi = 0;
+        cachetable_unlock(ct);
+        int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
+        cachetable_lock(ct);
+        PAIR p = workitem_arg(wi);
         cachetable_complete_write_pair(ct, p, FALSE);
     }
@@ -1094,7 +1176,7 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
     }
 
     cachetable_unlock(ct);
-    writequeue_destroy(&cq);
+    workqueue_destroy(&cq);
     return 0;
 }
@@ -1107,38 +1189,37 @@ FILENUM toku_cachefile_filenum (CACHEFILE cf) {
     return cf->filenum;
 }
 
-#if DO_WRITER_THREAD
+#if DO_WORKER_THREAD
 
-// The writer thread waits for work in the write queue and writes the pair
-
-static void *cachetable_writer(void *arg) {
-    // printf("%lu:%s:start %p\n", toku_pthread_self(), __FUNCTION__, arg);
-    CACHETABLE ct = arg;
-    int r;
-    cachetable_lock(ct);
-    while (1) {
-        PAIR p = 0;
-        r = writequeue_deq(&ct->wq, &ct->mutex, &p);
-        if (r != 0)
-            break;
-        cachetable_write_pair(ct, p);
-    }
-    cachetable_unlock(ct);
-    // printf("%lu:%s:exit %p\n", toku_pthread_self(), __FUNCTION__, arg);
-    return arg;
-}
+// Worker thread function to write a pair from memory to its cachefile
+static void cachetable_writer(WORKITEM wi) {
+    PAIR p = workitem_arg(wi);
+    CACHETABLE ct = p->cachefile->cachetable;
+    cachetable_lock(ct);
+    cachetable_write_pair(ct, p);
+    cachetable_unlock(ct);
+}
+
+// Worker thread function to read a pair from a cachefile to memory
+static void cachetable_reader(WORKITEM wi) {
+    PAIR p = workitem_arg(wi);
+    CACHETABLE ct = p->cachefile->cachetable;
+    cachetable_lock(ct);
+    cachetable_fetch_pair(ct, p->cachefile, p);
+    cachetable_unlock(ct);
+}
 
 #endif
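toku_init_workers and toku_destroy_workers come from the new toku_worker module, which this diff does not show. Presumably each pool thread runs a loop that dequeues WORKITEMs and invokes their bound functions; a hedged sketch of what such a loop might look like (the function name, the field access, and the closed-queue return convention are assumptions, not code from this commit):

// Hypothetical worker loop, assuming workqueue_deq returns nonzero once the
// queue is closed and drained; the real implementation lives in toku_worker.c.
static void *toku_worker_sketch(void *arg) {
    struct workqueue *wq = arg;
    while (1) {
        WORKITEM wi = 0;
        if (workqueue_deq(wq, &wi, 1) != 0)
            break;        // queue closed and drained: exit the thread
        wi->f(wi);        // run the bound function (assumed field name)
    }
    return arg;
}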
 // debug functions
 
-int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
+int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
     u_int32_t i;
     int some_pinned=0;
-    cachetable_lock(t);
-    for (i=0; i<t->table_size; i++) {
+    cachetable_lock(ct);
+    for (i=0; i<ct->table_size; i++) {
         PAIR p;
-        for (p=t->table[i]; p; p=p->hash_chain) {
+        for (p=ct->table[i]; p; p=p->hash_chain) {
             assert(ctpair_pinned(&p->rwlock)>=0);
             if (ctpair_pinned(&p->rwlock)) {
                 //printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
@@ -1146,18 +1227,18 @@ int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
             }
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     return some_pinned;
 }
 
 int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
     u_int32_t i;
     int n_pinned=0;
-    CACHETABLE t = cf->cachetable;
-    cachetable_lock(t);
-    for (i=0; i<t->table_size; i++) {
+    CACHETABLE ct = cf->cachetable;
+    cachetable_lock(ct);
+    for (i=0; i<ct->table_size; i++) {
         PAIR p;
-        for (p=t->table[i]; p; p=p->hash_chain) {
+        for (p=ct->table[i]; p; p=p->hash_chain) {
             assert(ctpair_pinned(&p->rwlock)>=0);
             if (ctpair_pinned(&p->rwlock) && (cf==0 || p->cachefile==cf)) {
                 if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
@@ -1165,7 +1246,7 @@ int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
             }
         }
     }
-    cachetable_unlock(t);
+    cachetable_unlock(ct);
     return n_pinned;
 }
@@ -1221,8 +1302,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
             break;
         }
     }
-    cachetable_unlock(ct);
-    note_hash_count(count);
+    cachetable_unlock(ct); note_hash_count(count);
     return r;
 }
......
@@ -5,6 +5,7 @@
 #include <fcntl.h>
 #include "brttypes.h"
+#include "workqueue.h"
 
 // Maintain a cache mapping from cachekeys to values (void*)
 // Some of the keys can be pinned.  Don't pin too many or for too long.
@@ -49,6 +50,10 @@ int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int fl
 // Bind a file to a new cachefile object.
 int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
 
+// Get access to the asynchronous work queue
+// Returns: a pointer to the work queue
+WORKQUEUE toku_cachetable_get_workqueue(CACHETABLE);
+
 // The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
 // When write_me is true, the value should be written to storage.
 // When keep_me is false, the value should be freed.
@@ -65,6 +70,7 @@ void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userd
 // Effect: Store some cachefile-specific user data.  When the last reference to a cachefile is closed, we call close_userdata().
 // When the cachefile needs to be checkpointed, we call checkpoint_userdata().
 // If userdata is already non-NULL, then we simply overwrite it.
+
 void *toku_cachefile_get_userdata(CACHEFILE);
 // Effect: Get the user dataa.
@@ -97,8 +103,10 @@ int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
 int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
 
 // cachetable object state WRT external memory
-#define CACHETABLE_CLEAN 0
-#define CACHETABLE_DIRTY 1
+enum cachetable_object_state {
+    CACHETABLE_CLEAN=0, // the cached object is clean WRT the cachefile
+    CACHETABLE_DIRTY=1, // the cached object is dirty WRT the cachefile
+};
// Unpin a memory object // Unpin a memory object
// Effects: If the memory object is in the cachetable, then OR the dirty flag, // Effects: If the memory object is in the cachetable, then OR the dirty flag,
...@@ -110,6 +118,13 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin ...@@ -110,6 +118,13 @@ int toku_cachetable_unpin_and_remove (CACHEFILE, CACHEKEY); /* Removing somethin
// Effect: Remove an object from the cachetable. Don't write it back. // Effect: Remove an object from the cachetable. Don't write it back.
// Requires: The object must be pinned exactly once. // Requires: The object must be pinned exactly once.
// Prefetch a memory object for a given key into the cachetable
// Returns: 0 if success
int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
void *extraargs);
int toku_cachetable_assert_all_unpinned (CACHETABLE); int toku_cachetable_assert_all_unpinned (CACHETABLE);
int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ ); int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
......
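For context, a minimal sketch of how a client might hand a background job to the cachetable's shared work queue through the new accessor. The names submit_background_job, do_work, extra, and the "cachetable.h" header name are assumptions for illustration, not part of this commit; workitem_init, workitem_arg, and workqueue_enq come from the workqueue.h added later in this change.

    #include "cachetable.h"
    #include "workqueue.h"
    #include "memory.h"
    #include "toku_assert.h"

    // Hypothetical work handler: runs on one of the worker threads.
    static void do_work(WORKITEM wi) {
        void *extra = workitem_arg(wi); // recover the argument stored by workitem_init
        (void) extra;                   // ... do the background work here ...
        toku_free(wi);                  // the handler owns the workitem
    }

    // Hypothetical client: enqueue a job on the cachetable's work queue.
    static void submit_background_job(CACHETABLE ct, void *extra) {
        WORKQUEUE wq = toku_cachetable_get_workqueue(ct);
        WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
        workitem_init(wi, do_work, extra);
        workqueue_enq(wq, wi, 1); // dolock=1: enq takes and releases the queue lock
    }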
@@ -34,8 +34,6 @@ REGRESSION_TESTS_RAW = \
    brt-test4 \
    brt-test5 \
    cachetable-rwlock-test \
-   cachetable-writequeue-test \
-   threadpool-test \
    cachetable-test \
    cachetable-test2 \
    cachetable-put-test \
@@ -74,6 +72,8 @@ REGRESSION_TESTS_RAW = \
    test-leafentry \
    test_oexcl \
    test_toku_malloc_plain_free \
+   threadpool-test \
+   workqueue-test \
    x1764-test \
    ybt-test \
 # This line intentionally kept commented so I can have a \ on the end of the previous line
...
@@ -149,6 +149,7 @@ static int fetch (CACHEFILE f, CACHEKEY key, u_int32_t fullhash __attribute__((_
     assert (expect_f==f);
     assert((long)extraargs==23);
     *value = make_item(key.b);
+    *sizep = test_object_size;
     did_fetch=key;
     written_lsn->lsn = 0;
     return 0;
...
#include "toku_portability.h"
#include "toku_os.h"
#include "test.h"
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <unistd.h> #include <unistd.h>
...@@ -9,14 +5,20 @@ ...@@ -9,14 +5,20 @@
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <malloc.h> #include <malloc.h>
#include <toku_pthread.h>
#include "toku_portability.h"
#include "toku_os.h"
#include "toku_pthread.h"
#include "threadpool.h" #include "threadpool.h"
int verbose;
struct my_threadpool { struct my_threadpool {
THREADPOOL threadpool; THREADPOOL threadpool;
toku_pthread_mutex_t mutex; toku_pthread_mutex_t mutex;
toku_pthread_cond_t wait; toku_pthread_cond_t wait;
int closed; int closed;
int counter;
}; };
static void static void
...@@ -27,10 +29,11 @@ my_threadpool_init (struct my_threadpool *my_threadpool, int max_threads) { ...@@ -27,10 +29,11 @@ my_threadpool_init (struct my_threadpool *my_threadpool, int max_threads) {
r = toku_pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0); r = toku_pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0);
r = toku_pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0); r = toku_pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0);
my_threadpool->closed = 0; my_threadpool->closed = 0;
my_threadpool->counter = 0;
} }
static void static void
my_threadpool_destroy (struct my_threadpool *my_threadpool) { my_threadpool_destroy (struct my_threadpool *my_threadpool, int max_threads) {
int r; int r;
r = toku_pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0); r = toku_pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
my_threadpool->closed = 1; my_threadpool->closed = 1;
...@@ -39,30 +42,18 @@ my_threadpool_destroy (struct my_threadpool *my_threadpool) { ...@@ -39,30 +42,18 @@ my_threadpool_destroy (struct my_threadpool *my_threadpool) {
if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool)); if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool));
threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0); threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
assert(my_threadpool->counter == max_threads);
r = toku_pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0); r = toku_pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0);
r = toku_pthread_cond_destroy(&my_threadpool->wait); assert(r == 0); r = toku_pthread_cond_destroy(&my_threadpool->wait); assert(r == 0);
} }
static void * static void *
fbusy (void *arg) { my_thread_f (void *arg) {
struct my_threadpool *my_threadpool = arg;
int r;
r = toku_pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
while (!my_threadpool->closed) {
r = toku_pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
}
r = toku_pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("%lu:%s:exit\n", (unsigned long)toku_os_gettid(), __FUNCTION__);
return arg;
}
static void *
fidle (void *arg) {
struct my_threadpool *my_threadpool = arg; struct my_threadpool *my_threadpool = arg;
int r; int r;
r = toku_pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0); r = toku_pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
my_threadpool->counter++;
while (!my_threadpool->closed) { while (!my_threadpool->closed) {
r = toku_pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0); r = toku_pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
} }
...@@ -92,7 +83,7 @@ usage (void) { ...@@ -92,7 +83,7 @@ usage (void) {
} }
int int
test_main(int argc, const char *argv[]) { main(int argc, const char *argv[]) {
int max_threads = 1; int max_threads = 1;
#if defined(__linux__) #if defined(__linux__)
int do_malloc_fail = 0; int do_malloc_fail = 0;
...@@ -121,29 +112,16 @@ test_main(int argc, const char *argv[]) { ...@@ -121,29 +112,16 @@ test_main(int argc, const char *argv[]) {
struct my_threadpool my_threadpool; struct my_threadpool my_threadpool;
THREADPOOL threadpool; THREADPOOL threadpool;
// test threadpool busy causes no threads to be created
my_threadpool_init(&my_threadpool, max_threads); my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool; threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_busy\n"); if (verbose) printf("test threadpool_set_busy\n");
for (i=0; i<2*max_threads; i++) { for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fbusy, &my_threadpool); assert(threadpool_get_current_threads(threadpool) == (i >= max_threads ? max_threads : i));
assert(threadpool_get_current_threads(threadpool) == 1); threadpool_maybe_add(threadpool, my_thread_f, &my_threadpool);
}
assert(threadpool_get_current_threads(threadpool) == 1);
my_threadpool_destroy(&my_threadpool);
// test threadpool idle causes up to max_threads to be created
my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_idle\n");
for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fidle, &my_threadpool);
sleep(1);
assert(threadpool_get_current_threads(threadpool) <= max_threads);
} }
assert(threadpool_get_current_threads(threadpool) == max_threads); assert(threadpool_get_current_threads(threadpool) == max_threads);
my_threadpool_destroy(&my_threadpool); my_threadpool_destroy(&my_threadpool, max_threads);
#if DO_MALLOC_HOOK #if DO_MALLOC_HOOK
if (do_malloc_fail) { if (do_malloc_fail) {
if (verbose) printf("test threadpool_create with malloc failure\n"); if (verbose) printf("test threadpool_create with malloc failure\n");
......
#include "includes.h" #include <stdio.h>
#include <errno.h>
#include <string.h>
#include "toku_portability.h"
#include "toku_assert.h"
#include "toku_pthread.h"
#include "memory.h"
#include "workqueue.h"
#include "threadpool.h"
#include "test.h"
int verbose; int verbose;
typedef struct ctpair *PAIR; static WORKITEM
struct ctpair { new_workitem (void) {
PAIR next_wq; WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
}; return wi;
static PAIR
new_pair (void) {
PAIR p = (PAIR) toku_malloc(sizeof *p); assert(p);
return p;
} }
static void static void
destroy_pair(PAIR p) { destroy_workitem(WORKITEM wi) {
toku_free(p); toku_free(wi);
} }
#include "cachetable-writequeue.h"
// test simple create and destroy // test simple create and destroy
static void static void
test_create_destroy (void) { test_create_destroy (void) {
struct writequeue writequeue, *wq = &writequeue; if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
writequeue_init(wq); struct workqueue workqueue, *wq = &workqueue;
assert(writequeue_empty(wq)); workqueue_init(wq);
writequeue_destroy(wq); assert(workqueue_empty(wq));
workqueue_destroy(wq);
} }
// verify that the wq implements FIFO ordering // verify that the wq implements FIFO ordering
static void static void
test_simple_enq_deq (int n) { test_simple_enq_deq (int n) {
struct writequeue writequeue, *wq = &writequeue; if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
int r; int r;
toku_pthread_mutex_t mutex;
r = toku_pthread_mutex_init(&mutex, 0); assert(r == 0); workqueue_init(wq);
writequeue_init(wq); assert(workqueue_empty(wq));
assert(writequeue_empty(wq)); WORKITEM work[n];
PAIR pairs[n];
int i; int i;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
pairs[i] = new_pair(); work[i] = new_workitem();
writequeue_enq(wq, pairs[i]); workqueue_enq(wq, work[i], 1);
assert(!writequeue_empty(wq)); assert(!workqueue_empty(wq));
} }
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
PAIR p = 0; WORKITEM wi = 0;
r = writequeue_deq(wq, &mutex, &p); r = workqueue_deq(wq, &wi, 1);
assert(r == 0 && p == pairs[i]); assert(r == 0 && wi == work[i]);
destroy_pair(p); destroy_workitem(wi);
} }
assert(writequeue_empty(wq)); assert(workqueue_empty(wq));
writequeue_destroy(wq); workqueue_destroy(wq);
r = toku_pthread_mutex_destroy(&mutex); assert(r == 0);
} }
// setting the wq closed should cause deq to return EINVAL // setting the wq closed should cause deq to return EINVAL
static void static void
test_set_closed (void) { test_set_closed (void) {
struct writequeue writequeue, *wq = &writequeue; if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
writequeue_init(wq); struct workqueue workqueue, *wq = &workqueue;
writequeue_set_closed(wq); workqueue_init(wq);
int r = writequeue_deq(wq, 0, 0); WORKITEM wi = 0;
assert(r == EINVAL); workqueue_set_closed(wq, 1);
writequeue_destroy(wq); int r = workqueue_deq(wq, &wi, 1);
assert(r == EINVAL && wi == 0);
workqueue_destroy(wq);
} }
// closing a wq with a blocked reader thread should cause the reader to get EINVAL // closing a wq with a blocked reader thread should cause the reader to get EINVAL
struct writequeue_with_mutex {
struct writequeue writequeue;
toku_pthread_mutex_t mutex;
};
static void
writequeue_with_mutex_init (struct writequeue_with_mutex *wqm) {
writequeue_init(&wqm->writequeue);
int r = toku_pthread_mutex_init(&wqm->mutex, 0); assert(r == 0);
}
static void
writequeue_with_mutex_destroy (struct writequeue_with_mutex *wqm) {
writequeue_destroy(&wqm->writequeue);
int r = toku_pthread_mutex_destroy(&wqm->mutex); assert(r == 0);
}
static void * static void *
test_set_closed_waiter(void *arg) { test_set_closed_waiter(void *arg) {
struct writequeue_with_mutex *wqm = arg; struct workqueue *wq = arg;
int r; int r;
r = toku_pthread_mutex_lock(&wqm->mutex); assert(r == 0); WORKITEM wi = 0;
PAIR p; r = workqueue_deq(wq, &wi, 1);
r = writequeue_deq(&wqm->writequeue, &wqm->mutex, &p); assert(r == EINVAL && wi == 0);
assert(r == EINVAL);
r = toku_pthread_mutex_unlock(&wqm->mutex); assert(r == 0);
return arg; return arg;
} }
static void static void
test_set_closed_thread (void) { test_set_closed_thread (void) {
struct writequeue_with_mutex writequeue_with_mutex, *wqm = &writequeue_with_mutex; if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct workqueue workqueue, *wq = &workqueue;
int r; int r;
writequeue_with_mutex_init(wqm); workqueue_init(wq);
toku_pthread_t tid; toku_pthread_t tid;
r = toku_pthread_create(&tid, 0, test_set_closed_waiter, wqm); assert(r == 0); r = toku_pthread_create(&tid, 0, test_set_closed_waiter, wq); assert(r == 0);
sleep(1); sleep(1);
r = toku_pthread_mutex_lock(&wqm->mutex); assert(r == 0); workqueue_set_closed(wq, 1);
writequeue_set_closed(&wqm->writequeue);
r = toku_pthread_mutex_unlock(&wqm->mutex); assert(r == 0);
void *ret; void *ret;
r = toku_pthread_join(tid, &ret); r = toku_pthread_join(tid, &ret);
assert(r == 0 && ret == wqm); assert(r == 0 && ret == wq);
writequeue_with_mutex_destroy(wqm); workqueue_destroy(wq);
} }
// verify writer reader flow control // verify writer reader flow control
...@@ -130,82 +111,92 @@ test_set_closed_thread (void) { ...@@ -130,82 +111,92 @@ test_set_closed_thread (void) {
// writers when the wq size <= 1/2 of the wq limit // writers when the wq size <= 1/2 of the wq limit
struct rwfc { struct rwfc {
toku_pthread_mutex_t mutex; struct workqueue workqueue;
struct writequeue writequeue;
int current, limit; int current, limit;
}; };
static void rwfc_init (struct rwfc *rwfc, int limit) { static void rwfc_init (struct rwfc *rwfc, int limit) {
int r; workqueue_init(&rwfc->workqueue);
r = toku_pthread_mutex_init(&rwfc->mutex, 0); assert(r == 0);
writequeue_init(&rwfc->writequeue);
rwfc->current = 0; rwfc->limit = limit; rwfc->current = 0; rwfc->limit = limit;
} }
static void static void
rwfc_destroy (struct rwfc *rwfc) { rwfc_destroy (struct rwfc *rwfc) {
int r; workqueue_destroy(&rwfc->workqueue);
writequeue_destroy(&rwfc->writequeue); }
r = toku_pthread_mutex_destroy(&rwfc->mutex); assert(r == 0);
static void
rwfc_do_read (WORKITEM wi) {
struct rwfc *rwfc = (struct rwfc *) workitem_arg(wi);
workqueue_lock(&rwfc->workqueue);
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
workqueue_wakeup_write(&rwfc->workqueue, 0);
}
workqueue_unlock(&rwfc->workqueue);
destroy_workitem(wi);
} }
static void * static void *
rwfc_reader (void *arg) { rwfc_worker (void *arg) {
struct rwfc *rwfc = arg; struct workqueue *wq = arg;
int r;
while (1) { while (1) {
PAIR ctpair; WORKITEM wi = 0;
r = toku_pthread_mutex_lock(&rwfc->mutex); assert(r == 0); int r = workqueue_deq(wq, &wi, 1);
r = writequeue_deq(&rwfc->writequeue, &rwfc->mutex, &ctpair);
if (r == EINVAL) { if (r == EINVAL) {
r = toku_pthread_mutex_unlock(&rwfc->mutex); assert(r == 0); assert(wi == 0);
break; break;
} }
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
writequeue_wakeup_write(&rwfc->writequeue);
}
r = toku_pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
destroy_pair(ctpair);
usleep(random() % 100); usleep(random() % 100);
wi->f(wi);
} }
return arg; return arg;
} }
static void static void
test_flow_control (int limit, int n) { test_flow_control (int limit, int n, int maxthreads) {
if (verbose) printf("%s:%d\n", __FUNCTION__, __LINE__);
struct rwfc my_rwfc, *rwfc = &my_rwfc; struct rwfc my_rwfc, *rwfc = &my_rwfc;
int r; THREADPOOL tp;
int i;
rwfc_init(rwfc, limit); rwfc_init(rwfc, limit);
toku_pthread_t tid; threadpool_create(&tp, maxthreads);
r = toku_pthread_create(&tid, 0, rwfc_reader, rwfc); assert(r == 0); for (i=0; i<maxthreads; i++)
threadpool_maybe_add(tp, rwfc_worker, &rwfc->workqueue);
sleep(1); // this is here to block the reader on the first deq sleep(1); // this is here to block the reader on the first deq
int i;
for (i=0; i<n; i++) { for (i=0; i<n; i++) {
PAIR ctpair = new_pair(); WORKITEM wi = new_workitem();
r = toku_pthread_mutex_lock(&rwfc->mutex); assert(r == 0); workitem_init(wi, rwfc_do_read, rwfc);
writequeue_enq(&rwfc->writequeue, ctpair); workqueue_lock(&rwfc->workqueue);
workqueue_enq(&rwfc->workqueue, wi, 0);
rwfc->current++; rwfc->current++;
while (rwfc->current >= rwfc->limit) { while (rwfc->current >= rwfc->limit) {
// printf("%d - %d %d\n", i, rwfc->current, rwfc->limit); // printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
writequeue_wait_write(&rwfc->writequeue, &rwfc->mutex); workqueue_wait_write(&rwfc->workqueue, 0);
} }
r = toku_pthread_mutex_unlock(&rwfc->mutex); assert(r == 0); workqueue_unlock(&rwfc->workqueue);
// toku_os_usleep(random() % 1); // toku_os_usleep(random() % 1);
} }
writequeue_set_closed(&rwfc->writequeue); workqueue_set_closed(&rwfc->workqueue, 1);
void *ret; threadpool_destroy(&tp);
r = toku_pthread_join(tid, &ret); assert(r == 0);
rwfc_destroy(rwfc); rwfc_destroy(rwfc);
} }
int int
test_main(int argc, const char *argv[]) { main(int argc, const char *argv[]) {
default_parse_args(argc, argv); int i;
for (i=1; i<argc; i++) {
const char *arg = argv[i];
if (strcmp(arg, "-v") == 0)
verbose++;
}
test_create_destroy(); test_create_destroy();
test_simple_enq_deq(0); test_simple_enq_deq(0);
test_simple_enq_deq(42); test_simple_enq_deq(42);
test_set_closed(); test_set_closed();
test_set_closed_thread(); test_set_closed_thread();
test_flow_control(8, 10000); test_flow_control(8, 10000, 1);
test_flow_control(8, 10000, 2);
test_flow_control(8, 10000, 17);
return 0; return 0;
} }
#include "includes.h" /* -*- mode: C; c-basic-offset: 4 -*- */
#include <stdio.h>
#include <errno.h>
#include "toku_portability.h"
#include "toku_pthread.h"
#include "toku_assert.h"
#include "memory.h"
#include "threadpool.h"
struct threadpool { struct threadpool {
int max_threads; int max_threads;
int current_threads; int current_threads;
toku_pthread_t pids[]; toku_pthread_t tids[];
}; };
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) { int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
...@@ -15,7 +23,7 @@ int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) { ...@@ -15,7 +23,7 @@ int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
threadpool->current_threads = 0; threadpool->current_threads = 0;
int i; int i;
for (i=0; i<max_threads; i++) for (i=0; i<max_threads; i++)
threadpool->pids[i] = 0; threadpool->tids[i] = 0;
*threadpoolptr = threadpool; *threadpoolptr = threadpool;
return 0; return 0;
} }
...@@ -25,7 +33,7 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) { ...@@ -25,7 +33,7 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) {
int i; int i;
for (i=0; i<threadpool->current_threads; i++) { for (i=0; i<threadpool->current_threads; i++) {
int r; void *ret; int r; void *ret;
r = toku_pthread_join(threadpool->pids[i], &ret); r = toku_pthread_join(threadpool->tids[i], &ret);
assert(r == 0); assert(r == 0);
} }
*threadpoolptr = 0; *threadpoolptr = 0;
...@@ -34,7 +42,7 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) { ...@@ -34,7 +42,7 @@ void threadpool_destroy(THREADPOOL *threadpoolptr) {
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) { void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if (threadpool->current_threads < threadpool->max_threads) { if (threadpool->current_threads < threadpool->max_threads) {
int r = toku_pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg); int r = toku_pthread_create(&threadpool->tids[threadpool->current_threads], 0, f, arg);
if (r == 0) { if (r == 0) {
threadpool->current_threads++; threadpool->current_threads++;
} }
......
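A small sketch of the pool's contract, mirroring the updated test above: threadpool_maybe_add spawns a thread only while current_threads < max_threads, and extra calls are silently ignored. The thread function spin and the wrapper example_pool are hypothetical names for illustration.

    static void *spin(void *arg) { return arg; } // hypothetical thread function

    static void example_pool(void) {
        THREADPOOL tp;
        int r = threadpool_create(&tp, 4); assert(r == 0);
        int i;
        for (i = 0; i < 10; i++)
            threadpool_maybe_add(tp, spin, 0); // only the first 4 calls create threads
        assert(threadpool_get_current_threads(tp) == 4);
        threadpool_destroy(&tp); // joins all created threads
    }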
+/* -*- mode: C; c-basic-offset: 4 -*- */
 #ifndef THREADPOOL_H
 #define THREADPOOL_H
...
/* -*- mode: C; c-basic-offset: 4 -*- */
#include <stdio.h>
#include <errno.h>
#include "toku_portability.h"
#include "toku_assert.h"
#include "toku_os.h"
#include "toku_pthread.h"
#include "workqueue.h"
#include "threadpool.h"
#include "toku_worker.h"

// Create a fixed number of worker threads, all waiting on a single queue
// of work items (WORKQUEUE).
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
    workqueue_init(wq);
    int nprocs = toku_os_get_number_active_processors();
    threadpool_create(tpptr, nprocs);
    int i;
    for (i=0; i<nprocs; i++)
        threadpool_maybe_add(*tpptr, toku_worker, wq);
}

void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr) {
    workqueue_set_closed(wq, 1); // close the work queue [see "A" in toku_worker()]
    threadpool_destroy(tpptr);   // wait for all of the worker threads to exit
    workqueue_destroy(wq);
}

void *toku_worker(void *arg) {
    // printf("%lu:%s:start %p\n", toku_pthread_self(), __FUNCTION__, arg);
    WORKQUEUE wq = arg;
    int r;
    while (1) {
        WORKITEM wi = 0;
        r = workqueue_deq(wq, &wi, 1); // get work from the queue, block if empty
        if (r != 0)                    // shut down worker threads when the work queue is closed
            break;                     // [see "A" in toku_destroy_workers()]
        wi->f(wi);                     // call the work handler function
    }
    // printf("%lu:%s:exit %p\n", toku_pthread_self(), __FUNCTION__, arg);
    return arg;
}
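A minimal sketch of the lifecycle these two functions define, assuming a caller-defined handler handle_item (hypothetical, not part of this commit). Because workqueue_deq fails only once the queue is both closed and empty, items enqueued before toku_destroy_workers are drained before the workers exit.

    static void handle_item(WORKITEM wi) { // hypothetical work handler
        toku_free(wi);                     // the handler owns the workitem
    }

    static void example_worker_lifecycle(void) {
        struct workqueue wq;
        THREADPOOL tp;
        toku_init_workers(&wq, &tp);       // one worker per active processor

        WORKITEM wi = (WORKITEM) toku_malloc(sizeof *wi); assert(wi);
        workitem_init(wi, handle_item, 0); // handle_item runs on some worker thread
        workqueue_enq(&wq, wi, 1);

        toku_destroy_workers(&wq, &tp);    // close the queue, join the workers
    }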
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKER_H
#define _TOKU_WORKER_H

// initialize the work queue and worker threads
void toku_init_workers(WORKQUEUE wq, THREADPOOL *tpptr);

// destroy the work queue and worker threads
void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);

// this is the thread function for the worker threads in the worker thread
// pool. the arg is a pointer to the work queue that feeds work to the
// workers.
void *toku_worker(void *arg);

#endif
/* -*- mode: C; c-basic-offset: 4 -*- */
#ifndef _TOKU_WORKQUEUE_H
#define _TOKU_WORKQUEUE_H

#include <errno.h>
#include "toku_assert.h"
#include "toku_pthread.h"

struct workitem;

// A work function is called when the workitem (see below) is being handled
// by a worker thread.
typedef void (*WORKFUNC)(struct workitem *wi);

// A workitem contains the function that is called by a worker thread in a threadpool.
// A workitem is queued in a workqueue.
typedef struct workitem *WORKITEM;
struct workitem {
    WORKFUNC f;
    void *arg;
    struct workitem *next;
};

// Initialize a workitem with a function and argument
static inline void workitem_init(WORKITEM wi, WORKFUNC f, void *arg) {
    wi->f = f;
    wi->arg = arg;
    wi->next = 0;
}

// Access the workitem function
static inline WORKFUNC workitem_func(WORKITEM wi) {
    return wi->f;
}

// Access the workitem argument
static inline void *workitem_arg(WORKITEM wi) {
    return wi->arg;
}

// A workqueue is currently a fifo of workitems that feeds a thread pool. We may
// divide the workqueue into per-worker-thread queues.
typedef struct workqueue *WORKQUEUE;
struct workqueue {
    WORKITEM head, tail;            // list of workitems
    toku_pthread_mutex_t lock;
    toku_pthread_cond_t wait_read;  // wait for read
    int want_read;                  // number of threads waiting to read
    toku_pthread_cond_t wait_write; // wait for write
    int want_write;                 // number of threads waiting to write
    char closed;                    // kicks waiting threads off of the write queue
};

// Get a pointer to the workqueue lock. This is used by workqueue client software
// that wants to control the workqueue locking.
static inline toku_pthread_mutex_t *workqueue_lock_ref(WORKQUEUE wq) {
    return &wq->lock;
}

// Lock the workqueue
static inline void workqueue_lock(WORKQUEUE wq) {
    int r = toku_pthread_mutex_lock(&wq->lock); assert(r == 0);
}

// Unlock the workqueue
static inline void workqueue_unlock(WORKQUEUE wq) {
    int r = toku_pthread_mutex_unlock(&wq->lock); assert(r == 0);
}

// Initialize a workqueue
// Expects: the workqueue is not initialized
// Effects: the workqueue is set to empty and the condition variables are initialized
__attribute__((unused))
static void workqueue_init(WORKQUEUE wq) {
    int r;
    r = toku_pthread_mutex_init(&wq->lock, 0); assert(r == 0);
    wq->head = wq->tail = 0;
    r = toku_pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
    wq->want_read = 0;
    r = toku_pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
    wq->want_write = 0;
    wq->closed = 0;
}

// Destroy a work queue
// Expects: the work queue must be initialized and empty
__attribute__((unused))
static void workqueue_destroy(WORKQUEUE wq) {
    int r;
    workqueue_lock(wq); // shut up helgrind
    assert(wq->head == 0 && wq->tail == 0);
    workqueue_unlock(wq);
    r = toku_pthread_cond_destroy(&wq->wait_read); assert(r == 0);
    r = toku_pthread_cond_destroy(&wq->wait_write); assert(r == 0);
    r = toku_pthread_mutex_destroy(&wq->lock); assert(r == 0);
}

// Close the work queue
// Effects: signal any threads blocked in the work queue
__attribute__((unused))
static void workqueue_set_closed(WORKQUEUE wq, int dolock) {
    int r;
    if (dolock) workqueue_lock(wq);
    wq->closed = 1;
    if (dolock) workqueue_unlock(wq);
    r = toku_pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
    r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}

// Determine whether or not the work queue is empty
// Returns: 1 if the work queue is empty, otherwise 0
static inline int workqueue_empty(WORKQUEUE wq) {
    return wq->head == 0;
}

// Put a work item at the tail of the work queue
// Effects: append the work item to the end of the work queue and signal
// any work queue readers.
// Dolock controls whether or not the work queue lock should be taken.
__attribute__((unused))
static void workqueue_enq(WORKQUEUE wq, WORKITEM wi, int dolock) {
    if (dolock) workqueue_lock(wq);
    wi->next = 0;
    if (wq->tail)
        wq->tail->next = wi;
    else
        wq->head = wi;
    wq->tail = wi;
    if (wq->want_read) {
        int r = toku_pthread_cond_signal(&wq->wait_read); assert(r == 0);
    }
    if (dolock) workqueue_unlock(wq);
}

// Get a work item from the head of the work queue
// Effects: wait until the workqueue is not empty, remove the first workitem from the
// queue and return it.
// Dolock controls whether or not the work queue lock should be taken.
// Success: returns 0 and sets the wiptr
// Failure: returns non-zero
__attribute__((unused))
static int workqueue_deq(WORKQUEUE wq, WORKITEM *wiptr, int dolock) {
    if (dolock) workqueue_lock(wq);
    while (workqueue_empty(wq)) {
        if (wq->closed) {
            if (dolock) workqueue_unlock(wq);
            return EINVAL;
        }
        wq->want_read++;
        int r = toku_pthread_cond_wait(&wq->wait_read, &wq->lock); assert(r == 0);
        wq->want_read--;
    }
    WORKITEM wi = wq->head;
    wq->head = wi->next;
    if (wq->head == 0)
        wq->tail = 0;
    wi->next = 0;
    if (dolock) workqueue_unlock(wq);
    *wiptr = wi;
    return 0;
}

// Suspend a work queue writer thread
__attribute__((unused))
static void workqueue_wait_write(WORKQUEUE wq, int dolock) {
    if (dolock) workqueue_lock(wq);
    wq->want_write++;
    int r = toku_pthread_cond_wait(&wq->wait_write, &wq->lock); assert(r == 0);
    wq->want_write--;
    if (dolock) workqueue_unlock(wq);
}

// Wake up the waiting work queue writer threads
__attribute__((unused))
static void workqueue_wakeup_write(WORKQUEUE wq, int dolock) {
    if (wq->want_write) {
        if (dolock) workqueue_lock(wq);
        if (wq->want_write) {
            int r = toku_pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
        }
        if (dolock) workqueue_unlock(wq);
    }
}

#endif
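To make the want_write/wakeup protocol concrete, here is a producer-side sketch mirroring the rwfc test above: the producer blocks once too many items are in flight, and a consumer that decrements the counter calls workqueue_wakeup_write when the backlog drains. The inflight counter and limit are illustrative state kept by the caller, not part of this header.

    // Enqueue with backpressure: block the producer while the queue is too full.
    static void enq_with_backpressure(WORKQUEUE wq, WORKITEM wi,
                                      int *inflight, int limit) {
        workqueue_lock(wq);
        workqueue_enq(wq, wi, 0);        // we already hold the lock, so dolock=0
        (*inflight)++;
        while (*inflight >= limit)       // wait for a consumer to drain the backlog
            workqueue_wait_write(wq, 0); // sleeps on wait_write, releasing the lock
        workqueue_unlock(wq);
    }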
@@ -21,7 +21,7 @@ void toku_do_assert(int,const char*/*expr_as_string*/,const char */*fun*/,const
 #define WHEN_NOT_GCOV(x)
 #endif

+#undef assert
 #ifdef SLOW_ASSERT
 #define assert(expr) toku_do_assert((expr) != 0, #expr, __FUNCTION__, __FILE__, __LINE__)
 #else
...