Commit 149ec9d8 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the multihreaded writer changes as

{{{
svn merge -r 5899:5987 https://svn.tokutek.com/tokudb/tokudb
}}}
and resolve the conflicts.


git-svn-id: file:///svn/tokudb.1131b@5988 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9475c0c8
......@@ -18,7 +18,7 @@ TARGETS = $(patsubst %.cpp,%,$(SRCS))
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
CPPFLAGS = -I../ -I../../include
CXXFLAGS = -Wall $(OPTFLAGS) -g $(GCOV_FLAGS)
LDLIBS = ../../lib/libtokudb_cxx.a ../../lib/libtokudb.a -lz
LDLIBS = ../../lib/libtokudb_cxx.a ../../lib/libtokudb.a -lz -lpthread
ifneq ($(OSX),)
VGRIND=
......
......@@ -12,7 +12,7 @@ CXXFLAGS = -Wall -Werror -g $(OPTFLAGS) $(GCOV_FLAGS)
ifdef BDBDIR
BDB_CPPFLAGS = -I$(BDBDIR)/include
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb_cxx -lpthread -Wl,-rpath,$(BDBDIR)/lib
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb_cxx -Wl,-rpath,$(BDBDIR)/lib -lpthread
else
BDB_CPPFLAGS =
BDB_LDFLAGS = -ldb_cxx -lpthread
......@@ -47,7 +47,7 @@ clean:
db-benchmark-test-tokudb: ../lib/libtokudb_cxx.a
db-benchmark-test-tokudb: db-benchmark-test.cpp
$(CXX) $(CXXFLAGS) -I../include -L../lib -Wl,-rpath,$(PWD)/../lib $< -o $@ -ltokudb -ltokudb_cxx -lz -DDIRSUF=tokudb
$(CXX) $(CXXFLAGS) -I../include -L../lib -Wl,-rpath,$(PWD)/../lib $< -o $@ -ltokudb -ltokudb_cxx -DDIRSUF=tokudb -lz -lpthread
db-benchmark-test-bdb: db-benchmark-test.cpp
$(CXX) $(CXXFLAGS) $(BDB_CPPFLAGS) $(BDB_LDFLAGS) $< -o $@ -DDIRSUF=bdb
......@@ -23,13 +23,13 @@ CFLAGS = -Wall -Werror -g $(OPTFLAGS) $(GCOV_FLAGS) $(PROF_FLAGS)
LDFLAGS += -lpthread
ifdef BDBDIR
BDB_CPPFLAGS = -I$(BDBDIR)/include
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb -lpthread -Wl,-rpath,$(BDBDIR)/lib
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb -Wl,-rpath,$(BDBDIR)/lib -lpthread
else
BDB_CPPFLAGS =
BDB_LDFLAGS = -ldb
endif
TDB_CPPFLAGS = -I../include
TDB_LDFLAGS = -L../lib -ltokudb -lz -Wl,-rpath,$(PWD)/../lib
TDB_LDFLAGS = -L../lib -ltokudb -Wl,-rpath,$(PWD)/../lib -lpthread -lz
TARGET_BDB = db-benchmark-test-bdb
TARGET_TDB = db-benchmark-test-tokudb
......
......@@ -123,7 +123,7 @@ void setup (void) {
r = db->open(db, tid, dbfilename, NULL, DB_BTREE, DB_CREATE, 0644);
if (r!=0) fprintf(stderr, "errno=%d, %s\n", errno, strerror(errno));
assert(r == 0);
if (do_transactions && !singlex) {
if (do_transactions) {
if (singlex) do_prelock(db, tid);
else {
r=tid->commit(tid, 0);
......
......@@ -36,7 +36,7 @@ FORMAT=-Wmissing-format-attribute
endif
CFLAGS = -Wall -Wextra -Wcast-align -Wbad-function-cast -Wmissing-noreturn $(FORMAT) $(OPTFLAGS) -g3 -ggdb3 $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS) $(SHADOW) $(VISIBILITY)
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -lz
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -lz -lpthread
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE -D_XOPEN_SOURCE=500
# Add -Wconversion
......@@ -77,6 +77,7 @@ BRT_SOURCES = \
ybt \
x1764 \
trace_mem \
threadpool \
# keep this line so I can ha vea \ on the previous line
OFILES = newbrt.o $(CYG_ADD_LIBZ)
......
......@@ -183,8 +183,8 @@ struct brtenv {
// SPINLOCK checkpointing;
};
extern cachetable_flush_func_t toku_brtnode_flush_callback, toku_brtheader_flush_callback;
extern cachetable_fetch_func_t toku_brtnode_fetch_callback, toku_brtheader_fetch_callback;
extern void toku_brtnode_flush_callback(), toku_brtheader_flush_callback();
extern int toku_brtnode_fetch_callback(), toku_brtheader_fetch_callback();
extern int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header);
extern int toku_unpin_brt_header (BRT brt);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
......
......@@ -3256,18 +3256,6 @@ static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
return r;
}
static inline int brt_cursor_copyout_with_dat(BRT_CURSOR cursor, DBT *key, DBT *val,
BRT pdb, DBT* dat, DBT* dat_source) {
int r = 0;
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
void** dat_staticp = &pdb->sval;
r = toku_dbt_set_three_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE,
val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE,
dat, (bytevec*)&dat_source->data, dat_source->size, dat_staticp, FALSE);
return r;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, (bytevec*)&key_source->data, key_source->size, NULL, FALSE);
return r;
......
// A read lock is acquired by threads that get and pin an entry in the
// cachetable. A write lock is acquired by the writer thread when an entry
// is evicted from the cachetable and is being written storage.
// Properties:
// 1. multiple readers, no writers
// 2. one writer at a time
// 3. pending writers have priority over pending readers
// An external mutex must be locked when using these functions. An alternate
// design would bury a mutex into the rwlock itself. While this may
// increase parallelism at the expense of single thread performance, we
// are experimenting with a single higher level lock.
typedef struct ctpair_rwlock *CTPAIR_RWLOCK;
struct ctpair_rwlock {
int pinned; // the number of readers
int want_pin; // the number of blocked readers
pthread_cond_t wait_pin;
int writer; // the number of writers
int want_write; // the number of blocked writers
pthread_cond_t wait_write;
};
// initialize a read write lock
static void ctpair_rwlock_init(CTPAIR_RWLOCK rwlock) {
int r;
rwlock->pinned = rwlock->want_pin = 0;
r = pthread_cond_init(&rwlock->wait_pin, 0); assert(r == 0);
rwlock->writer = rwlock->want_write = 0;
r = pthread_cond_init(&rwlock->wait_write, 0); assert(r == 0);
}
// destroy a read write lock
static void ctpair_rwlock_destroy(CTPAIR_RWLOCK rwlock) {
int r;
assert(rwlock->pinned == 0 && rwlock->want_pin == 0);
assert(rwlock->writer == 0 && rwlock->want_write == 0);
r = pthread_cond_destroy(&rwlock->wait_pin); assert(r == 0);
r = pthread_cond_destroy(&rwlock->wait_write); assert(r == 0);
}
// obtain a read lock
// expects: mutex is locked
static inline void ctpair_read_lock(CTPAIR_RWLOCK rwlock, pthread_mutex_t *mutex) {
if (rwlock->writer || rwlock->want_write) {
rwlock->want_pin++;
while (rwlock->writer || rwlock->want_write) {
int r = pthread_cond_wait(&rwlock->wait_pin, mutex); assert(r == 0);
}
rwlock->want_pin--;
}
rwlock->pinned++;
}
// release a read lock
// expects: mutex is locked
static inline void ctpair_read_unlock(CTPAIR_RWLOCK rwlock) {
rwlock->pinned--;
if (rwlock->pinned == 0 && rwlock->want_write) {
int r = pthread_cond_signal(&rwlock->wait_write); assert(r == 0);
}
}
// obtain a write lock
// expects: mutex is locked
static inline void ctpair_write_lock(CTPAIR_RWLOCK rwlock, pthread_mutex_t *mutex) {
if (rwlock->pinned || rwlock->writer) {
rwlock->want_write++;
while (rwlock->pinned || rwlock->writer) {
int r = pthread_cond_wait(&rwlock->wait_write, mutex); assert(r == 0);
}
rwlock->want_write--;
}
rwlock->writer++;
}
// release a write lock
// expects: mutex is locked
static inline void ctpair_write_unlock(CTPAIR_RWLOCK rwlock) {
rwlock->writer--;
if (rwlock->writer == 0) {
if (rwlock->want_write) {
int r = pthread_cond_signal(&rwlock->wait_write); assert(r == 0);
} else if (rwlock->want_pin) {
int r = pthread_cond_broadcast(&rwlock->wait_pin); assert(r == 0);
}
}
}
// returns: the number of readers
static inline int ctpair_pinned(CTPAIR_RWLOCK rwlock) {
return rwlock->pinned;
}
// returns: the number of writers
static inline int ctpair_writers(CTPAIR_RWLOCK rwlock) {
return rwlock->writer;
}
// returns: the sum of the number of readers, pending readers, writers, and
// pending writers
static inline int ctpair_users(CTPAIR_RWLOCK rwlock) {
return rwlock->pinned + rwlock->want_pin + rwlock->writer + rwlock->want_write;
}
// When objects are evicted from the cachetable, they are written to storage by a
// thread in a thread pool. The pair's are placed onto a write queue that feeds
// the thread pool.
typedef struct writequeue *WRITEQUEUE;
struct writequeue {
PAIR head, tail; // head and tail of the linked list of pair's
pthread_cond_t wait_read; // wait for read
int want_read; // number of threads waiting to read
pthread_cond_t wait_write; // wait for write
int want_write; // number of threads waiting to write
int ninq; // number of pairs in the queue
char closed; // kicks waiting threads off of the write queue
};
// initialize a writequeue
// expects: the writequeue is not initialized
// effects: the writequeue is set to empty and the condition variable is initialized
static void writequeue_init(WRITEQUEUE wq) {
wq->head = wq->tail = 0;
int r;
r = pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
wq->want_read = 0;
r = pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
wq->want_write = 0;
wq->ninq = 0;
wq->closed = 0;
}
// destroy a writequeue
// expects: the writequeue must be initialized and empty
static void writequeue_destroy(WRITEQUEUE wq) {
assert(wq->head == 0 && wq->tail == 0);
int r;
r = pthread_cond_destroy(&wq->wait_read); assert(r == 0);
r = pthread_cond_destroy(&wq->wait_write); assert(r == 0);
}
// close the writequeue
// effects: signal any threads blocked in the writequeue
static void writequeue_set_closed(WRITEQUEUE wq) {
wq->closed = 1;
int r;
r = pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
// determine whether or not the write queue is empty
// return: 1 if the write queue is empty, otherwise 0
static int writequeue_empty(WRITEQUEUE wq) {
return wq->head == 0;
}
// put a pair on the tail of the write queue
// effects: append the pair to the end of the write queue and signal
// any waiters
static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
pair->next_wq = 0;
if (wq->tail)
wq->tail->next_wq = pair;
else
wq->head = pair;
wq->tail = pair;
wq->ninq++;
if (wq->want_read) {
int r = pthread_cond_signal(&wq->wait_read); assert(r == 0);
}
}
// get a pair from the head of the write queue
// effects: wait until the writequeue is not empty, remove the first pair from the
// write queue and return it
// returns: 0 if success, otherwise an error
static int writequeue_deq(WRITEQUEUE wq, pthread_mutex_t *mutex, PAIR *pairptr) {
while (writequeue_empty(wq)) {
if (wq->closed)
return EINVAL;
wq->want_read++;
int r = pthread_cond_wait(&wq->wait_read, mutex); assert(r == 0);
wq->want_read--;
}
PAIR pair = wq->head;
wq->head = pair->next_wq;
if (wq->head == 0)
wq->tail = 0;
wq->ninq--;
pair->next_wq = 0;
*pairptr = pair;
return 0;
}
// wait for write
static void writequeue_wait_write(WRITEQUEUE wq, pthread_mutex_t *mutex) {
wq->want_write++;
int r = pthread_cond_wait(&wq->wait_write, mutex); assert(r == 0);
wq->want_write--;
}
// wakeup writers
static void writequeue_wakeup_write(WRITEQUEUE wq) {
if (wq->want_write) {
int r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#include <errno.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <pthread.h>
#include "cachetable.h"
#include "hashfun.h"
#include "memory.h"
#include "toku_assert.h"
#include "brt-internal.h"
#include "log_header.h"
#include "threadpool.h"
#include "cachetable-rwlock.h"
#include <malloc.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>
#include <pthread.h>
// execute the cachetable callbacks using a writer thread 0->no 1->yes
#define DO_WRITER_THREAD 0
#if DO_WRITER_THREAD
static void *cachetable_writer(void *);
#endif
// use cachetable locks 0->no 1->yes
#define DO_CACHETABLE_LOCK 0
// unlock the cachetable while executing callbacks 0->no 1->yes
#define DO_CALLBACK_UNLOCK 0
// simulate long latency write operations with usleep. time in milliseconds.
#define DO_CALLBACK_USLEEP 0
#define DO_CALLBACK_BUSYWAIT 0
//#define TRACE_CACHETABLE
#ifdef TRACE_CACHETABLE
#define WHEN_TRACE_CT(x) x
......@@ -28,37 +45,51 @@
typedef struct ctpair *PAIR;
struct ctpair {
enum typ_tag tag;
long long pinned;
long size;
char dirty;
char verify_flag; // Used in verify_cachetable()
char writing; // writing back
char write_me;
CACHEKEY key;
void *value;
long size;
PAIR next,prev; // In LRU list.
PAIR hash_chain;
CACHEFILE cachefile;
CACHETABLE_FLUSH_FUNC_T flush_callback;
CACHETABLE_FETCH_FUNC_T fetch_callback;
CACHETABLE_FLUSH_CALLBACK flush_callback;
CACHETABLE_FETCH_CALLBACK fetch_callback;
void *extraargs;
int verify_flag; /* Used in verify_cachetable() */
LSN modified_lsn; // What was the LSN when modified (undefined if not dirty)
LSN written_lsn; // What was the LSN when written (we need to get this information when we fetch)
u_int32_t fullhash;
PAIR next_wq; // the ctpair's are linked into a write queue when evicted
struct ctpair_rwlock rwlock; // reader writer lock used to grant an exclusive lock to the writeback thread
struct writequeue *cq; // writers sometimes return ctpair's using this queue
};
#include "cachetable-writequeue.h"
static inline void ctpair_destroy(PAIR p) {
ctpair_rwlock_destroy(&p->rwlock);
toku_free(p);
}
// The cachetable is as close to an ENV as we get.
struct cachetable {
enum typ_tag tag;
u_int32_t n_in_table;
u_int32_t table_size;
PAIR *table;
PAIR *table; // hash table
PAIR head,tail; // of LRU list. head is the most recently used. tail is least recently used.
CACHEFILE cachefiles;
long size_current, size_limit;
CACHEFILE cachefiles; // list of cachefiles that use this cachetable
long size_current; // the sum of the sizes of the pairs in the cachetable
long size_limit; // the limit to the sum of the pair sizes
long size_writing; // the sum of the sizes of the pairs being written
LSN lsn_of_checkpoint; // the most recent checkpoint in the log.
TOKULOGGER logger;
#if DO_CACHETABLE_LOCK
pthread_mutex_t mutex;
#endif
pthread_mutex_t mutex; // course lock that protects the cachetable, the cachefiles, and the pair's
struct writequeue wq; // write queue for the writer threads
THREADPOOL threadpool; // pool of writer threads
};
// lock the cachetable mutex
......@@ -77,6 +108,15 @@ static inline void cachetable_unlock(CACHETABLE ct __attribute__((unused))) {
#endif
}
// wait for writes to complete if the size in the write queue is 1/2 of
// the cachetable
static inline void cachetable_wait_write(CACHETABLE ct) {
while (2*ct->size_writing > ct->size_current) {
writequeue_wait_write(&ct->wq, &ct->mutex);
}
}
struct fileid {
dev_t st_dev; /* device and inode are enough to uniquely identify a file in unix. */
ino_t st_ino;
......@@ -107,22 +147,33 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
}
}
TAGMALLOC(CACHETABLE, t);
u_int32_t i;
if (t == 0) return ENOMEM;
t->n_in_table = 0;
t->table_size = 4;
MALLOC_N(t->table_size, t->table);
assert(t->table);
t->head = t->tail = 0;
u_int32_t i;
for (i=0; i<t->table_size; i++) {
t->table[i]=0;
}
t->cachefiles = 0;
t->size_current = 0;
t->size_limit = size_limit;
t->size_writing = 0;
t->lsn_of_checkpoint = initial_lsn;
t->logger = logger;
#if DO_CACHTABLE_LOCK
int r = pthread_mutex_init(&t->mutex, 0); assert(r == 0);
int r;
writequeue_init(&t->wq);
r = pthread_mutex_init(&t->mutex, 0); assert(r == 0);
// set the max number of writeback threads to min(4,nprocs_online)
int nprocs = sysconf(_SC_NPROCESSORS_ONLN);
if (nprocs > 4) nprocs = 4;
r = threadpool_create(&t->threadpool, nprocs); assert(r == 0);
#if DO_WRITER_THREAD
threadpool_maybe_add(t->threadpool, cachetable_writer, t);
#endif
*result = t;
return 0;
......@@ -231,7 +282,7 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
}
}
static int cachefile_flush_and_remove (CACHEFILE cf);
static int cachetable_flush_cachefile (CACHETABLE, CACHEFILE cf, BOOL do_remove);
// Increment the reference count
void toku_cachefile_refup (CACHEFILE cf) {
......@@ -240,15 +291,21 @@ void toku_cachefile_refup (CACHEFILE cf) {
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
CACHEFILE cf = *cfp;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
assert(cf->refcount>0);
cf->refcount--;
if (cf->refcount==0) {
int r;
if ((r = cachefile_flush_and_remove(cf))) return r;
if ((r = cachetable_flush_cachefile(ct, cf, TRUE))) {
cachetable_unlock(ct);
return r;
}
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct);
r = close(cf->fd);
assert(r == 0);
cf->fd = -1;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
if (logger) {
assert(cf->fname);
BYTESTRING bs = {.len=strlen(cf->fname), .data=cf->fname};
......@@ -260,28 +317,35 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
*cfp=0;
return r;
} else {
cachetable_unlock(ct);
*cfp=0;
return 0;
}
}
int toku_cachefile_flush (CACHEFILE cf) {
return cachefile_flush_and_remove(cf);
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
int r = cachetable_flush_cachefile(ct, cf, TRUE);
cachetable_unlock(ct);
return r;
}
int toku_cachetable_assert_all_unpinned (CACHETABLE t) {
u_int32_t i;
int some_pinned=0;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(p->pinned>=0);
if (p->pinned) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock)) {
printf("%s:%d pinned: %lld (%p)\n", __FILE__, __LINE__, p->key, p->value);
some_pinned=1;
}
}
}
cachetable_unlock(t);
return some_pinned;
}
......@@ -289,27 +353,21 @@ int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
u_int32_t i;
int n_pinned=0;
CACHETABLE t = cf->cachetable;
cachetable_lock(t);
for (i=0; i<t->table_size; i++) {
PAIR p;
for (p=t->table[i]; p; p=p->hash_chain) {
assert(p->pinned>=0);
if (p->pinned && p->cachefile==cf) {
assert(ctpair_pinned(&p->rwlock)>=0);
if (ctpair_pinned(&p->rwlock) && (cf==0 || p->cachefile==cf)) {
if (print_them) printf("%s:%d pinned: %lld (%p)\n", __FILE__, __LINE__, p->key, p->value);
n_pinned++;
}
}
}
cachetable_unlock(t);
return n_pinned;
}
#if 0
unsigned int ct_hash_longlong (unsigned long long l) {
unsigned int r = hash_key((unsigned char*)&l, 8);
printf("%lld --> %d --> %d\n", l, r, r%64);
return r;
}
#endif
// This hash function comes from Jenkins: http://burtleburtle.net/bob/c/lookup3.c
// The idea here is to mix the bits thoroughly so that we don't have to do modulo by a prime number.
// Instead we can use a bitmask on a table of size power of two.
......@@ -419,23 +477,123 @@ static BOOL need_to_rename_p (CACHETABLE t, PAIR p) {
&& p->written_lsn.lsn < t->lsn_of_checkpoint.lsn); // strict
}
static void flush_and_remove (CACHETABLE t, PAIR remove_me, int write_me) {
lru_remove(t, remove_me);
//printf("flush_callback(%lld,%p)\n", remove_me->key, remove_me->value);
WHEN_TRACE_CT(printf("%s:%d CT flush_callback(%lld, %p, dirty=%d, 0)\n", __FILE__, __LINE__, remove_me->key, remove_me->value, remove_me->dirty && write_me));
//printf("%s:%d TAG=%x p=%p\n", __FILE__, __LINE__, remove_me->tag, remove_me);
//printf("%s:%d dirty=%d\n", __FILE__, __LINE__, remove_me->dirty);
remove_me->flush_callback(remove_me->cachefile, remove_me->key, remove_me->value, remove_me->size, remove_me->dirty && write_me, 0,
t->lsn_of_checkpoint, need_to_rename_p(t, remove_me));
assert(t->n_in_table>0);
t->n_in_table--;
// Remove a pair from the cachetable
// Effects: the pair is removed from the LRU list and from the cachetable's hash table.
// The size of the objects in the cachetable is adjusted by the size of the pair being
// removed.
static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
lru_remove(ct, p);
assert(ct->n_in_table>0);
ct->n_in_table--;
// Remove it from the hash chain.
{
unsigned int h = remove_me->fullhash&(t->table_size-1);
t->table[h] = remove_from_hash_chain (remove_me, t->table[h]);
unsigned int h = p->fullhash&(ct->table_size-1);
ct->table[h] = remove_from_hash_chain (p, ct->table[h]);
}
ct->size_current -= p->size; assert(ct->size_current >= 0);
}
// Maybe remove a pair from the cachetable and free it, depending on whether
// or not there are any threads interested in the pair. The flush callback
// is called with write_me and keep_me both false, and the pair is destroyed.
static void cachetable_maybe_remove_and_free_pair (CACHETABLE ct, PAIR p) {
if (ctpair_users(&p->rwlock) == 0) {
cachetable_remove_pair(ct, p);
#if DO_CALLBACK_UNLOCK
cachetable_unlock(ct);
#endif
p->flush_callback(p->cachefile, p->key, p->value, p->size, FALSE, FALSE,
ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
ctpair_destroy(p);
#if DO_CALLBACK_UNLOCK
cachetable_lock(ct);
#endif
}
}
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove);
// Write a pair to storage
// Effects: an exclusive lock on the pair is obtained, the write callback is called,
// the pair dirty state is adjusted, and the write is completed. The write_me boolean
// is true when the pair is dirty and the pair is requested to be written. The keep_me
// boolean is true, so the pair is not yet evicted from the cachetable.
static void cachetable_write_pair(CACHETABLE ct, PAIR p) {
ctpair_write_lock(&p->rwlock, &ct->mutex);
#if DO_CALLBACK_UNLOCK
cachetable_unlock(ct);
#endif
// write callback
p->flush_callback(p->cachefile, p->key, p->value, p->size, p->dirty && p->write_me, TRUE,
ct->lsn_of_checkpoint, need_to_rename_p(ct, p));
#if DO_CALLBACK_USLEEP
usleep(DO_CALLBACK_USLEEP);
#endif
#if DO_CALLBACK_BUSYWAIT
struct timeval tstart;
gettimeofday(&tstart, 0);
long long ltstart = tstart.tv_sec * 1000000 + tstart.tv_usec;
while (1) {
struct timeval t;
gettimeofday(&t, 0);
long long lt = t.tv_sec * 1000000 + t.tv_usec;
if (lt - ltstart > DO_CALLBACK_BUSYWAIT)
break;
}
t->size_current -= remove_me->size;
toku_free(remove_me);
#endif
#if DO_CALLBACK_UNLOCK
cachetable_lock(ct);
#endif
// the pair is no longer dirty once written
if (p->dirty && p->write_me)
p->dirty = FALSE;
// stuff it into a completion queue for delayed completion if a completion queue exists
// otherwise complete the write now
if (p->cq)
writequeue_enq(p->cq, p);
else
cachetable_complete_write_pair(ct, p, TRUE);
}
// complete the write of a pair by reseting the writing flag, adjusting the write
// pending size, and maybe removing the pair from the cachetable if there are no
// references to it
static void cachetable_complete_write_pair (CACHETABLE ct, PAIR p, BOOL do_remove) {
p->cq = 0;
p->writing = 0;
// maybe wakeup any stalled writers when the pending writes fall below
// 1/8 of the size of the cachetable
ct->size_writing -= p->size;
assert(ct->size_writing >= 0);
if (8*ct->size_writing <= ct->size_current)
writequeue_wakeup_write(&ct->wq);
ctpair_write_unlock(&p->rwlock);
if (do_remove)
cachetable_maybe_remove_and_free_pair(ct, p);
}
// flush and remove a pair from the cachetable. the callbacks are run by a thread in
// a thread pool.
static void flush_and_remove (CACHETABLE ct, PAIR p, int write_me) {
p->writing = 1;
ct->size_writing += p->size; assert(ct->size_writing >= 0);
p->write_me = write_me;
#if DO_WRITER_THREAD
threadpool_maybe_add(ct->threadpool, cachetable_writer, ct);
writequeue_enq(&ct->wq, p);
#else
cachetable_write_pair(ct, p);
#endif
}
static unsigned long toku_maxrss=0;
......@@ -458,11 +616,10 @@ static unsigned long check_maxrss (void) {
}
static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
static int maybe_flush_some (CACHETABLE t, long size) {
int r = 0;
again:
// if (t->n_in_table >= t->table_size) {
if (size + t->size_current > t->size_limit) {
if (size + t->size_current > t->size_limit + t->size_writing) {
{
unsigned long rss __attribute__((__unused__)) = check_maxrss();
//printf("this-size=%.6fMB projected size = %.2fMB limit=%2.fMB rss=%2.fMB\n", size/(1024.0*1024.0), (size+t->size_current)/(1024.0*1024.0), t->size_limit/(1024.0*1024.0), rss/256.0);
......@@ -472,7 +629,7 @@ static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
/* Try to remove one. */
PAIR remove_me;
for (remove_me = t->tail; remove_me; remove_me = remove_me->prev) {
if (!remove_me->pinned) {
if (!ctpair_users(&remove_me->rwlock) && !remove_me->writing) {
flush_and_remove(t, remove_me, 1);
goto again;
}
......@@ -489,16 +646,17 @@ static int maybe_flush_some (CACHETABLE t, long size __attribute__((unused))) {
}
static int cachetable_insert_at(CACHEFILE cachefile, u_int32_t fullhash, CACHEKEY key, void *value, long size,
cachetable_flush_func_t flush_callback,
cachetable_fetch_func_t fetch_callback,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback,
void *extraargs, int dirty,
LSN written_lsn) {
TAGMALLOC(PAIR, p);
memset(p, 0, sizeof *p);
ctpair_rwlock_init(&p->rwlock);
p->fullhash = fullhash;
p->pinned = 1;
p->dirty = dirty;
p->dirty = dirty; //printf("%s:%d p=%p dirty=%d\n", __FILE__, __LINE__, p, p->dirty);
p->size = size;
//printf("%s:%d p=%p dirty=%d\n", __FILE__, __LINE__, p, p->dirty);
p->writing = 0;
p->key = key;
p->value = value;
p->next = p->prev = 0;
......@@ -510,6 +668,8 @@ static int cachetable_insert_at(CACHEFILE cachefile, u_int32_t fullhash, CACHEKE
p->written_lsn = written_lsn;
p->fullhash = fullhash;
CACHETABLE ct = cachefile->cachetable;
ctpair_read_lock(&p->rwlock, &ct->mutex);
p->cq = 0;
lru_add_to_list(ct, p);
u_int32_t h = fullhash & (ct->table_size-1);
p->hash_chain = ct->table[h];
......@@ -537,56 +697,58 @@ void note_hash_count (int count) {
}
int toku_cachetable_put(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void*value, long size,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs) {
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
WHEN_TRACE_CT(printf("%s:%d CT cachetable_put(%lld)=%p\n", __FILE__, __LINE__, key, value));
int count=0;
CACHETABLE ct = cachefile->cachetable;
int count=0;
cachetable_lock(ct);
cachetable_wait_write(ct);
{
PAIR p;
for (p=cachefile->cachetable->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
for (p=ct->table[fullhash&(cachefile->cachetable->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key==key && p->cachefile==cachefile) {
note_hash_count(count);
// Semantically, these two asserts are not strictly right. After all, when are two functions eq?
// In practice, the functions better be the same.
assert(p->flush_callback==flush_callback);
assert(p->fetch_callback==fetch_callback);
p->pinned++; /* Already present. But increment the pin count. */
ctpair_read_lock(&p->rwlock, &ct->mutex);
cachetable_unlock(ct);
note_hash_count(count);
return -1; /* Already present. */
}
}
}
int r;
note_hash_count(count);
if ((r=maybe_flush_some(cachefile->cachetable, size))) {
if ((r=maybe_flush_some(ct, size))) {
cachetable_unlock(ct);
return r;
}
// flushing could change the table size, but wont' change the fullhash
r = cachetable_insert_at(cachefile, fullhash, key, value, size, flush_callback, fetch_callback, extraargs, 1, ZERO_LSN);
cachetable_unlock(ct);
note_hash_count(count);
return r;
}
int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value, long *sizep,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs) {
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs) {
CACHETABLE t = cachefile->cachetable;
cachetable_lock(t);
int tsize __attribute__((__unused__)) = t->table_size;
PAIR p;
int count=0;
cachetable_lock(t);
cachetable_wait_write(t);
for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key==key && p->cachefile==cachefile) {
note_hash_count(count);
*value = p->value;
if (sizep) *sizep = p->size;
p->pinned++;
ctpair_read_lock(&p->rwlock, &t->mutex);
lru_touch(t,p);
cachetable_unlock(t);
note_hash_count(count);
WHEN_TRACE_CT(printf("%s:%d cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
return 0;
}
......@@ -607,15 +769,11 @@ int toku_cachetable_get_and_pin(CACHEFILE cachefile, CACHEKEY key, u_int32_t ful
*value = toku_value;
if (sizep)
*sizep = size;
// maybe_flush_some(t, size);
}
if ((r=maybe_flush_some(t, 0))) {
cachetable_unlock(t);
return r;
}
r = maybe_flush_some(t, 0);
cachetable_unlock(t);
WHEN_TRACE_CT(printf("%s:%d did fetch: cachtable_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value));
return 0;
return r;
}
int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash, void**value) {
......@@ -625,12 +783,12 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
cachetable_lock(t);
for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key==key && p->cachefile==cachefile) {
note_hash_count(count);
if (p->key==key && p->cachefile==cachefile && !p->writing) {
*value = p->value;
p->pinned++;
ctpair_read_lock(&p->rwlock, &t->mutex);
lru_touch(t,p);
cachetable_unlock(t);
note_hash_count(count);
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
return 0;
}
......@@ -652,23 +810,24 @@ int toku_cachetable_unpin(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key==key && p->cachefile==cachefile) {
note_hash_count(count);
assert(p->pinned>0);
p->pinned--;
assert(p->rwlock.pinned>0);
ctpair_read_unlock(&p->rwlock);
p->dirty |= dirty;
if (size != 0) {
t->size_current -= p->size;
t->size_current -= p->size; if (p->writing) t->size_writing -= p->size;
p->size = size;
t->size_current += p->size;
t->size_current += p->size; if (p->writing) t->size_writing += p->size;
}
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{
int r;
if ((r=maybe_flush_some(t, 0))) {
cachetable_unlock(t); return r;
cachetable_unlock(t);
return r;
}
}
cachetable_unlock(t);
note_hash_count(count);
return 0;
}
}
......@@ -684,6 +843,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
PAIR *ptr_to_p,p;
int count = 0;
u_int32_t fullhash = toku_cachetable_hash(cachefile, oldkey);
cachetable_lock(t);
for (ptr_to_p = &t->table[fullhash&(t->table_size-1)], p = *ptr_to_p;
p;
ptr_to_p = &p->hash_chain, p = *ptr_to_p) {
......@@ -697,28 +857,22 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
p->fullhash = new_fullhash;
p->hash_chain = t->table[nh];
t->table[nh] = p;
cachetable_unlock(t);
return 0;
}
}
cachetable_unlock(t);
note_hash_count(count);
return -1;
}
static int cachetable_flush (CACHETABLE t) {
u_int32_t i;
for (i=0; i<t->table_size; i++) {
PAIR p;
while ((p = t->table[i]))
flush_and_remove(t, p, 1); // Must be careful, since flush_and_remove kills the linked list.
}
return 0;
}
void toku_cachefile_verify (CACHEFILE cf) {
toku_cachetable_verify(cf->cachetable);
}
void toku_cachetable_verify (CACHETABLE t) {
cachetable_lock(t);
// First clear all the verify flags by going through the hash chains
{
u_int32_t i;
......@@ -759,10 +913,11 @@ void toku_cachetable_verify (CACHETABLE t) {
}
}
}
cachetable_unlock(t);
}
static void assert_cachefile_is_flushed_and_removed (CACHEFILE cf) {
CACHETABLE t = cf->cachetable;
static void assert_cachefile_is_flushed_and_removed (CACHETABLE t, CACHEFILE cf) {
u_int32_t i;
// Check it two ways
// First way: Look through all the hash chains
......@@ -781,27 +936,35 @@ static void assert_cachefile_is_flushed_and_removed (CACHEFILE cf) {
}
}
// write all dirty entries and maybe remove them
static int cachefile_flush_and_remove (CACHEFILE cf) {
u_int32_t i;
CACHETABLE t = cf->cachetable;
for (i=0; i<t->table_size; i++) {
static int cachetable_flush_cachefile (CACHETABLE ct, CACHEFILE cf, BOOL do_remove) {
unsigned nfound = 0;
struct writequeue cq;
writequeue_init(&cq);
unsigned i;
for (i=0; i < ct->table_size; i++) {
PAIR p;
again:
p = t->table[i];
while (p) {
if (p->cachefile==cf) {
flush_and_remove(t, p, 1); // Must be careful, since flush_and_remove kills the linked list.
goto again;
} else {
p=p->hash_chain;
for (p = ct->table[i]; p; p=p->hash_chain) {
if (cf == 0 || p->cachefile==cf) {
nfound++;
p->cq = &cq;
if (!p->writing)
flush_and_remove(ct, p, 1);
}
}
}
assert_cachefile_is_flushed_and_removed(cf);
for (i=0; i<nfound; i++) {
PAIR p = 0;
int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
cachetable_complete_write_pair(ct, p, do_remove);
}
writequeue_destroy(&cq);
if (do_remove)
assert_cachefile_is_flushed_and_removed(ct, cf);
if ((4 * t->n_in_table < t->table_size) && (t->table_size>4))
cachetable_rehash(t, t->table_size/2);
if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4))
cachetable_rehash(ct, ct->table_size/2);
return 0;
}
......@@ -809,27 +972,37 @@ static int cachefile_flush_and_remove (CACHEFILE cf) {
/* Require that it all be flushed. */
int toku_cachetable_close (CACHETABLE *tp) {
CACHETABLE t=*tp;
u_int32_t i;
int r;
if ((r=cachetable_flush(t))) return r;
cachetable_lock(t);
if ((r=cachetable_flush_cachefile(t, 0, TRUE))) {
cachetable_unlock(t);
return r;
}
u_int32_t i;
for (i=0; i<t->table_size; i++) {
if (t->table[i]) return -1;
}
#if DO_CACHETABLE_LOCK
assert(t->size_writing == 0);
writequeue_set_closed(&t->wq);
cachetable_unlock(t);
threadpool_destroy(&t->threadpool);
writequeue_destroy(&t->wq);
r = pthread_mutex_destroy(&t->mutex); assert(r == 0);
#endif
toku_free(t->table);
toku_free(t);
*tp = 0;
return 0;
}
#if 0
// this is broken. needs to wait for writebacks to complete
int toku_cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) {
/* Removing something already present is OK. */
CACHETABLE t = cachefile->cachetable;
PAIR p;
int count = 0;
u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
cachetable_lock(t);
for (p=t->table[fullhash&(t->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key==key && p->cachefile==cachefile) {
......@@ -840,9 +1013,11 @@ int toku_cachetable_remove (CACHEFILE cachefile, CACHEKEY key, int write_me) {
}
}
done:
cachetable_unlock(t);
note_hash_count(count);
return 0;
}
#endif
#if 0
static void flush_and_keep (PAIR flush_me) {
......@@ -890,23 +1065,107 @@ int cachefile_pread (CACHEFILE cf, void *buf, size_t count, off_t offset) {
#endif
/* debug functions */
int toku_cachetable_checkpoint (CACHETABLE ct) {
// Single threaded checkpoint.
// In future: for multithreaded checkpoint we should not proceed if the previous checkpoint has not finished.
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.)
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
// For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
// flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
// Watch out since evicting the node modifies the hash table.
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
unsigned nfound = 0;
struct writequeue cq;
writequeue_init(&cq);
cachetable_lock(ct);
unsigned i;
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p=p->hash_chain) {
// p->dirty && p->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn
if (1) {
nfound++;
p->cq = &cq;
if (!p->writing)
flush_and_remove(ct, p, 1);
}
}
}
for (i=0; i<nfound; i++) {
PAIR p = 0;
int r = writequeue_deq(&cq, &ct->mutex, &p); assert(r == 0);
cachetable_complete_write_pair(ct, p, FALSE);
}
cachetable_unlock(ct);
writequeue_destroy(&cq);
return 0;
}
TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
return cf->cachetable->logger;
}
FILENUM toku_cachefile_filenum (CACHEFILE cf) {
return cf->filenum;
}
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile) {
return cachefile->header_fullhash;
}
#if DO_WRITER_THREAD
// The writer thread waits for work in the write queue and writes the pair
static void *cachetable_writer(void *arg) {
// printf("%lu:%s:start %p\n", pthread_self(), __FUNCTION__, arg);
CACHETABLE ct = arg;
int r;
cachetable_lock(ct);
while (1) {
threadpool_set_thread_idle(ct->threadpool);
PAIR p = 0;
r = writequeue_deq(&ct->wq, &ct->mutex, &p);
if (r != 0)
break;
threadpool_set_thread_busy(ct->threadpool);
cachetable_write_pair(ct, p);
}
cachetable_unlock(ct);
// printf("%lu:%s:exit %p\n", pthread_self(), __FUNCTION__, arg);
return arg;
}
#endif
// debug functions
void toku_cachetable_print_state (CACHETABLE ct) {
u_int32_t i;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p = ct->table[i];
if (p != 0) {
printf("t[%d]=", i);
for (p=ct->table[i]; p; p=p->hash_chain) {
printf(" {%lld, %p, dirty=%d, pin=%lld, size=%ld}", p->key, p->cachefile, p->dirty, p->pinned, p->size);
printf(" {%lld, %p, dirty=%d, pin=%d, size=%ld}", p->key, p->cachefile, p->dirty, p->rwlock.pinned, p->size);
}
printf("\n");
}
}
cachetable_unlock(ct);
}
void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr) {
cachetable_lock(ct);
if (num_entries_ptr)
*num_entries_ptr = ct->n_in_table;
if (hash_size_ptr)
......@@ -915,84 +1174,32 @@ void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_s
*size_current_ptr = ct->size_current;
if (size_limit_ptr)
*size_limit_ptr = ct->size_limit;
cachetable_unlock(ct);
}
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
PAIR p;
int count = 0;
int r = -1;
u_int32_t fullhash = toku_cachetable_hash(cf, key);
cachetable_lock(ct);
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key == key) {
note_hash_count(count);
if (p->key == key && p->cachefile == cf) {
if (value_ptr)
*value_ptr = p->value;
if (dirty_ptr)
*dirty_ptr = p->dirty;
if (pin_ptr)
*pin_ptr = p->pinned;
*pin_ptr = p->rwlock.pinned;
if (size_ptr)
*size_ptr = p->size;
return 0;
r = 0;
break;
}
}
cachetable_unlock(ct);
note_hash_count(count);
return 1;
}
int toku_cachetable_checkpoint (CACHETABLE ct) {
// Single threaded checkpoint.
// In future: for multithreaded checkpoint we should not proceed if the previous checkpoint has not finished.
// Requires: Everything is unpinned. (In the multithreaded version we have to wait for things to get unpinned and then
// grab them (or else the unpinner has to do something.)
// Algorithm: Write a checkpoint record to the log, noting the LSN of that record.
// Note the LSN of the previous checkpoint (stored in lsn_of_checkpoint)
// For every (unpinnned) dirty node in which the LSN is newer than the prev checkpoint LSN:
// flush the node (giving it a new nodeid, and fixing up the downpointer in the parent)
// Watch out since evicting the node modifies the hash table.
//?? This is a skeleton. It compiles, but doesn't do anything reasonable yet.
//?? log_the_checkpoint();
int n_saved=0;
int n_in_table = ct->n_in_table;
struct save_something {
CACHEFILE cf;
DISKOFF key;
void *value;
long size;
LSN modified_lsn;
CACHETABLE_FLUSH_FUNC_T flush_callback;
} *MALLOC_N(n_in_table, info);
{
PAIR pair;
for (pair=ct->head; pair; pair=pair->next) {
assert(!pair->pinned);
if (pair->dirty && pair->modified_lsn.lsn>ct->lsn_of_checkpoint.lsn) {
//?? /save_something_about_the_pair(); // This read-only so it doesn't modify the table.
n_saved++;
}
}
}
{
int i;
for (i=0; i<n_saved; i++) {
info[i].flush_callback(info[i].cf, info[i].key, info[i].value, info[i].size, 1, 1, info[i].modified_lsn, 0);
}
}
toku_free(info);
return 0;
}
TOKULOGGER toku_cachefile_logger (CACHEFILE cf) {
return cf->cachetable->logger;
}
FILENUM toku_cachefile_filenum (CACHEFILE cf) {
return cf->filenum;
}
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile) {
return cachefile->header_fullhash;
return r;
}
......@@ -6,10 +6,6 @@
#include <fcntl.h>
#include "brttypes.h"
/* Implement the cache table. */
typedef long long CACHEKEY;
/* Maintain a cache mapping from cachekeys to values (void*)
* Some of the keys can be pinned. Don't pin too many or for too long.
* If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
......@@ -23,50 +19,78 @@ typedef long long CACHEKEY;
* table_size is the initial size of the cache table hash table (in number of entries)
* size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
*/
typedef long long CACHEKEY;
// create a new cachetable
// returns: if success, 0 is returned and result points to the new cachetable
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_checkpoint (CACHETABLE ct);
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
// the flush callback (write, free)
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
/* If we are asked to fetch something, get it by calling this back. */
typedef int (cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
typedef cachetable_fetch_func_t *CACHETABLE_FETCH_FUNC_T;
// the fetch callback
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
// Put a key and value pair into the cachetable
// effects: if the key,cachefile is not in the cachetable, then insert the pair and pin it.
// returns: 0 if success, otherwise an error
/* Error if already present. On success, pin the value. */
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void* value, long size,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs);
void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
void**/*value*/, long *sizep,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs);
void **/*value*/, long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
// If the the item is already in memory, then return 0 and store it in the void**.
// If the item is not in memory, then return nonzero.
/* If the the item is already in memory, then return 0 and store it in the void**.
* If the item is not in memory, then return nonzero. */
int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
/* cachetable object state wrt external memory */
// cachetable object state wrt external memory
#define CACHETABLE_CLEAN 0
#define CACHETABLE_DIRTY 1
// Unpin by key
// effects: lookup a mapping using key,cachefile. if a pair is found, then OR the dirty bit into the pair
// and update the size of the pair. the read lock on the pair is released.
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size); /* Note whether it is dirty when we unpin it. */
int toku_cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */
int toku_cachetable_assert_all_unpinned (CACHETABLE);
int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
/* Rename whatever is at oldkey to be newkey. Requires that the object be pinned. */
int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey);
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
int toku_cachefile_flush (CACHEFILE);
// effect: flush everything owned by the cachefile.
// effect: flush everything owned by the cachefile from the cachetable. all dirty
// blocks are written sto storage. all unpinned blocks are evicts from the cachetable.
// returns: 0 if success
void toku_cachefile_refup (CACHEFILE cfp);
......@@ -85,27 +109,25 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
// effect: bind the cachefile to a new fd and fname. the old fd is closed.
// returns: 0 if success
// Useful for debugging
void toku_cachetable_print_state (CACHETABLE ct);
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr);
void toku_cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void toku_cachetable_verify (CACHETABLE t); // Slow...
TOKULOGGER toku_cachefile_logger (CACHEFILE);
FILENUM toku_cachefile_filenum (CACHEFILE);
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_checkpoint (CACHETABLE ct);
FILENUM toku_cachefile_filenum (CACHEFILE);
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, CACHEKEY key);
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile);
// debug functions
void toku_cachetable_print_state (CACHETABLE ct);
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf,
void **value_ptr,
int *dirty_ptr,
long long *pin_ptr,
long *size_ptr);
void toku_cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void toku_cachetable_verify (CACHETABLE t); // Slow...
#endif
......@@ -61,6 +61,9 @@ REGRESSION_TESTS = \
brt-test3 \
brt-test4 \
brt-test5 \
cachetable-rwlock-test \
cachetable-writequeue-test \
threadpool-test \
cachetable-test \
cachetable-test2 \
cachetable-put-test \
......@@ -69,6 +72,7 @@ REGRESSION_TESTS = \
cachetable-fd-test \
cachetable-flush-test \
cachetable-count-pinned-test \
cachetable-debug-test \
fifo-test \
list-test \
keyrange \
......
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include "test.h"
#include "cachetable.h"
void flush() {
}
int fetch() {
return 0;
}
void cachetable_debug_test(int n) {
const int test_limit = n;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, 0777); assert(r == 0);
int num_entries, hash_size; long size_current, size_limit;
toku_cachetable_get_state(ct, &num_entries, &hash_size, &size_current, &size_limit);
assert(num_entries == 0);
assert(size_current == 0);
assert(size_limit == n);
// printf("%d %d %ld %ld\n", num_entries, hash_size, size_current, size_limit);
int i;
for (i=1; i<=n; i++) {
const int item_size = 1;
u_int32_t hi;
hi = toku_cachetable_hash(f1, i);
r = toku_cachetable_put(f1, i, hi, (void *)(long)i, item_size, flush, fetch, 0);
assert(r == 0);
void *v; int dirty; long long pinned; long pair_size;
r = toku_cachetable_get_key_state(ct, i, f1, &v, &dirty, &pinned, &pair_size);
assert(r == 0);
assert(v == (void *)(long)i);
assert(dirty == CACHETABLE_DIRTY);
assert(pinned == 1);
assert(pair_size == item_size);
r = toku_cachetable_unpin(f1, i, hi, CACHETABLE_CLEAN, 1);
assert(r == 0);
toku_cachetable_get_state(ct, &num_entries, &hash_size, &size_current, &size_limit);
assert(num_entries == i);
assert(size_current == i);
assert(size_limit == n);
toku_cachetable_print_state(ct);
}
toku_cachetable_verify(ct);
extern void print_hash_histogram();
print_hash_histogram();
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int main(int argc, const char *argv[]) {
int i;
for (i=1; i<argc; i++) {
if (strcmp(argv[i], "-v") == 0) {
verbose++;
continue;
}
}
cachetable_debug_test(8);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "cachetable-rwlock.h"
int verbose = 0;
// test create and destroy
void test_create_destroy() {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
ctpair_rwlock_destroy(rwlock);
}
// test read lock and unlock with no writers
void test_simple_read_lock(int n) {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
assert(ctpair_pinned(rwlock) == 0);
int i;
for (i=1; i<=n; i++) {
ctpair_read_lock(rwlock, 0);
assert(ctpair_pinned(rwlock) == i);
assert(ctpair_users(rwlock) == i);
}
for (i=n-1; i>=0; i--) {
ctpair_read_unlock(rwlock);
assert(ctpair_pinned(rwlock) == i);
assert(ctpair_users(rwlock) == i);
}
ctpair_rwlock_destroy(rwlock);
}
// test write lock and unlock with no readers
void test_simple_write_lock() {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
assert(ctpair_users(rwlock) == 0);
ctpair_write_lock(rwlock, 0);
assert(ctpair_writers(rwlock) == 1);
assert(ctpair_users(rwlock) == 1);
ctpair_write_unlock(rwlock);
assert(ctpair_users(rwlock) == 0);
ctpair_rwlock_destroy(rwlock);
}
struct rw_event {
int e;
struct ctpair_rwlock the_rwlock;
pthread_mutex_t mutex;
};
void rw_event_init(struct rw_event *rwe) {
rwe->e = 0;
ctpair_rwlock_init(&rwe->the_rwlock);
int r = pthread_mutex_init(&rwe->mutex, 0); assert(r == 0);
}
void rw_event_destroy(struct rw_event *rwe) {
ctpair_rwlock_destroy(&rwe->the_rwlock);
int r = pthread_mutex_destroy(&rwe->mutex); assert(r == 0);
}
void *test_writer_priority_thread(void *arg) {
struct rw_event *rwe = arg;
int r;
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 3);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 4);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
return arg;
}
// test writer priority over new readers
void test_writer_priority() {
struct rw_event rw_event, *rwe = &rw_event;
int r;
rw_event_init(rwe);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_lock(&rwe->the_rwlock, &rwe->mutex);
sleep(1);
rwe->e++; assert(rwe->e == 1);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
pthread_t tid;
r = pthread_create(&tid, 0, test_writer_priority_thread, rwe);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 2);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 5);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
rw_event_destroy(rwe);
}
// test single writer
void *test_single_writer_thread(void *arg) {
struct rw_event *rwe = arg;
int r;
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 3);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
return arg;
}
void test_single_writer() {
struct rw_event rw_event, *rwe = &rw_event;
int r;
rw_event_init(rwe);
assert(ctpair_writers(&rwe->the_rwlock) == 0);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
sleep(1);
rwe->e++; assert(rwe->e == 1);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
pthread_t tid;
r = pthread_create(&tid, 0, test_single_writer_thread, rwe);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 2);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
assert(ctpair_users(&rwe->the_rwlock) == 2);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
assert(ctpair_writers(&rwe->the_rwlock) == 0);
rw_event_destroy(rwe);
}
int main(int argc, char *argv[]) {
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
}
test_create_destroy();
test_simple_read_lock(0);
test_simple_read_lock(42);
test_simple_write_lock();
test_writer_priority();
test_single_writer();
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
int verbose;
typedef struct ctpair *PAIR;
struct ctpair {
PAIR next_wq;
};
PAIR new_pair() {
PAIR p = (PAIR) malloc(sizeof *p); assert(p);
return p;
}
void destroy_pair(PAIR p) {
free(p);
}
#include "cachetable-writequeue.h"
// test simple create and destroy
void test_create_destroy() {
struct writequeue writequeue, *wq = &writequeue;
writequeue_init(wq);
assert(writequeue_empty(wq));
writequeue_destroy(wq);
}
// verify that the wq implements FIFO ordering
void test_simple_enq_deq(int n) {
struct writequeue writequeue, *wq = &writequeue;
int r;
pthread_mutex_t mutex;
r = pthread_mutex_init(&mutex, 0); assert(r == 0);
writequeue_init(wq);
assert(writequeue_empty(wq));
PAIR pairs[n];
int i;
for (i=0; i<n; i++) {
pairs[i] = new_pair();
writequeue_enq(wq, pairs[i]);
assert(!writequeue_empty(wq));
}
for (i=0; i<n; i++) {
PAIR p;
r = writequeue_deq(wq, &mutex, &p);
assert(r == 0 && p == pairs[i]);
destroy_pair(p);
}
assert(writequeue_empty(wq));
writequeue_destroy(wq);
r = pthread_mutex_destroy(&mutex); assert(r == 0);
}
// setting the wq closed should cause deq to return EINVAL
void test_set_closed() {
struct writequeue writequeue, *wq = &writequeue;
writequeue_init(wq);
writequeue_set_closed(wq);
int r = writequeue_deq(wq, 0, 0);
assert(r == EINVAL);
writequeue_destroy(wq);
}
// closing a wq with a blocked reader thread should cause the reader to get EINVAL
struct writequeue_with_mutex {
struct writequeue writequeue;
pthread_mutex_t mutex;
};
void writequeue_with_mutex_init(struct writequeue_with_mutex *wqm) {
writequeue_init(&wqm->writequeue);
int r = pthread_mutex_init(&wqm->mutex, 0); assert(r == 0);
}
void writequeue_with_mutex_destroy(struct writequeue_with_mutex *wqm) {
writequeue_destroy(&wqm->writequeue);
int r = pthread_mutex_destroy(&wqm->mutex); assert(r == 0);
}
void *test_set_closed_waiter(void *arg) {
struct writequeue_with_mutex *wqm = arg;
int r;
r = pthread_mutex_lock(&wqm->mutex); assert(r == 0);
PAIR p;
r = writequeue_deq(&wqm->writequeue, &wqm->mutex, &p);
assert(r == EINVAL);
r = pthread_mutex_unlock(&wqm->mutex); assert(r == 0);
return arg;
}
void test_set_closed_thread() {
struct writequeue_with_mutex writequeue_with_mutex, *wqm = &writequeue_with_mutex;
int r;
writequeue_with_mutex_init(wqm);
pthread_t tid;
r = pthread_create(&tid, 0, test_set_closed_waiter, wqm); assert(r == 0);
sleep(1);
writequeue_set_closed(&wqm->writequeue);
void *ret;
r = pthread_join(tid, &ret);
assert(r == 0 && ret == wqm);
writequeue_with_mutex_destroy(wqm);
}
// verify writer reader flow control
// the write (main) thread writes as fast as possible until the wq is full. then it
// waits.
// the read thread reads from the wq slowly using a random delay. it wakes up any
// writers when the wq size <= 1/2 of the wq limit
struct rwfc {
pthread_mutex_t mutex;
struct writequeue writequeue;
int current, limit;
};
void rwfc_init(struct rwfc *rwfc, int limit) {
int r;
r = pthread_mutex_init(&rwfc->mutex, 0); assert(r == 0);
writequeue_init(&rwfc->writequeue);
rwfc->current = 0; rwfc->limit = limit;
}
void rwfc_destroy(struct rwfc *rwfc) {
int r;
writequeue_destroy(&rwfc->writequeue);
r = pthread_mutex_destroy(&rwfc->mutex); assert(r == 0);
}
void *rwfc_reader(void *arg) {
struct rwfc *rwfc = arg;
int r;
while (1) {
PAIR ctpair;
r = pthread_mutex_lock(&rwfc->mutex); assert(r == 0);
r = writequeue_deq(&rwfc->writequeue, &rwfc->mutex, &ctpair);
if (r == EINVAL) {
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
break;
}
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
writequeue_wakeup_write(&rwfc->writequeue);
}
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
destroy_pair(ctpair);
usleep(random() % 100);
}
return arg;
}
void test_flow_control(int limit, int n) {
struct rwfc my_rwfc, *rwfc = &my_rwfc;
int r;
rwfc_init(rwfc, limit);
pthread_t tid;
r = pthread_create(&tid, 0, rwfc_reader, rwfc); assert(r == 0);
sleep(1); // this is here to block the reader on the first deq
int i;
for (i=0; i<n; i++) {
PAIR ctpair = new_pair();
r = pthread_mutex_lock(&rwfc->mutex); assert(r == 0);
writequeue_enq(&rwfc->writequeue, ctpair);
rwfc->current++;
while (rwfc->current >= rwfc->limit) {
// printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
writequeue_wait_write(&rwfc->writequeue, &rwfc->mutex);
}
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
// usleep(random() % 1);
}
writequeue_set_closed(&rwfc->writequeue);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
rwfc_destroy(rwfc);
}
int main(int argc, char *argv[]) {
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
}
test_create_destroy();
test_simple_enq_deq(0);
test_simple_enq_deq(42);
test_set_closed();
test_set_closed_thread();
test_flow_control(8, 10000);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <malloc.h>
#include <pthread.h>
#include "threadpool.h"
int verbose = 0;
struct my_threadpool {
THREADPOOL threadpool;
pthread_mutex_t mutex;
pthread_cond_t wait;
int closed;
};
void my_threadpool_init(struct my_threadpool *my_threadpool, int max_threads) {
int r;
r = threadpool_create(&my_threadpool->threadpool, max_threads); assert(r == 0);
assert(my_threadpool != 0);
r = pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0);
r = pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0);
my_threadpool->closed = 0;
}
void my_threadpool_destroy(struct my_threadpool *my_threadpool) {
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
my_threadpool->closed = 1;
r = pthread_cond_broadcast(&my_threadpool->wait); assert(r == 0);
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool));
threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
r = pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0);
r = pthread_cond_destroy(&my_threadpool->wait); assert(r == 0);
}
void *fbusy(void *arg) {
struct my_threadpool *my_threadpool = arg;
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
while (!my_threadpool->closed) {
r = pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
}
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("%lu:%s:exit\n", pthread_self(), __FUNCTION__);
return arg;
}
void *fidle(void *arg) {
struct my_threadpool *my_threadpool = arg;
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
threadpool_set_thread_idle(my_threadpool->threadpool);
while (!my_threadpool->closed) {
r = pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
}
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("%lu:%s:exit\n", pthread_self(), __FUNCTION__);
return arg;
}
#define DO_MALLOC_HOOK 1
#if DO_MALLOC_HOOK
static void *my_malloc_always_fails(size_t n, const __malloc_ptr_t p) {
n = n; p = p;
return 0;
}
#endif
int usage() {
printf("threadpool-test: [-v] [-malloc-fail] [N]\n");
printf("-malloc-fail simulate malloc failures\n");
printf("N max number of threads in the thread pool\n");
return 1;
}
int main(int argc, char *argv[]) {
int max_threads = 1;
int do_malloc_fail = 0;
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
return usage();
} else if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
} else if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
} else if (strcmp(arg, "-malloc-fail") == 0) {
do_malloc_fail = 1;
continue;
} else
max_threads = atoi(arg);
}
struct my_threadpool my_threadpool;
THREADPOOL threadpool;
// test threadpool busy causes no threads to be created
my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_busy\n");
for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fbusy, &my_threadpool);
assert(threadpool_get_current_threads(threadpool) == 1);
}
assert(threadpool_get_current_threads(threadpool) == 1);
my_threadpool_destroy(&my_threadpool);
// test threadpool idle causes up to max_threads to be created
my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_idle\n");
for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fidle, &my_threadpool);
sleep(1);
assert(threadpool_get_current_threads(threadpool) <= max_threads);
}
assert(threadpool_get_current_threads(threadpool) == max_threads);
my_threadpool_destroy(&my_threadpool);
#if DO_MALLOC_HOOK
if (do_malloc_fail) {
if (verbose) printf("test threadpool_create with malloc failure\n");
// test threadpool malloc fails causes ENOMEM
// glibc supports this. see malloc.h
threadpool = 0;
void *(*orig_malloc_hook) (size_t, const __malloc_ptr_t) = __malloc_hook;
__malloc_hook = my_malloc_always_fails;
int r;
r = threadpool_create(&threadpool, 0); assert(r == ENOMEM);
r = threadpool_create(&threadpool, 1); assert(r == ENOMEM);
__malloc_hook = orig_malloc_hook;
}
#endif
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <malloc.h>
#include <pthread.h>
#include <errno.h>
#include "threadpool.h"
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 0
struct threadpool {
int max_threads;
int current_threads;
int busy_threads;
pthread_t pids[];
};
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
size_t size = sizeof (struct threadpool) + max_threads*sizeof (pthread_t);
struct threadpool *threadpool = malloc(size);
if (threadpool == 0)
return ENOMEM;
threadpool->max_threads = max_threads;
threadpool->current_threads = 0;
threadpool->busy_threads = 0;
int i;
for (i=0; i<max_threads; i++)
threadpool->pids[i] = 0;
*threadpoolptr = threadpool;
return 0;
}
void threadpool_destroy(THREADPOOL *threadpoolptr) {
struct threadpool *threadpool = *threadpoolptr;
int i;
for (i=0; i<threadpool->current_threads; i++) {
int r; void *ret;
r = pthread_join(threadpool->pids[i], &ret);
assert(r == 0);
}
*threadpoolptr = 0;
free(threadpool);
}
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if ((threadpool->current_threads == 0 || threadpool->busy_threads < threadpool->current_threads) && threadpool->current_threads < threadpool->max_threads) {
int r = pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg);
if (r == 0) {
threadpool->current_threads++;
threadpool_set_thread_busy(threadpool);
}
}
}
void threadpool_set_thread_busy(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
#else
threadpool->busy_threads++;
#endif
}
void threadpool_set_thread_idle(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
#else
threadpool->busy_threads--;
#endif
}
int threadpool_get_current_threads(THREADPOOL threadpool) {
return threadpool->current_threads;
}
// A threadpool is a limited set of threads that can be used to apply a
// function to work contained in a work queue. The work queue is outside
// of the scope of the threadpool; the threadpool merely provides
// mechanisms to grow the number of threads in the threadpool on demand.
typedef struct threadpool *THREADPOOL;
// Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of
// threads in the threadpool is limited to max_threads. initially, there
// are no threads in the pool.
// Returns: if there are no errors, the threadpool is set and zero is returned.
// Otherwise, an error number is returned.
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads);
// Destroy a threadpool
// Effects: the calling thread joins with all of the threads in the threadpool.
// Effects: the threadpool memory is freed.
// Returns: the threadpool is set to null.
void threadpool_destroy(THREADPOOL *threadpoolptr);
// Maybe add a thread to the threadpool.
// Effects: the number of threads in the threadpool is expanded by 1 as long
// as the current number of threads in the threadpool is less than the max
// and there are no idle threads.
// Effects: if the thread is create, it calls the function f with argument arg
// Expects: external serialization on this function; only one thread may
// execute this function
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
// Set the current thread busy
// Effects: the threadpool keeps a count of the number of idle threads. It
// uses this count to control the creation of additional threads.
void threadpool_set_thread_busy(THREADPOOL);
// Set the current thread idle
void threadpool_set_thread_idle(THREADPOOL);
// get the current number of threads
int threadpool_get_current_threads(THREADPOOL);
CFLAGS = -Wall -W -Werror -g
pma: LDFLAGS=-lm
pma:
pma.o:
......@@ -21,8 +21,7 @@ CFLAGS += -Wbad-function-cast -Wcast-align -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS += -L../ -L../../range_tree
CPPFLAGS += -I. -I../ -I../../range_tree -I../../../newbrt -I../../../include
LDFLAGS += -lz
LDFLAGS = -lpthread -lz
SRCS = $(wildcard *.c)
......
......@@ -19,6 +19,7 @@ CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS = -I. -I../../include -I../../newbrt
CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
CFLAGS += $(VISIBILITY) $(PROF_FLAGS)
LDFLAGS += -lpthread
ifneq ($(OSX),)
CFLAGS+=-fno-common
......
......@@ -17,6 +17,7 @@ CFLAGS = -W -Wall -Wextra -Werror $(OPTFLAGS) -g3 -ggdb3 $(GCOV_FLAGS)
CFLAGS += -Wbad-function-cast -Wcast-align -Wconversion -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS += -I../ -I../../../newbrt -I../../../include
LDFLAGS = -lpthread
SRCS = $(wildcard *.c)
......
......@@ -2593,20 +2593,27 @@ static int toku_db_delboth_noassociate(DB *db, DB_TXN *txn, DBT *key, DBT *val,
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
u_int32_t suppress_missing = flags&DB_DELETE_ANY;
u_int32_t delete_any = flags&DB_DELETE_ANY;
flags &= ~DB_DELETE_ANY;
if (flags!=0) return EINVAL;
//DB_DELETE_ANY supresses the DB_NOTFOUND return value indicating that the key was not found prior to the delete
//TODO: Speed up the DB_DELETE_ANY version by implementing it at the BRT layer.
if (delete_any) {
if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) {
DB_TXN* txn_anc = toku_txn_ancestor(txn);
if ((r = toku_txn_add_lt(txn_anc, db->i->lt))) goto any_cleanup;
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_write_lock(db->i->lt, db, id_anc, key, val);
if (r!=0) goto any_cleanup;
}
r = toku_brt_delete_both(db->i->brt, key, val, txn ? txn->i->tokutxn : NULL);
any_cleanup:
return r;
}
DBC *dbc;
if ((r = toku_db_cursor(db, txn, &dbc, 0, 0))) goto cursor_cleanup;
r = toku_c_get_noassociate(dbc, key, val, DB_GET_BOTH);
if (r!=0) {
if (suppress_missing && r==DB_NOTFOUND) r = 0;
goto cursor_cleanup;
}
if ((r = toku_c_get_noassociate(dbc, key, val, DB_GET_BOTH))) goto cursor_cleanup;
r = toku_c_del_noassociate(dbc, lock_flags);
cursor_cleanup:;
int r2 = toku_c_close(dbc);
......
......@@ -30,7 +30,7 @@ LDFLAGS = -L../lib -ltokudb -lpthread $(TDB_LOADLIBES) -lz
# vars to compile bins that handle tokudb using libtokudb.a
STATIC_CPPFLAGS = -I../include
STATIC_LDFLAGS = ../lib/libtokudb.a -lz
STATIC_LDFLAGS = ../lib/libtokudb.a -lz -lpthread
# vars to compile bins that handle bdb
BDB_CPPFLAGS = -I$(BDBDIR)/include
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment