Commit 728b0164 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge main line (up to 6206) and resolve the differences.

{{{
svn merge -r5900:6206 https://svn.tokutek.com/tokudb/tokudb
}}}

Addresses #699, #1000, #1075, #1080, #1100, #1131, #1132, #1134, #1153, #1158.


git-svn-id: file:///svn/tokudb.1131b+1080a@6207 c7de825b-a66e-492c-adef-691d508d4ae1
parent 21f18970
......@@ -109,7 +109,7 @@ static long brtnode_memory_size(BRTNODE node) {
+fifo_sum;
#endif
} else {
return sizeof(*node)+toku_omt_memory_size(node->u.l.buffer)+toku_mempool_memory_size(&node->u.l.buffer_mempool);
return sizeof(*node)+toku_omt_memory_size(node->u.l.buffer)+toku_mempool_get_size(&node->u.l.buffer_mempool);
}
}
......@@ -3806,8 +3806,10 @@ int toku_brt_cursor_delete(BRT_CURSOR cursor, int flags, TOKUTXN txn) {
int r = 0;
if (!(flags & DB_DELETE_ANY))
r = brt_cursor_current(cursor, DB_CURRENT, 0, 0, toku_txn_logger(txn));
if (r == 0)
if (r == 0) {
if (cursor->current_in_omt) load_dbts_from_omt(cursor, &cursor->key, &cursor->val);
r = toku_brt_delete_both(cursor->brt, &cursor->key, &cursor->val, txn);
}
return r;
}
......
/* Tell me the diff between two brt files. */
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <fcntl.h>
#include <inttypes.h>
......@@ -119,8 +121,10 @@ void dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
for (i=0; i<n->u.n.n_children-1; i++) {
struct kv_pair *piv = n->u.n.childkeys[i];
printf(" pivot %d:", i);
assert(n->flags == 0 || n->flags == TOKU_DB_DUP+TOKU_DB_DUPSORT);
print_item(kv_pair_key_const(piv), kv_pair_keylen(piv));
assert(n->flags==0); // if not zero, we must print the other part of the pivot.
if (n->flags == TOKU_DB_DUP+TOKU_DB_DUPSORT)
print_item(kv_pair_val_const(piv), kv_pair_vallen(piv));
printf("\n");
}
printf(" children:\n");
......@@ -162,12 +166,33 @@ void dump_node (int f, BLOCKNUM blocknum, struct brt_header *h) {
toku_brtnode_free(&n);
}
void readline(char *line, int maxline) {
int i = 0;
int c;
while ((c = getchar()) != EOF && c != '\n' && i < maxline) {
line[i++] = c;
}
line[i++] = 0;
}
int split_fields(char *line, char *fields[], int maxfields) {
int i;
for (i=0; i<maxfields; i++, line=NULL) {
fields[i] = strtok(line, " ");
if (fields[i] == NULL) break;
}
return i;
}
int main (int argc, const char *argv[]) {
const char *arg0 = argv[0];
static int interactive = 0;
argc--; argv++;
while (argc>1) {
if (strcmp(argv[0], "--nodata")==0) {
dump_data = 0;
} else if (strcmp(argv[0], "--interactive") == 0) {
interactive = 1;
} else {
printf("Usage: %s [--nodata] brtfilename\n", arg0);
exit(1);
......@@ -179,9 +204,35 @@ int main (int argc, const char *argv[]) {
int f = open(n, O_RDONLY); assert(f>=0);
struct brt_header *h;
dump_header(f, &h);
BLOCKNUM blocknum;
for (blocknum.b=1; blocknum.b<h->unused_blocks.b; blocknum.b++) {
dump_node(f, blocknum, h);
if (interactive) {
while (1) {
printf("brtdump>"); fflush(stdout);
const int maxline = 64;
char line[maxline+1];
readline(line, maxline);
if (strcmp(line, "") == 0)
break;
const int maxfields = 2;
char *fields[maxfields];
int nfields = split_fields(line, fields, maxfields);
if (nfields == 0)
continue;
if (strcmp(fields[0], "header") == 0) {
toku_brtheader_free(h);
dump_header(f, &h);
} else if (strcmp(fields[0], "node") == 0 && nfields == 2) {
long long strtoll(char *, char **, int);
BLOCKNUM off = make_blocknum(strtoll(fields[1], NULL, 10));
dump_node(f, off, h);
} else if (strcmp(fields[0], "quit") == 0 || strcmp(fields[0], "q") == 0) {
break;
}
}
} else {
BLOCKNUM blocknum;
for (blocknum.b=1; blocknum.b<h->unused_blocks.b; blocknum.b++) {
dump_node(f, blocknum, h);
}
}
toku_brtheader_free(h);
toku_malloc_cleanup();
......
// When objects are evicted from the cachetable, they are written to storage by a
// thread in a thread pool. The pair's are placed onto a write queue that feeds
// the thread pool.
// thread in a thread pool. The objects are placed onto a write queue that feeds
// the thread pool. The write queue expects that an external mutex is used to
// protect it.
typedef struct writequeue *WRITEQUEUE;
struct writequeue {
......@@ -55,9 +56,10 @@ static int writequeue_empty(WRITEQUEUE wq) {
return wq->head == 0;
}
// put a pair on the tail of the write queue
// put a pair at the tail of the write queue
// expects: the mutex is locked
// effects: append the pair to the end of the write queue and signal
// any waiters
// any readers.
static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
pair->next_wq = 0;
......@@ -73,6 +75,7 @@ static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
}
// get a pair from the head of the write queue
// expects: the mutex is locked
// effects: wait until the writequeue is not empty, remove the first pair from the
// write queue and return it
// returns: 0 if success, otherwise an error
......@@ -95,7 +98,8 @@ static int writequeue_deq(WRITEQUEUE wq, pthread_mutex_t *mutex, PAIR *pairptr)
return 0;
}
// wait for write
// suspend the writer thread
// expects: the mutex is locked
static void writequeue_wait_write(WRITEQUEUE wq, pthread_mutex_t *mutex) {
wq->want_write++;
......@@ -103,11 +107,12 @@ static void writequeue_wait_write(WRITEQUEUE wq, pthread_mutex_t *mutex) {
wq->want_write--;
}
// wakeup writers
// wakeup the writer threads
// expects: the mutex is locked
static void writequeue_wakeup_write(WRITEQUEUE wq) {
if (wq->want_write) {
int r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
}
......@@ -18,6 +18,7 @@
#include "log_header.h"
#include "threadpool.h"
#include "cachetable-rwlock.h"
#include <malloc.h>
// execute the cachetable callbacks using a writer thread 0->no 1->yes
#define DO_WRITER_THREAD 1
......@@ -25,6 +26,9 @@
static void *cachetable_writer(void *);
#endif
// we use 4 threads since gunzip is 4 times faster than gzip
#define MAX_WRITER_THREADS 4
// use cachetable locks 0->no 1->yes
#define DO_CACHETABLE_LOCK 1
......@@ -87,9 +91,10 @@ struct cachetable {
long size_writing; // the sum of the sizes of the pairs being written
LSN lsn_of_checkpoint; // the most recent checkpoint in the log.
TOKULOGGER logger;
pthread_mutex_t mutex; // course lock that protects the cachetable, the cachefiles, and the pair's
pthread_mutex_t mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's
struct writequeue wq; // write queue for the writer threads
THREADPOOL threadpool; // pool of writer threads
char checkpointing; // checkpoint in progress
};
// lock the cachetable mutex
......@@ -165,14 +170,14 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
t->size_writing = 0;
t->lsn_of_checkpoint = initial_lsn;
t->logger = logger;
t->checkpointing = 0;
int r;
writequeue_init(&t->wq);
r = pthread_mutex_init(&t->mutex, 0); assert(r == 0);
// set the max number of writeback threads to min(4,nprocs_online)
// set the max number of writeback threads to min(MAX_WRITER_THREADS,nprocs_online)
int nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (nprocs > 4) nprocs = 4;
if (nprocs > MAX_WRITER_THREADS) nprocs = MAX_WRITER_THREADS;
r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
#if DO_WRITER_THREAD
......@@ -294,7 +299,7 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
}
}
static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, BOOL do_remove);
static int cachefile_write_maybe_remove (CACHETABLE, CACHEFILE cf, BOOL do_remove);
// Increment the reference count
void toku_cachefile_refup (CACHEFILE cf) {
......@@ -309,7 +314,7 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
cf->refcount--;
if (cf->refcount==0) {
int r;
if ((r = cachetable_flush_cachefile(ct, cf, TRUE))) {
if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) {
cachetable_unlock(ct);
return r;
}
......@@ -344,48 +349,11 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
int toku_cachefile_flush (CACHEFILE cf) {
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
int r = cachetable_flush_cachefile(ct, cf, TRUE);
int r = cachefile_write_maybe_remove(ct, cf, TRUE);
cachetable_unlock(ct);
return r;
}
int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
u_int32_t i;
int some_pinned=0;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock)) {
printf("%s:%d pinned: %" PRId64 " (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
some_pinned=1;
}
}
}
cachetable_unlock(t);
return some_pinned;
}
int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
u_int32_t i;
int n_pinned=0;
CACHETABLE t = cf->cachetable;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock) && (cf==0 || p->cachefile==cf)) {
if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
n_pinned++;
}
}
}
cachetable_unlock(t);
return n_pinned;
}
// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
......@@ -607,8 +575,13 @@ static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
ct->size_writing += p->size; assert(ct->size_writing >= 0);
p->write_me = write_me;
#if DO_WRITER_THREAD
threadpool_maybe_add(ct->threadpool, cachetable_writer, ct);
writequeue_enq(&ct->wq, p);
if (!p->dirty || !p->write_me) {
// evictions without a write can be run in the current thread
cachetable_write_pair(ct, p);
} else {
threadpool_maybe_add(ct->threadpool, cachetable_writer, ct);
writequeue_enq(&ct->wq, p);
}
#else
cachetable_write_pair(ct, p);
#endif
......@@ -794,6 +767,10 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
return r;
}
// Lookup a key in the cachetable. If it is found and it is not being written, then
// acquire a read lock on the pair, update the LRU list, and return sucess. However,
// if it is being written, then allow the writer to evict it. This prevents writers
// being suspended on a block that was just selected for eviction.
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
CACHETABLE t = cachefile->cachetable;
PAIR p;
......@@ -802,7 +779,6 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile && !p->writing) {
note_hash_count(count);
*value = p->value;
ctpair_read_lock(&p->rwlock, &t->mutex);
lru_touch(t,p);
......@@ -955,16 +931,17 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE t, CACHEFILE cf)
}
}
// write all dirty entries and maybe remove them
// Write all of the pairs associated with a cachefile to storage. Maybe remove
// these pairs from the cachetable after they have been written.
static int cachetable_flush_cachefile (CACHETABLE ct, CACHEFILE cf, BOOL do_remove) {
static int cachefile_write_maybe_remove(CACHETABLE ct, CACHEFILE cf, BOOL do_remove) {
unsigned nfound = 0;
struct writequeue cq;
writequeue_init(&cq);
unsigned i;
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
for (p = ct->table[i]; p; p = p->hash_chain) {
if (cf == 0 || p->cachefile==cf) {
nfound++;
p->cq = &cq;
......@@ -993,7 +970,7 @@ int toku_cachetable_close (CACHETABLE *tp) {
CACHETABLE t=*tp;
int r;
cachetable_lock(t);
if ((r=cachetable_flush_cachefile(t, 0, TRUE))) {
if ((r=cachefile_write_maybe_remove(t, 0, TRUE))) {
cachetable_unlock(t);
return r;
}
......@@ -1083,11 +1060,7 @@ int cachefile_pread (CACHEFILE cf, void *buf, size_t count, off_t offset) {
}
#endif
int toku_cachetable_checkpoint (CACHETABLE ct) {
// Single threaded checkpoint.
// In future: for multithreaded checkpoint we should not proceed if the previous checkpoint has not finished.
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.)
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
......@@ -1096,31 +1069,41 @@ int toku_cachetable_checkpoint (CACHETABLE ct) {
// flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
// Watch out since evicting the node modifies the hash table.
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
unsigned nfound = 0;
struct writequeue cq;
writequeue_init(&cq);
cachetable_lock(ct);
unsigned i;
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
// p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
if (1) {
nfound++;
p->cq = &cq;
if (!p->writing)
flush_and_remove(ct, p, 1);
}
}
}
for (i=0; i<nfound; i++) {
PAIR p = 0;
int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
cachetable_complete_write_pair(ct, p, FALSE);
// set the checkpoint in progress flag. if already set then just return.
if (!ct->checkpointing) {
ct->checkpointing = 1;
unsigned nfound = 0;
unsigned i;
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
// p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
if (1) {
nfound++;
p->cq = &cq;
if (!p->writing)
flush_and_remove(ct, p, 1);
}
}
}
for (i=0; i<nfound; i++) {
PAIR p = 0;
int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
cachetable_complete_write_pair(ct, p, FALSE);
}
ct->checkpointing = 0; // clear the checkpoint in progress flag
}
cachetable_unlock(ct);
writequeue_destroy(&cq);
......@@ -1162,21 +1145,58 @@ static void *cachetable_writer(void *arg) {
// debug functions
int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
u_int32_t i;
int some_pinned=0;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock)) {
printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
some_pinned=1;
}
}
}
cachetable_unlock(t);
return some_pinned;
}
int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
u_int32_t i;
int n_pinned=0;
CACHETABLE t = cf->cachetable;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock) && (cf==0 || p->cachefile==cf)) {
if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
n_pinned++;
}
}
}
cachetable_unlock(t);
return n_pinned;
}
void toku_cachetable_print_state (CACHETABLE ct) {
u_int32_t i;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p = ct->table[i];
if (p != 0) {
printf("t[%d]=", i);
for (p=ct->table[i]; p; p=p->hash_chain) {
printf(" {%"PRId64", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, p->dirty, p->rwlock.pinned, p->size);
}
printf("\n");
}
}
cachetable_unlock(ct);
}
u_int32_t i;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p = ct->table[i];
if (p != 0) {
printf("t[%d]=", i);
for (p=ct->table[i]; p; p=p->hash_chain) {
printf(" {%"PRId64", %p, dirty=%d, pin=%d, size=%ld}", p->key.b, p->cachefile, p->dirty, p->rwlock.pinned, p->size);
}
printf("\n");
}
}
cachetable_unlock(ct);
}
void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
cachetable_lock(ct);
......
......@@ -6,78 +6,104 @@
#include <fcntl.h>
#include "brttypes.h"
/* Maintain a cache mapping from cachekeys to values (void*)
* Some of the keys can be pinned. Don't pin too many or for too long.
* If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
and then remove the key-value pair from the cache.
* The callback won't be any of the currently pinned keys.
* Also when flushing an object, the cachetable drops all references to it,
* so you may need to free() it.
* Note: The cachetable should use a common pool of memory, flushing things across cachetables.
* (The first implementation doesn't)
* If you pin something twice, you must unpin it twice.
* table_size is the initial size of the cache table hash table (in number of entries)
* size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
*/
// Maintain a cache mapping from cachekeys to values (void*)
// Some of the keys can be pinned. Don't pin too many or for too long.
// If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
// and then remove the key-value pair from the cache.
// The callback won't be any of the currently pinned keys.
// Also when flushing an object, the cachetable drops all references to it,
// so you may need to free() it.
// Note: The cachetable should use a common pool of memory, flushing things across cachetables.
// (The first implementation doesn't)
// If you pin something twice, you must unpin it twice.
// table_size is the initial size of the cache table hash table (in number of entries)
// size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
typedef BLOCKNUM CACHEKEY;
// create a new cachetable
// returns: if success, 0 is returned and result points to the new cachetable
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
// Create a new cachetable.
// Effects: a new cachetable is created and initialized.
// The cachetable pointer is stored into result.
// The sum of the sizes of the memory objects is set to size_limit, in whatever
// units make sense to the user of the cachetable.
// Returns: If success, returns 0 and result points to the new cachetable. Otherwise,
// returns an error number.
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
// Checkpoint the cachetable.
// Effects: ?
int toku_cachetable_checkpoint (CACHETABLE ct);
// Close the cachetable.
// Effects: All of the memory objects are flushed to disk, and the cachetable is
// destroyed.
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
// Open a file and bind the file to a new cachefile object.
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
// Bind a file to a new cachefile object.
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
// the flush callback (write, free)
// The flush callback is called when a key value pair is being written to storage and possibly removed from the cachetable.
// When write_me is true, the value should be written to storage.
// When keep_me is false, the value should be freed.
// Returns: 0 if success, otherwise an error number.
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, void *extraargs, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
// the fetch callback
// The fetch callback is called when a thread is attempting to get and pin a memory
// object and it is not in the cachetable.
// Returns: 0 if success, otherwise an error number. The address and size of the object
// associated with the key are returned.
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*));
// Effect: Store some cachefile-specific data. When the last reference to a cachefile is closed, we call close_userdata.
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata.
// If userdata is already non-NULL, then we simply overwrite it.
void *toku_cachefile_get_userdata(CACHEFILE);
// Effect: Get the user dataa.
// Put a memory object into the cachetable.
// Effects: Lookup the key in the cachetable. If the key is not in the cachetable,
// then insert the pair and pin it. Otherwise return an error. Some of the key
// value pairs may be evicted from the cachetable when the cachetable gets too big.
// Returns: 0 if the memory object is placed into the cachetable, otherwise an
// error number.
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
// Effect: Put a key and value pair into the cachetable
// If the key,cachefile is not in the cachetable, then insert the pair and pin it.
// returns: 0 if success, otherwise an error
// Get and pin a memory object.
// Effects: If the memory object is in the cachetable, acquire a read lock on it.
// Otherwise, fetch it from storage by calling the fetch callback. If the fetch
// succeeded, add the memory object to the cachetable with a read lock on it.
// Returns: 0 if the memory object is in memory, otherwise an error number.
int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
void **/*value*/, long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
// If the the item is already in memory, then return 0 and store it in the void**.
// If the item is not in memory, then return nonzero.
// Maybe get and pin a memory object.
// Effects: This function is identical to the get_and_pin function except that it
// will not attempt to fetch a memory object that is not in the cachetable.
// Returns: If the the item is already in memory, then return 0 and store it in the
// void**. If the item is not in memory, then return a nonzero error number.
int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
// cachetable object state wrt external memory
// cachetable object state WRT external memory
#define CACHETABLE_CLEAN 0
#define CACHETABLE_DIRTY 1
// Unpin by key
// effects: lookup a mapping using key,cachefile. if a pair is found, then OR the dirty bit into the pair
// and update the size of the pair. the read lock on the pair is released.
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size); /* Note whether it is dirty when we unpin it. */
// Unpin a memory object
// Effects: If the memory object is in the cachetable, then OR the dirty flag,
// update the size, and release the read lock on the memory object.
// Returns: 0 if success, otherwise returns an error number.
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size);
int toku_cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */
......@@ -85,53 +111,71 @@ int toku_cachetable_assert_all_unpinned (CACHETABLE);
int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
/* Rename whatever is at oldkey to be newkey. Requires that the object be pinned. */
// Rename whatever is at oldkey to be newkey. Requires that the object be pinned.
int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey);
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
// Close the cachefile.
// Effects: All of the cached object associated with the cachefile are evicted from
// the cachetable. The flush callback is called for each of these objects. The
// close function does not return until all of the objects are evicted. The cachefile
// object is freed.
// Returns: 0 if success, otherwise returns an error number.
int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
// Flush the cachefile.
// Effect: Flush everything owned by the cachefile from the cachetable. All dirty
// blocks are written. All unpinned blocks are evicted from the cachetable.
// Returns: 0 if success, otherwise returns an error number.
int toku_cachefile_flush (CACHEFILE);
// effect: flush everything owned by the cachefile from the cachetable. all dirty
// blocks are written sto storage. all unpinned blocks are evicts from the cachetable.
// returns: 0 if success
void toku_cachefile_refup (CACHEFILE cfp);
// Increment the reference count. Use close to decrement it.
void toku_cachefile_refup (CACHEFILE cfp);
// Return on success (different from pread and pwrite)
//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, off_t offset);
//int cachefile_pread (CACHEFILE, void *buf, size_t count, off_t offset);
// Get the file descriptor associated with the cachefile
// Return the file descriptor
int toku_cachefile_fd (CACHEFILE);
// get the file descriptor bound to this cachefile
// returns: the file descriptor
// Set the cachefile's fd and fname.
// Effect: Bind the cachefile to a new fd and fname. The old fd is closed.
// Returns: 0 if success, otherwise an error number
int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
// set the cachefile's fd and fname.
// effect: bind the cachefile to a new fd and fname. the old fd is closed.
// returns: 0 if success
// Return the logger associated with the cachefile
TOKULOGGER toku_cachefile_logger (CACHEFILE);
// Return the filenum associated with the cachefile
FILENUM toku_cachefile_filenum (CACHEFILE);
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, CACHEKEY key);
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, CACHEKEY key);
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile);
// debug functions
// Print the contents of the cachetable. This is mainly used from gdb
void toku_cachetable_print_state (CACHETABLE ct);
// Get the state of the cachetable. This is used to verify the cachetable
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
// Get the state of a cachetable entry by key. This is used to verify the cachetable
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf,
void **value_ptr,
int *dirty_ptr,
long long *pin_ptr,
long *size_ptr);
void toku_cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void toku_cachetable_verify (CACHETABLE t); // Slow...
// Verify the whole cachetable that the cachefile is in. Slow.
void toku_cachefile_verify (CACHEFILE cf);
// Verify the cachetable. Slow.
void toku_cachetable_verify (CACHETABLE t);
#endif
......@@ -22,11 +22,11 @@ void *toku_mempool_get_base(struct mempool *mp) {
return mp->base;
}
int toku_mempool_get_size(struct mempool *mp) {
size_t toku_mempool_get_size(struct mempool *mp) {
return mp->size;
}
int toku_mempool_get_frag_size(struct mempool *mp) {
size_t toku_mempool_get_frag_size(struct mempool *mp) {
return mp->frag_size;
}
......@@ -49,13 +49,10 @@ void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment) {
}
// if vp is null then we are freeing something, but not specifying what. The data won't be freed until compression is done.
void toku_mempool_mfree(struct mempool *mp, void *vp, int size) {
assert(size >= 0);
void toku_mempool_mfree(struct mempool *mp, void *vp, size_t size) {
if (vp) assert(toku_mempool_inrange(mp, vp, size));
mp->frag_size += size;
assert(mp->frag_size <= mp->size);
}
unsigned long toku_mempool_memory_size(struct mempool *mp) {
return mp->size;
}
......@@ -30,10 +30,10 @@ void toku_mempool_fini(struct mempool *mp);
void *toku_mempool_get_base(struct mempool *mp);
/* get the size of the memory pool */
int toku_mempool_get_size(struct mempool *mp);
size_t toku_mempool_get_size(struct mempool *mp);
/* get the amount of fragmented space in the memory pool */
int toku_mempool_get_frag_size(struct mempool *mp);
size_t toku_mempool_get_frag_size(struct mempool *mp);
/* allocate a chunk of memory from the memory pool suitably aligned */
void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment);
......@@ -41,14 +41,11 @@ void *toku_mempool_malloc(struct mempool *mp, size_t size, int alignment);
/* free a previously allocated chunk of memory. the free only updates
a count of the amount of free space in the memory pool. the memory
pool does not keep track of the locations of the free chunks */
void toku_mempool_mfree(struct mempool *mp, void *vp, int size);
void toku_mempool_mfree(struct mempool *mp, void *vp, size_t size);
/* verify that a memory range is contained within a mempool */
static inline int toku_mempool_inrange(struct mempool *mp, void *vp, int size) {
return mp->base <= vp && vp + size <= mp->base + mp->size;
static inline int toku_mempool_inrange(struct mempool *mp, void *vp, size_t size) {
return (mp->base <= vp) && (vp + size <= mp->base + mp->size);
}
unsigned long toku_mempool_memory_size(struct mempool *mp);
// Effect: Return the number of bytes that the mempool is using in main memory. Include fragmented space. Don't include the mp itself.
#endif
......@@ -68,12 +68,14 @@ REGRESSION_TESTS = \
cachetable-test \
cachetable-test2 \
cachetable-put-test \
cachetable-getandpin-test \
cachetable-unpin-test \
cachetable-rename-test \
cachetable-fd-test \
cachetable-flush-test \
cachetable-count-pinned-test \
cachetable-debug-test \
cachetable-debug-test \
fifo-test \
list-test \
keyrange \
......
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include "test.h"
#include "cachetable.h"
void flush(CACHEFILE cf __attribute__((__unused__)),
CACHEKEY key __attribute__((__unused__)),
void *v __attribute__((__unused__)),
void *extraargs __attribute__((__unused__)),
long size __attribute__((__unused__)),
BOOL write_me __attribute__((__unused__)),
BOOL keep_me __attribute__((__unused__)),
LSN lsn __attribute__((__unused__)),
BOOL rename_p __attribute__((__unused__))
) {
assert((long) key.b == size);
if (!keep_me) free(v);
}
int fetch(CACHEFILE cf, CACHEKEY key, u_int32_t hash, void **vptr, long *sizep, void *extra, LSN *written_lsn) {
cf = cf; hash = hash; extra = extra; written_lsn = written_lsn;
*sizep = (long) key.b;
*vptr = malloc(*sizep);
return 0;
}
int fetch_error(CACHEFILE cf __attribute__((__unused__)),
CACHEKEY key __attribute__((__unused__)),
u_int32_t fullhash __attribute__((__unused__)),
void **value __attribute__((__unused__)),
long *sizep __attribute__((__unused__)),
void*extraargs __attribute__((__unused__)),
LSN *written_lsn __attribute__((__unused__))
) {
return -1;
}
void cachetable_getandpin_test(int n) {
const int test_limit = 1024*1024;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test_getandpin.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, 0777); assert(r == 0);
int i;
// test get_and_pin fails
for (i=1; i<=n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch_error, 0);
assert(r == -1);
}
// test get_and_pin size
for (i=1; i<=n; i++) {
u_int32_t hi;
hi = toku_cachetable_hash(f1, make_blocknum(i));
void *v; long size;
r = toku_cachetable_get_and_pin(f1, make_blocknum(i), hi, &v, &size, flush, fetch, 0);
assert(r == 0);
assert(size == i);
r = toku_cachetable_unpin(f1, make_blocknum(i), hi, CACHETABLE_CLEAN, i);
assert(r == 0);
}
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int main(int argc, const char *argv[]) {
int i;
for (i=1; i<argc; i++) {
if (strcmp(argv[i], "-v") == 0) {
verbose++;
continue;
}
}
cachetable_getandpin_test(8);
return 0;
}
......@@ -7,12 +7,12 @@
#include "memory.h"
#include "mempool.h"
void test_mempool_limits(int size) {
void test_mempool_limits(size_t size) {
void *base = malloc(size);
struct mempool mempool;
toku_mempool_init(&mempool, base, size);
int i;
size_t i;
for (i=0;; i++) {
void *vp = toku_mempool_malloc(&mempool, 1, 1);
if (vp == 0)
......@@ -24,13 +24,13 @@ void test_mempool_limits(int size) {
free(base);
}
void test_mempool_malloc_mfree(int size) {
void test_mempool_malloc_mfree(size_t size) {
void *base = malloc(size);
struct mempool mempool;
toku_mempool_init(&mempool, base, size);
void *vp[size];
int i;
size_t i;
for (i=0;; i++) {
vp[i] = toku_mempool_malloc(&mempool, 1, 1);
if (vp[i] == 0)
......
CC = g++
CPPFLAGS = -I. -D_GNU_SOURCE
CFLAGS = -Wall -g
LDFLAGS = -lpthread
TARGET = worker-test
SRCS = $(wildcard *.c)
OBJS = $(patsubst %.c,%.o,$(SRCS))
$(TARGET): $(OBJS)
$(CC) $(CPPFLAGS) $(CFLAGS) -o $@ $^ $(LDFLAGS)
clean:
rm -rf $(TARGET) $(OBJS)
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <malloc.h>
#include <pthread.h>
#include <errno.h>
#include "threadpool.h"
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 0
struct threadpool {
int max_threads;
int current_threads;
int busy_threads;
pthread_t pids[];
};
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
size_t size = sizeof (struct threadpool) + max_threads*sizeof (pthread_t);
struct threadpool *threadpool = (struct threadpool *) malloc(size);
if (threadpool == 0)
return ENOMEM;
threadpool->max_threads = max_threads;
threadpool->current_threads = 0;
threadpool->busy_threads = 0;
int i;
for (i=0; i<max_threads; i++)
threadpool->pids[i] = 0;
*threadpoolptr = threadpool;
return 0;
}
void threadpool_destroy(THREADPOOL *threadpoolptr) {
struct threadpool *threadpool = *threadpoolptr;
int i;
for (i=0; i<threadpool->current_threads; i++) {
int r; void *ret;
r = pthread_join(threadpool->pids[i], &ret);
assert(r == 0);
}
*threadpoolptr = 0;
free(threadpool);
}
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if (threadpool->current_threads < threadpool->max_threads) {
int r = pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg);
if (r == 0) {
threadpool->current_threads++;
threadpool_set_thread_busy(threadpool);
}
}
}
void threadpool_set_thread_busy(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
#else
threadpool->busy_threads++;
#endif
}
void threadpool_set_thread_idle(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
#else
threadpool->busy_threads--;
#endif
}
int threadpool_get_current_threads(THREADPOOL threadpool) {
return threadpool->current_threads;
}
// A threadpool is a limited set of threads that can be used to apply a
// function to work contained in a work queue. The work queue is outside
// of the scope of the threadpool; the threadpool merely provides
// mechanisms to grow the number of threads in the threadpool on demand.
typedef struct threadpool *THREADPOOL;
// Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of
// threads in the threadpool is limited to max_threads. initially, there
// are no threads in the pool.
// Returns: if there are no errors, the threadpool is set and zero is returned.
// Otherwise, an error number is returned.
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads);
// Destroy a threadpool
// Effects: the calling thread joins with all of the threads in the threadpool.
// Effects: the threadpool memory is freed.
// Returns: the threadpool is set to null.
void threadpool_destroy(THREADPOOL *threadpoolptr);
// Maybe add a thread to the threadpool.
// Effects: the number of threads in the threadpool is expanded by 1 as long
// as the current number of threads in the threadpool is less than the max
// and there are no idle threads.
// Effects: if the thread is create, it calls the function f with argument arg
// Expects: external serialization on this function; only one thread may
// execute this function
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
// Set the current thread busy
// Effects: the threadpool keeps a count of the number of idle threads. It
// uses this count to control the creation of additional threads.
void threadpool_set_thread_busy(THREADPOOL);
// Set the current thread idle
void threadpool_set_thread_idle(THREADPOOL);
// get the current number of threads
int threadpool_get_current_threads(THREADPOOL);
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
int usage() {
printf("measure multi-thread work scheduling overhead\n");
printf("-nthreads N (number of worker threads, default 1)\n");
printf("-nworkitems N (number of work items, default 1)\n");
printf("-usleeptime N (work time, default 100)\n");
printf("-ntests N (number of test iterations, default 1)\n");
printf("-adaptive (use adaptive mutex locks, default no)\n");
return 1;
}
typedef struct workitem *WORKITEM;
struct workitem {
struct workitem *next_wq;
int usleeptime;
};
#include "workqueue.h"
#include "threadpool.h"
int usleeptime = 100;
void do_work(WORKITEM wi __attribute__((unused))) {
#if 0
// sleep for usleeptime microseconds
usleep(usleeptime);
#else
// busy wait for usleeptime loop interations
int n = wi->usleeptime;
volatile int i;
for (i=0; i<n; i++);
#endif
}
// per thread argument that includes the work queues and locks
struct runner_arg {
pthread_mutex_t *lock;
WORKQUEUE wq;
WORKQUEUE cq;
};
void *runner_thread(void *arg) {
int r;
struct runner_arg *runner = (struct runner_arg *)arg;
r = pthread_mutex_lock(runner->lock); assert(r == 0);
while (1) {
WORKITEM wi;
r = workqueue_deq(runner->wq, runner->lock, &wi);
if (r != 0) break;
r = pthread_mutex_unlock(runner->lock); assert(r == 0);
do_work(wi);
r = pthread_mutex_lock(runner->lock); assert(r == 0);
workqueue_enq(runner->cq, wi);
}
r = pthread_mutex_unlock(runner->lock); assert(r == 0);
return arg;
}
static inline void lockit(pthread_mutex_t *lock, int nthreads) {
if (nthreads > 0) {
int r = pthread_mutex_lock(lock); assert(r == 0);
}
}
static inline void unlockit(pthread_mutex_t *lock, int nthreads) {
if (nthreads > 0) {
int r = pthread_mutex_unlock(lock); assert(r == 0);
}
}
int main(int argc, char *argv[]) {
int ntests = 1;
int nworkitems = 1;
int nthreads = 1;
int adaptive = 0;
int r;
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-help") == 0) {
return usage();
}
if (strcmp(arg, "-ntests") == 0) {
assert(i+1 < argc);
ntests = atoi(argv[++i]);
}
if (strcmp(arg, "-nworkitems") == 0) {
assert(i+1 < argc);
nworkitems = atoi(argv[++i]);
}
if (strcmp(arg, "-nthreads") == 0) {
assert(i+1 < argc);
nthreads = atoi(argv[++i]);
}
if (strcmp(arg, "-usleeptime") == 0) {
assert(i+1 < argc);
usleeptime = atoi(argv[++i]);
}
if (strcmp(arg, "-adaptive") == 0) {
adaptive++;
}
}
pthread_mutex_t lock;
pthread_mutexattr_t mattr;
r = pthread_mutexattr_init(&mattr); assert(r == 0);
if (adaptive) {
r = pthread_mutexattr_settype(&mattr, PTHREAD_MUTEX_ADAPTIVE_NP); assert(r == 0);
}
r = pthread_mutex_init(&lock, &mattr); assert(r == 0);
struct workqueue wq;
workqueue_init(&wq);
struct workqueue cq;
workqueue_init(&cq);
THREADPOOL tp;
r = threadpool_create(&tp, nthreads); assert(r == 0);
struct runner_arg runner_arg;
runner_arg.lock = &lock;
runner_arg.wq = &wq;
runner_arg.cq = &cq;
for (i=0; i<nthreads; i++)
threadpool_maybe_add(tp, runner_thread, &runner_arg);
int t;
for (t=0; t<ntests; t++) {
struct workitem work[nworkitems];
if (nworkitems == 1) {
// single work items are run in the main thread
work[0].usleeptime = usleeptime;
do_work(&work[0]);
} else {
lockit(&lock, nthreads);
// put all the work on the work queue
int i;
for (i=0; i<nworkitems; i++) {
work[i].usleeptime = usleeptime;
workqueue_enq(&wq, &work[i]);
}
// run some of the work in the main thread
int ndone = 0;
while (!workqueue_empty(&wq)) {
WORKITEM wi;
workqueue_deq(&wq, &lock, &wi);
unlockit(&lock, nthreads);
do_work(wi);
lockit(&lock, nthreads);
ndone++;
}
// make sure all of the work has completed
for (i=ndone; i<nworkitems; i++) {
WORKITEM wi;
r = workqueue_deq(&cq, &lock, &wi);
assert(r == 0);
}
unlockit(&lock, nthreads);
}
}
workqueue_set_closed(&wq);
threadpool_destroy(&tp);
workqueue_destroy(&wq);
workqueue_destroy(&cq);
return 0;
}
#include <cilk-lib.cilkh>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
int usage() {
printf("measure multi-thread work scheduling overhead\n");
printf("-nworkitems N (number of work items, default 1)\n");
printf("-usleeptime N (work time, default 100)\n");
printf("-ntests N (number of test iterations, default 1)\n");
return 1;
}
typedef struct workitem *WORKITEM;
struct workitem {
int usleeptime;
};
cilk void do_work(WORKITEM wi) {
#if 0
// sleep for usleeptime microseconds
usleep(wi->usleeptime);
#else
// busy wait for usleeptime loop interations
int n = wi->usleeptime;
volatile int i;
for (i=0; i<n; i++);
#endif
}
cilk int main(int argc, char *argv[]) {
int ntests = 1;
int nworkitems = 1;
int usleeptime = 100;
int i;
int t;
struct workitem *work;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-help") == 0) {
return usage();
}
if (strcmp(arg, "-ntests") == 0) {
assert(i+1 < argc);
ntests = atoi(argv[++i]);
}
if (strcmp(arg, "-nworkitems") == 0) {
assert(i+1 < argc);
nworkitems = atoi(argv[++i]);
}
if (strcmp(arg, "-usleeptime") == 0) {
assert(i+1 < argc);
usleeptime = atoi(argv[++i]);
}
}
printf("ntests=%d nworkitems=%d usleeptime=%d\n", ntests, nworkitems, usleeptime);
work = (struct workitem *) calloc(nworkitems, sizeof (struct workitem));
for (t=0; t<ntests; t++) {
for (i=0; i<nworkitems; i++) {
work[i].usleeptime = usleeptime;
spawn do_work(&work[i]);
}
sync;
}
free(work);
return 0;
}
typedef struct workqueue *WORKQUEUE;
struct workqueue {
WORKITEM head, tail; // head and tail of the linked list of work items
pthread_cond_t wait_read; // wait for read
int want_read; // number of threads waiting to read
pthread_cond_t wait_write; // wait for write
int want_write; // number of threads waiting to write
int ninq; // number of work items in the queue
char closed; // kicks waiting threads off of the write queue
};
// initialize a workqueue
// expects: the workqueue is not initialized
// effects: the workqueue is set to empty and the condition variable is initialized
static void workqueue_init(WORKQUEUE wq) {
wq->head = wq->tail = 0;
int r;
r = pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
wq->want_read = 0;
r = pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
wq->want_write = 0;
wq->ninq = 0;
wq->closed = 0;
}
// destroy a workqueue
// expects: the workqueue must be initialized and empty
static void workqueue_destroy(WORKQUEUE wq) {
assert(wq->head == 0 && wq->tail == 0);
int r;
r = pthread_cond_destroy(&wq->wait_read); assert(r == 0);
r = pthread_cond_destroy(&wq->wait_write); assert(r == 0);
}
// close the workqueue
// effects: signal any threads blocked in the workqueue
static void workqueue_set_closed(WORKQUEUE wq) {
wq->closed = 1;
int r;
r = pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
// determine whether or not the write queue is empty
// return: 1 if the write queue is empty, otherwise 0
static int workqueue_empty(WORKQUEUE wq) {
return wq->head == 0;
}
// put a work item at the tail of the write queue
// expects: the mutex is locked
// effects: append the workitem to the end of the write queue and signal
// any readers
static void workqueue_enq(WORKQUEUE wq, WORKITEM workitem) {
workitem->next_wq = 0;
if (wq->tail)
wq->tail->next_wq = workitem;
else
wq->head = workitem;
wq->tail = workitem;
wq->ninq++;
if (wq->want_read) {
int r = pthread_cond_signal(&wq->wait_read); assert(r == 0);
}
}
// get a workitem from the head of the write queue
// expects: the mutex is locked
// effects: wait until the workqueue is not empty, remove the first workitem from the
// write queue and return it
// returns: 0 if success, otherwise an error
static int workqueue_deq(WORKQUEUE wq, pthread_mutex_t *mutex, WORKITEM *workitemptr) {
while (workqueue_empty(wq)) {
if (wq->closed)
return EINVAL;
wq->want_read++;
int r = pthread_cond_wait(&wq->wait_read, mutex); assert(r == 0);
wq->want_read--;
}
WORKITEM workitem = wq->head;
wq->head = workitem->next_wq;
if (wq->head == 0)
wq->tail = 0;
wq->ninq--;
workitem->next_wq = 0;
*workitemptr = workitem;
return 0;
}
#if 0
// suspend the writer thread
// expects: the mutex is locked
static void workqueue_wait_write(WORKQUEUE wq, pthread_mutex_t *mutex) {
wq->want_write++;
int r = pthread_cond_wait(&wq->wait_write, mutex); assert(r == 0);
wq->want_write--;
}
// wakeup the writer threads
// expects: the mutex is locked
static void workqueue_wakeup_write(WORKQUEUE wq) {
if (wq->want_write) {
int r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment