Commit ce6b1710 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the multihreaded writer changes as

{{{
svn merge -r 5899:5987 https://svn.tokutek.com/tokudb/tokudb
}}}
and resolve the conflicts.


git-svn-id: file:///svn/tokudb.1131b@5988 c7de825b-a66e-492c-adef-691d508d4ae1
parent cda54db4
......@@ -18,7 +18,7 @@ TARGETS = $(patsubst %.cpp,%,$(SRCS))
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
CPPFLAGS = -I../ -I../../include
CXXFLAGS = -Wall $(OPTFLAGS) -g $(GCOV_FLAGS)
LDLIBS = ../../lib/libtokudb_cxx.a ../../lib/libtokudb.a -lz
LDLIBS = ../../lib/libtokudb_cxx.a ../../lib/libtokudb.a -lz -lpthread
ifneq ($(OSX),)
VGRIND=
......
......@@ -12,7 +12,7 @@ CXXFLAGS = -Wall -Werror -g $(OPTFLAGS) $(GCOV_FLAGS)
ifdef BDBDIR
BDB_CPPFLAGS = -I$(BDBDIR)/include
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb_cxx -lpthread -Wl,-rpath,$(BDBDIR)/lib
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb_cxx -Wl,-rpath,$(BDBDIR)/lib -lpthread
else
BDB_CPPFLAGS =
BDB_LDFLAGS = -ldb_cxx -lpthread
......@@ -47,7 +47,7 @@ clean:
db-benchmark-test-tokudb: ../lib/libtokudb_cxx.a
db-benchmark-test-tokudb: db-benchmark-test.cpp
$(CXX) $(CXXFLAGS) -I../include -L../lib -Wl,-rpath,$(PWD)/../lib $< -o $@ -ltokudb -ltokudb_cxx -lz -DDIRSUF=tokudb
$(CXX) $(CXXFLAGS) -I../include -L../lib -Wl,-rpath,$(PWD)/../lib $< -o $@ -ltokudb -ltokudb_cxx -DDIRSUF=tokudb -lz -lpthread
db-benchmark-test-bdb: db-benchmark-test.cpp
$(CXX) $(CXXFLAGS) $(BDB_CPPFLAGS) $(BDB_LDFLAGS) $< -o $@ -DDIRSUF=bdb
......@@ -23,13 +23,13 @@ CFLAGS = -Wall -Werror -g $(OPTFLAGS) $(GCOV_FLAGS) $(PROF_FLAGS)
LDFLAGS += -lpthread
ifdef BDBDIR
BDB_CPPFLAGS = -I$(BDBDIR)/include
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb -lpthread -Wl,-rpath,$(BDBDIR)/lib
BDB_LDFLAGS = -L$(BDBDIR)/lib -ldb -Wl,-rpath,$(BDBDIR)/lib -lpthread
else
BDB_CPPFLAGS =
BDB_LDFLAGS = -ldb
endif
TDB_CPPFLAGS = -I../include
TDB_LDFLAGS = -L../lib -ltokudb -lz -Wl,-rpath,$(PWD)/../lib
TDB_LDFLAGS = -L../lib -ltokudb -Wl,-rpath,$(PWD)/../lib -lpthread -lz
TARGET_BDB = db-benchmark-test-bdb
TARGET_TDB = db-benchmark-test-tokudb
......
......@@ -123,7 +123,7 @@ void setup (void) {
r = db->open(db, tid, dbfilename, NULL, DB_BTREE, DB_CREATE, 0644);
if (r!=0) fprintf(stderr, "errno=%d, %s\n", errno, strerror(errno));
assert(r == 0);
if (do_transactions && !singlex) {
if (do_transactions) {
if (singlex) do_prelock(db, tid);
else {
r=tid->commit(tid, 0);
......
......@@ -36,7 +36,7 @@ FORMAT=-Wmissing-format-attribute
endif
CFLAGS = -Wall -Wextra -Wcast-align -Wbad-function-cast -Wmissing-noreturn $(FORMAT) $(OPTFLAGS) -g3 -ggdb3 $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS) $(SHADOW) $(VISIBILITY)
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -lz
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -lz -lpthread
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE -D_XOPEN_SOURCE=500
# Add -Wconversion
......@@ -77,6 +77,7 @@ BRT_SOURCES = \
ybt \
x1764 \
trace_mem \
threadpool \
# keep this line so I can ha vea \ on the previous line
OFILES = newbrt.o $(CYG_ADD_LIBZ)
......
......@@ -183,8 +183,8 @@ struct brtenv {
// SPINLOCK checkpointing;
};
extern cachetable_flush_func_t toku_brtnode_flush_callback, toku_brtheader_flush_callback;
extern cachetable_fetch_func_t toku_brtnode_fetch_callback, toku_brtheader_fetch_callback;
extern void toku_brtnode_flush_callback(), toku_brtheader_flush_callback();
extern int toku_brtnode_fetch_callback(), toku_brtheader_fetch_callback();
extern int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header);
extern int toku_unpin_brt_header (BRT brt);
extern CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *root_hash);
......
......@@ -3256,18 +3256,6 @@ static inline int brt_cursor_copyout(BRT_CURSOR cursor, DBT *key, DBT *val) {
return r;
}
static inline int brt_cursor_copyout_with_dat(BRT_CURSOR cursor, DBT *key, DBT *val,
BRT pdb, DBT* dat, DBT* dat_source) {
int r = 0;
void** key_staticp = cursor->is_temporary_cursor ? &cursor->brt->skey : &cursor->skey;
void** val_staticp = cursor->is_temporary_cursor ? &cursor->brt->sval : &cursor->sval;
void** dat_staticp = &pdb->sval;
r = toku_dbt_set_three_values(key, (bytevec*)&cursor->key.data, cursor->key.size, key_staticp, FALSE,
val, (bytevec*)&cursor->val.data, cursor->val.size, val_staticp, FALSE,
dat, (bytevec*)&dat_source->data, dat_source->size, dat_staticp, FALSE);
return r;
}
int toku_brt_dbt_set(DBT* key, DBT* key_source) {
int r = toku_dbt_set_value(key, (bytevec*)&key_source->data, key_source->size, NULL, FALSE);
return r;
......
// A read lock is acquired by threads that get and pin an entry in the
// cachetable. A write lock is acquired by the writer thread when an entry
// is evicted from the cachetable and is being written storage.
// Properties:
// 1. multiple readers, no writers
// 2. one writer at a time
// 3. pending writers have priority over pending readers
// An external mutex must be locked when using these functions. An alternate
// design would bury a mutex into the rwlock itself. While this may
// increase parallelism at the expense of single thread performance, we
// are experimenting with a single higher level lock.
typedef struct ctpair_rwlock *CTPAIR_RWLOCK;
struct ctpair_rwlock {
int pinned; // the number of readers
int want_pin; // the number of blocked readers
pthread_cond_t wait_pin;
int writer; // the number of writers
int want_write; // the number of blocked writers
pthread_cond_t wait_write;
};
// initialize a read write lock
static void ctpair_rwlock_init(CTPAIR_RWLOCK rwlock) {
int r;
rwlock->pinned = rwlock->want_pin = 0;
r = pthread_cond_init(&rwlock->wait_pin, 0); assert(r == 0);
rwlock->writer = rwlock->want_write = 0;
r = pthread_cond_init(&rwlock->wait_write, 0); assert(r == 0);
}
// destroy a read write lock
static void ctpair_rwlock_destroy(CTPAIR_RWLOCK rwlock) {
int r;
assert(rwlock->pinned == 0 && rwlock->want_pin == 0);
assert(rwlock->writer == 0 && rwlock->want_write == 0);
r = pthread_cond_destroy(&rwlock->wait_pin); assert(r == 0);
r = pthread_cond_destroy(&rwlock->wait_write); assert(r == 0);
}
// obtain a read lock
// expects: mutex is locked
static inline void ctpair_read_lock(CTPAIR_RWLOCK rwlock, pthread_mutex_t *mutex) {
if (rwlock->writer || rwlock->want_write) {
rwlock->want_pin++;
while (rwlock->writer || rwlock->want_write) {
int r = pthread_cond_wait(&rwlock->wait_pin, mutex); assert(r == 0);
}
rwlock->want_pin--;
}
rwlock->pinned++;
}
// release a read lock
// expects: mutex is locked
static inline void ctpair_read_unlock(CTPAIR_RWLOCK rwlock) {
rwlock->pinned--;
if (rwlock->pinned == 0 && rwlock->want_write) {
int r = pthread_cond_signal(&rwlock->wait_write); assert(r == 0);
}
}
// obtain a write lock
// expects: mutex is locked
static inline void ctpair_write_lock(CTPAIR_RWLOCK rwlock, pthread_mutex_t *mutex) {
if (rwlock->pinned || rwlock->writer) {
rwlock->want_write++;
while (rwlock->pinned || rwlock->writer) {
int r = pthread_cond_wait(&rwlock->wait_write, mutex); assert(r == 0);
}
rwlock->want_write--;
}
rwlock->writer++;
}
// release a write lock
// expects: mutex is locked
static inline void ctpair_write_unlock(CTPAIR_RWLOCK rwlock) {
rwlock->writer--;
if (rwlock->writer == 0) {
if (rwlock->want_write) {
int r = pthread_cond_signal(&rwlock->wait_write); assert(r == 0);
} else if (rwlock->want_pin) {
int r = pthread_cond_broadcast(&rwlock->wait_pin); assert(r == 0);
}
}
}
// returns: the number of readers
static inline int ctpair_pinned(CTPAIR_RWLOCK rwlock) {
return rwlock->pinned;
}
// returns: the number of writers
static inline int ctpair_writers(CTPAIR_RWLOCK rwlock) {
return rwlock->writer;
}
// returns: the sum of the number of readers, pending readers, writers, and
// pending writers
static inline int ctpair_users(CTPAIR_RWLOCK rwlock) {
return rwlock->pinned + rwlock->want_pin + rwlock->writer + rwlock->want_write;
}
// When objects are evicted from the cachetable, they are written to storage by a
// thread in a thread pool. The pair's are placed onto a write queue that feeds
// the thread pool.
typedef struct writequeue *WRITEQUEUE;
struct writequeue {
PAIR head, tail; // head and tail of the linked list of pair's
pthread_cond_t wait_read; // wait for read
int want_read; // number of threads waiting to read
pthread_cond_t wait_write; // wait for write
int want_write; // number of threads waiting to write
int ninq; // number of pairs in the queue
char closed; // kicks waiting threads off of the write queue
};
// initialize a writequeue
// expects: the writequeue is not initialized
// effects: the writequeue is set to empty and the condition variable is initialized
static void writequeue_init(WRITEQUEUE wq) {
wq->head = wq->tail = 0;
int r;
r = pthread_cond_init(&wq->wait_read, 0); assert(r == 0);
wq->want_read = 0;
r = pthread_cond_init(&wq->wait_write, 0); assert(r == 0);
wq->want_write = 0;
wq->ninq = 0;
wq->closed = 0;
}
// destroy a writequeue
// expects: the writequeue must be initialized and empty
static void writequeue_destroy(WRITEQUEUE wq) {
assert(wq->head == 0 && wq->tail == 0);
int r;
r = pthread_cond_destroy(&wq->wait_read); assert(r == 0);
r = pthread_cond_destroy(&wq->wait_write); assert(r == 0);
}
// close the writequeue
// effects: signal any threads blocked in the writequeue
static void writequeue_set_closed(WRITEQUEUE wq) {
wq->closed = 1;
int r;
r = pthread_cond_broadcast(&wq->wait_read); assert(r == 0);
r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
// determine whether or not the write queue is empty
// return: 1 if the write queue is empty, otherwise 0
static int writequeue_empty(WRITEQUEUE wq) {
return wq->head == 0;
}
// put a pair on the tail of the write queue
// effects: append the pair to the end of the write queue and signal
// any waiters
static void writequeue_enq(WRITEQUEUE wq, PAIR pair) {
pair->next_wq = 0;
if (wq->tail)
wq->tail->next_wq = pair;
else
wq->head = pair;
wq->tail = pair;
wq->ninq++;
if (wq->want_read) {
int r = pthread_cond_signal(&wq->wait_read); assert(r == 0);
}
}
// get a pair from the head of the write queue
// effects: wait until the writequeue is not empty, remove the first pair from the
// write queue and return it
// returns: 0 if success, otherwise an error
static int writequeue_deq(WRITEQUEUE wq, pthread_mutex_t *mutex, PAIR *pairptr) {
while (writequeue_empty(wq)) {
if (wq->closed)
return EINVAL;
wq->want_read++;
int r = pthread_cond_wait(&wq->wait_read, mutex); assert(r == 0);
wq->want_read--;
}
PAIR pair = wq->head;
wq->head = pair->next_wq;
if (wq->head == 0)
wq->tail = 0;
wq->ninq--;
pair->next_wq = 0;
*pairptr = pair;
return 0;
}
// wait for write
static void writequeue_wait_write(WRITEQUEUE wq, pthread_mutex_t *mutex) {
wq->want_write++;
int r = pthread_cond_wait(&wq->wait_write, mutex); assert(r == 0);
wq->want_write--;
}
// wakeup writers
static void writequeue_wakeup_write(WRITEQUEUE wq) {
if (wq->want_write) {
int r = pthread_cond_broadcast(&wq->wait_write); assert(r == 0);
}
}
This diff is collapsed.
......@@ -6,10 +6,6 @@
#include <fcntl.h>
#include "brttypes.h"
/* Implement the cache table. */
typedef long long CACHEKEY;
/* Maintain a cache mapping from cachekeys to values (void*)
* Some of the keys can be pinned. Don't pin too many or for too long.
* If the cachetable is too full, it will call the flush_callback() function with the key, the value, and the otherargs
......@@ -23,50 +19,78 @@ typedef long long CACHEKEY;
* table_size is the initial size of the cache table hash table (in number of entries)
* size limit is the upper bound of the sum of size of the entries in the cache table (total number of bytes)
*/
typedef long long CACHEKEY;
// create a new cachetable
// returns: if success, 0 is returned and result points to the new cachetable
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_checkpoint (CACHETABLE ct);
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, const char */*fname (used for logging)*/);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
// the flush callback (write, free)
typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
/* If we are asked to fetch something, get it by calling this back. */
typedef int (cachetable_fetch_func_t)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
typedef cachetable_fetch_func_t *CACHETABLE_FETCH_FUNC_T;
// the fetch callback
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
// Put a key and value pair into the cachetable
// effects: if the key,cachefile is not in the cachetable, then insert the pair and pin it.
// returns: 0 if success, otherwise an error
/* Error if already present. On success, pin the value. */
int toku_cachetable_put(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
void* value, long size,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs);
void *value, long size,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
int toku_cachetable_get_and_pin(CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/,
void**/*value*/, long *sizep,
cachetable_flush_func_t flush_callback, cachetable_fetch_func_t fetch_callback, void *extraargs);
void **/*value*/, long *sizep,
CACHETABLE_FLUSH_CALLBACK flush_callback,
CACHETABLE_FETCH_CALLBACK fetch_callback, void *extraargs);
// If the the item is already in memory, then return 0 and store it in the void**.
// If the item is not in memory, then return nonzero.
/* If the the item is already in memory, then return 0 and store it in the void**.
* If the item is not in memory, then return nonzero. */
int toku_cachetable_maybe_get_and_pin (CACHEFILE, CACHEKEY, u_int32_t /*fullhash*/, void**);
/* cachetable object state wrt external memory */
// cachetable object state wrt external memory
#define CACHETABLE_CLEAN 0
#define CACHETABLE_DIRTY 1
// Unpin by key
// effects: lookup a mapping using key,cachefile. if a pair is found, then OR the dirty bit into the pair
// and update the size of the pair. the read lock on the pair is released.
int toku_cachetable_unpin(CACHEFILE, CACHEKEY, u_int32_t fullhash, int dirty, long size); /* Note whether it is dirty when we unpin it. */
int toku_cachetable_remove (CACHEFILE, CACHEKEY, int /*write_me*/); /* Removing something already present is OK. */
int toku_cachetable_assert_all_unpinned (CACHETABLE);
int toku_cachefile_count_pinned (CACHEFILE, int /*printthem*/ );
/* Rename whatever is at oldkey to be newkey. Requires that the object be pinned. */
int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newkey);
//int cachetable_fsync_all (CACHETABLE); /* Flush everything to disk, but keep it in cache. */
int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and destroys the cachetable. */
int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
int toku_cachefile_flush (CACHEFILE);
// effect: flush everything owned by the cachefile.
// effect: flush everything owned by the cachefile from the cachetable. all dirty
// blocks are written sto storage. all unpinned blocks are evicts from the cachetable.
// returns: 0 if success
void toku_cachefile_refup (CACHEFILE cfp);
......@@ -85,27 +109,25 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname);
// effect: bind the cachefile to a new fd and fname. the old fd is closed.
// returns: 0 if success
// Useful for debugging
void toku_cachetable_print_state (CACHETABLE ct);
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr);
void toku_cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void toku_cachetable_verify (CACHETABLE t); // Slow...
TOKULOGGER toku_cachefile_logger (CACHEFILE);
FILENUM toku_cachefile_filenum (CACHEFILE);
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_checkpoint (CACHETABLE ct);
FILENUM toku_cachefile_filenum (CACHEFILE);
u_int32_t toku_cachetable_hash (CACHEFILE cachefile, CACHEKEY key);
// Effect: Return a 32-bit hash key. The hash key shall be suitable for using with bitmasking for a table of size power-of-two.
u_int32_t toku_cachefile_fullhash_of_header (CACHEFILE cachefile);
// debug functions
void toku_cachetable_print_state (CACHETABLE ct);
void toku_cachetable_get_state(CACHETABLE ct, int *num_entries_ptr, int *hash_size_ptr, long *size_current_ptr, long *size_limit_ptr);
int toku_cachetable_get_key_state(CACHETABLE ct, CACHEKEY key, CACHEFILE cf,
void **value_ptr,
int *dirty_ptr,
long long *pin_ptr,
long *size_ptr);
void toku_cachefile_verify (CACHEFILE cf); // Verify the whole cachetable that the CF is in. Slow.
void toku_cachetable_verify (CACHETABLE t); // Slow...
#endif
......@@ -61,6 +61,9 @@ REGRESSION_TESTS = \
brt-test3 \
brt-test4 \
brt-test5 \
cachetable-rwlock-test \
cachetable-writequeue-test \
threadpool-test \
cachetable-test \
cachetable-test2 \
cachetable-put-test \
......@@ -69,6 +72,7 @@ REGRESSION_TESTS = \
cachetable-fd-test \
cachetable-flush-test \
cachetable-count-pinned-test \
cachetable-debug-test \
fifo-test \
list-test \
keyrange \
......
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include "test.h"
#include "cachetable.h"
void flush() {
}
int fetch() {
return 0;
}
void cachetable_debug_test(int n) {
const int test_limit = n;
int r;
CACHETABLE ct;
r = toku_create_cachetable(&ct, test_limit, ZERO_LSN, NULL_LOGGER); assert(r == 0);
char fname1[] = __FILE__ "test1.dat";
unlink(fname1);
CACHEFILE f1;
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, 0777); assert(r == 0);
int num_entries, hash_size; long size_current, size_limit;
toku_cachetable_get_state(ct, &num_entries, &hash_size, &size_current, &size_limit);
assert(num_entries == 0);
assert(size_current == 0);
assert(size_limit == n);
// printf("%d %d %ld %ld\n", num_entries, hash_size, size_current, size_limit);
int i;
for (i=1; i<=n; i++) {
const int item_size = 1;
u_int32_t hi;
hi = toku_cachetable_hash(f1, i);
r = toku_cachetable_put(f1, i, hi, (void *)(long)i, item_size, flush, fetch, 0);
assert(r == 0);
void *v; int dirty; long long pinned; long pair_size;
r = toku_cachetable_get_key_state(ct, i, f1, &v, &dirty, &pinned, &pair_size);
assert(r == 0);
assert(v == (void *)(long)i);
assert(dirty == CACHETABLE_DIRTY);
assert(pinned == 1);
assert(pair_size == item_size);
r = toku_cachetable_unpin(f1, i, hi, CACHETABLE_CLEAN, 1);
assert(r == 0);
toku_cachetable_get_state(ct, &num_entries, &hash_size, &size_current, &size_limit);
assert(num_entries == i);
assert(size_current == i);
assert(size_limit == n);
toku_cachetable_print_state(ct);
}
toku_cachetable_verify(ct);
extern void print_hash_histogram();
print_hash_histogram();
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
int main(int argc, const char *argv[]) {
int i;
for (i=1; i<argc; i++) {
if (strcmp(argv[i], "-v") == 0) {
verbose++;
continue;
}
}
cachetable_debug_test(8);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
#include "cachetable-rwlock.h"
int verbose = 0;
// test create and destroy
void test_create_destroy() {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
ctpair_rwlock_destroy(rwlock);
}
// test read lock and unlock with no writers
void test_simple_read_lock(int n) {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
assert(ctpair_pinned(rwlock) == 0);
int i;
for (i=1; i<=n; i++) {
ctpair_read_lock(rwlock, 0);
assert(ctpair_pinned(rwlock) == i);
assert(ctpair_users(rwlock) == i);
}
for (i=n-1; i>=0; i--) {
ctpair_read_unlock(rwlock);
assert(ctpair_pinned(rwlock) == i);
assert(ctpair_users(rwlock) == i);
}
ctpair_rwlock_destroy(rwlock);
}
// test write lock and unlock with no readers
void test_simple_write_lock() {
struct ctpair_rwlock the_rwlock, *rwlock = &the_rwlock;
ctpair_rwlock_init(rwlock);
assert(ctpair_users(rwlock) == 0);
ctpair_write_lock(rwlock, 0);
assert(ctpair_writers(rwlock) == 1);
assert(ctpair_users(rwlock) == 1);
ctpair_write_unlock(rwlock);
assert(ctpair_users(rwlock) == 0);
ctpair_rwlock_destroy(rwlock);
}
struct rw_event {
int e;
struct ctpair_rwlock the_rwlock;
pthread_mutex_t mutex;
};
void rw_event_init(struct rw_event *rwe) {
rwe->e = 0;
ctpair_rwlock_init(&rwe->the_rwlock);
int r = pthread_mutex_init(&rwe->mutex, 0); assert(r == 0);
}
void rw_event_destroy(struct rw_event *rwe) {
ctpair_rwlock_destroy(&rwe->the_rwlock);
int r = pthread_mutex_destroy(&rwe->mutex); assert(r == 0);
}
void *test_writer_priority_thread(void *arg) {
struct rw_event *rwe = arg;
int r;
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 3);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 4);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
return arg;
}
// test writer priority over new readers
void test_writer_priority() {
struct rw_event rw_event, *rwe = &rw_event;
int r;
rw_event_init(rwe);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_lock(&rwe->the_rwlock, &rwe->mutex);
sleep(1);
rwe->e++; assert(rwe->e == 1);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
pthread_t tid;
r = pthread_create(&tid, 0, test_writer_priority_thread, rwe);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 2);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 5);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_read_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
rw_event_destroy(rwe);
}
// test single writer
void *test_single_writer_thread(void *arg) {
struct rw_event *rwe = arg;
int r;
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
rwe->e++; assert(rwe->e == 3);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
return arg;
}
void test_single_writer() {
struct rw_event rw_event, *rwe = &rw_event;
int r;
rw_event_init(rwe);
assert(ctpair_writers(&rwe->the_rwlock) == 0);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
ctpair_write_lock(&rwe->the_rwlock, &rwe->mutex);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
sleep(1);
rwe->e++; assert(rwe->e == 1);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
pthread_t tid;
r = pthread_create(&tid, 0, test_single_writer_thread, rwe);
sleep(1);
r = pthread_mutex_lock(&rwe->mutex); assert(r == 0);
rwe->e++; assert(rwe->e == 2);
assert(ctpair_writers(&rwe->the_rwlock) == 1);
assert(ctpair_users(&rwe->the_rwlock) == 2);
ctpair_write_unlock(&rwe->the_rwlock);
r = pthread_mutex_unlock(&rwe->mutex); assert(r == 0);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
assert(ctpair_writers(&rwe->the_rwlock) == 0);
rw_event_destroy(rwe);
}
int main(int argc, char *argv[]) {
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
}
test_create_destroy();
test_simple_read_lock(0);
test_simple_read_lock(42);
test_simple_write_lock();
test_writer_priority();
test_single_writer();
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <pthread.h>
int verbose;
typedef struct ctpair *PAIR;
struct ctpair {
PAIR next_wq;
};
PAIR new_pair() {
PAIR p = (PAIR) malloc(sizeof *p); assert(p);
return p;
}
void destroy_pair(PAIR p) {
free(p);
}
#include "cachetable-writequeue.h"
// test simple create and destroy
void test_create_destroy() {
struct writequeue writequeue, *wq = &writequeue;
writequeue_init(wq);
assert(writequeue_empty(wq));
writequeue_destroy(wq);
}
// verify that the wq implements FIFO ordering
void test_simple_enq_deq(int n) {
struct writequeue writequeue, *wq = &writequeue;
int r;
pthread_mutex_t mutex;
r = pthread_mutex_init(&mutex, 0); assert(r == 0);
writequeue_init(wq);
assert(writequeue_empty(wq));
PAIR pairs[n];
int i;
for (i=0; i<n; i++) {
pairs[i] = new_pair();
writequeue_enq(wq, pairs[i]);
assert(!writequeue_empty(wq));
}
for (i=0; i<n; i++) {
PAIR p;
r = writequeue_deq(wq, &mutex, &p);
assert(r == 0 && p == pairs[i]);
destroy_pair(p);
}
assert(writequeue_empty(wq));
writequeue_destroy(wq);
r = pthread_mutex_destroy(&mutex); assert(r == 0);
}
// setting the wq closed should cause deq to return EINVAL
void test_set_closed() {
struct writequeue writequeue, *wq = &writequeue;
writequeue_init(wq);
writequeue_set_closed(wq);
int r = writequeue_deq(wq, 0, 0);
assert(r == EINVAL);
writequeue_destroy(wq);
}
// closing a wq with a blocked reader thread should cause the reader to get EINVAL
struct writequeue_with_mutex {
struct writequeue writequeue;
pthread_mutex_t mutex;
};
void writequeue_with_mutex_init(struct writequeue_with_mutex *wqm) {
writequeue_init(&wqm->writequeue);
int r = pthread_mutex_init(&wqm->mutex, 0); assert(r == 0);
}
void writequeue_with_mutex_destroy(struct writequeue_with_mutex *wqm) {
writequeue_destroy(&wqm->writequeue);
int r = pthread_mutex_destroy(&wqm->mutex); assert(r == 0);
}
void *test_set_closed_waiter(void *arg) {
struct writequeue_with_mutex *wqm = arg;
int r;
r = pthread_mutex_lock(&wqm->mutex); assert(r == 0);
PAIR p;
r = writequeue_deq(&wqm->writequeue, &wqm->mutex, &p);
assert(r == EINVAL);
r = pthread_mutex_unlock(&wqm->mutex); assert(r == 0);
return arg;
}
void test_set_closed_thread() {
struct writequeue_with_mutex writequeue_with_mutex, *wqm = &writequeue_with_mutex;
int r;
writequeue_with_mutex_init(wqm);
pthread_t tid;
r = pthread_create(&tid, 0, test_set_closed_waiter, wqm); assert(r == 0);
sleep(1);
writequeue_set_closed(&wqm->writequeue);
void *ret;
r = pthread_join(tid, &ret);
assert(r == 0 && ret == wqm);
writequeue_with_mutex_destroy(wqm);
}
// verify writer reader flow control
// the write (main) thread writes as fast as possible until the wq is full. then it
// waits.
// the read thread reads from the wq slowly using a random delay. it wakes up any
// writers when the wq size <= 1/2 of the wq limit
struct rwfc {
pthread_mutex_t mutex;
struct writequeue writequeue;
int current, limit;
};
void rwfc_init(struct rwfc *rwfc, int limit) {
int r;
r = pthread_mutex_init(&rwfc->mutex, 0); assert(r == 0);
writequeue_init(&rwfc->writequeue);
rwfc->current = 0; rwfc->limit = limit;
}
void rwfc_destroy(struct rwfc *rwfc) {
int r;
writequeue_destroy(&rwfc->writequeue);
r = pthread_mutex_destroy(&rwfc->mutex); assert(r == 0);
}
void *rwfc_reader(void *arg) {
struct rwfc *rwfc = arg;
int r;
while (1) {
PAIR ctpair;
r = pthread_mutex_lock(&rwfc->mutex); assert(r == 0);
r = writequeue_deq(&rwfc->writequeue, &rwfc->mutex, &ctpair);
if (r == EINVAL) {
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
break;
}
if (2*rwfc->current-- > rwfc->limit && 2*rwfc->current <= rwfc->limit) {
writequeue_wakeup_write(&rwfc->writequeue);
}
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
destroy_pair(ctpair);
usleep(random() % 100);
}
return arg;
}
void test_flow_control(int limit, int n) {
struct rwfc my_rwfc, *rwfc = &my_rwfc;
int r;
rwfc_init(rwfc, limit);
pthread_t tid;
r = pthread_create(&tid, 0, rwfc_reader, rwfc); assert(r == 0);
sleep(1); // this is here to block the reader on the first deq
int i;
for (i=0; i<n; i++) {
PAIR ctpair = new_pair();
r = pthread_mutex_lock(&rwfc->mutex); assert(r == 0);
writequeue_enq(&rwfc->writequeue, ctpair);
rwfc->current++;
while (rwfc->current >= rwfc->limit) {
// printf("%d - %d %d\n", i, rwfc->current, rwfc->limit);
writequeue_wait_write(&rwfc->writequeue, &rwfc->mutex);
}
r = pthread_mutex_unlock(&rwfc->mutex); assert(r == 0);
// usleep(random() % 1);
}
writequeue_set_closed(&rwfc->writequeue);
void *ret;
r = pthread_join(tid, &ret); assert(r == 0);
rwfc_destroy(rwfc);
}
int main(int argc, char *argv[]) {
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
}
test_create_destroy();
test_simple_enq_deq(0);
test_simple_enq_deq(42);
test_set_closed();
test_set_closed_thread();
test_flow_control(8, 10000);
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <errno.h>
#include <malloc.h>
#include <pthread.h>
#include "threadpool.h"
int verbose = 0;
struct my_threadpool {
THREADPOOL threadpool;
pthread_mutex_t mutex;
pthread_cond_t wait;
int closed;
};
void my_threadpool_init(struct my_threadpool *my_threadpool, int max_threads) {
int r;
r = threadpool_create(&my_threadpool->threadpool, max_threads); assert(r == 0);
assert(my_threadpool != 0);
r = pthread_mutex_init(&my_threadpool->mutex, 0); assert(r == 0);
r = pthread_cond_init(&my_threadpool->wait, 0); assert(r == 0);
my_threadpool->closed = 0;
}
void my_threadpool_destroy(struct my_threadpool *my_threadpool) {
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
my_threadpool->closed = 1;
r = pthread_cond_broadcast(&my_threadpool->wait); assert(r == 0);
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("current %d\n", threadpool_get_current_threads(my_threadpool->threadpool));
threadpool_destroy(&my_threadpool->threadpool); assert(my_threadpool->threadpool == 0);
r = pthread_mutex_destroy(&my_threadpool->mutex); assert(r == 0);
r = pthread_cond_destroy(&my_threadpool->wait); assert(r == 0);
}
void *fbusy(void *arg) {
struct my_threadpool *my_threadpool = arg;
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
while (!my_threadpool->closed) {
r = pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
}
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("%lu:%s:exit\n", pthread_self(), __FUNCTION__);
return arg;
}
void *fidle(void *arg) {
struct my_threadpool *my_threadpool = arg;
int r;
r = pthread_mutex_lock(&my_threadpool->mutex); assert(r == 0);
threadpool_set_thread_idle(my_threadpool->threadpool);
while (!my_threadpool->closed) {
r = pthread_cond_wait(&my_threadpool->wait, &my_threadpool->mutex); assert(r == 0);
}
r = pthread_mutex_unlock(&my_threadpool->mutex); assert(r == 0);
if (verbose) printf("%lu:%s:exit\n", pthread_self(), __FUNCTION__);
return arg;
}
#define DO_MALLOC_HOOK 1
#if DO_MALLOC_HOOK
static void *my_malloc_always_fails(size_t n, const __malloc_ptr_t p) {
n = n; p = p;
return 0;
}
#endif
int usage() {
printf("threadpool-test: [-v] [-malloc-fail] [N]\n");
printf("-malloc-fail simulate malloc failures\n");
printf("N max number of threads in the thread pool\n");
return 1;
}
int main(int argc, char *argv[]) {
int max_threads = 1;
int do_malloc_fail = 0;
int i;
for (i=1; i<argc; i++) {
char *arg = argv[i];
if (strcmp(arg, "-h") == 0 || strcmp(arg, "-help") == 0) {
return usage();
} else if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
} else if (strcmp(arg, "-q") == 0) {
verbose = 0;
continue;
} else if (strcmp(arg, "-malloc-fail") == 0) {
do_malloc_fail = 1;
continue;
} else
max_threads = atoi(arg);
}
struct my_threadpool my_threadpool;
THREADPOOL threadpool;
// test threadpool busy causes no threads to be created
my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_busy\n");
for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fbusy, &my_threadpool);
assert(threadpool_get_current_threads(threadpool) == 1);
}
assert(threadpool_get_current_threads(threadpool) == 1);
my_threadpool_destroy(&my_threadpool);
// test threadpool idle causes up to max_threads to be created
my_threadpool_init(&my_threadpool, max_threads);
threadpool = my_threadpool.threadpool;
if (verbose) printf("test threadpool_set_idle\n");
for (i=0; i<2*max_threads; i++) {
threadpool_maybe_add(threadpool, fidle, &my_threadpool);
sleep(1);
assert(threadpool_get_current_threads(threadpool) <= max_threads);
}
assert(threadpool_get_current_threads(threadpool) == max_threads);
my_threadpool_destroy(&my_threadpool);
#if DO_MALLOC_HOOK
if (do_malloc_fail) {
if (verbose) printf("test threadpool_create with malloc failure\n");
// test threadpool malloc fails causes ENOMEM
// glibc supports this. see malloc.h
threadpool = 0;
void *(*orig_malloc_hook) (size_t, const __malloc_ptr_t) = __malloc_hook;
__malloc_hook = my_malloc_always_fails;
int r;
r = threadpool_create(&threadpool, 0); assert(r == ENOMEM);
r = threadpool_create(&threadpool, 1); assert(r == ENOMEM);
__malloc_hook = orig_malloc_hook;
}
#endif
return 0;
}
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <malloc.h>
#include <pthread.h>
#include <errno.h>
#include "threadpool.h"
// use gcc builtin fetch_and_add 0->no 1->yes
#define DO_ATOMIC_FETCH_AND_ADD 0
struct threadpool {
int max_threads;
int current_threads;
int busy_threads;
pthread_t pids[];
};
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads) {
size_t size = sizeof (struct threadpool) + max_threads*sizeof (pthread_t);
struct threadpool *threadpool = malloc(size);
if (threadpool == 0)
return ENOMEM;
threadpool->max_threads = max_threads;
threadpool->current_threads = 0;
threadpool->busy_threads = 0;
int i;
for (i=0; i<max_threads; i++)
threadpool->pids[i] = 0;
*threadpoolptr = threadpool;
return 0;
}
void threadpool_destroy(THREADPOOL *threadpoolptr) {
struct threadpool *threadpool = *threadpoolptr;
int i;
for (i=0; i<threadpool->current_threads; i++) {
int r; void *ret;
r = pthread_join(threadpool->pids[i], &ret);
assert(r == 0);
}
*threadpoolptr = 0;
free(threadpool);
}
void threadpool_maybe_add(THREADPOOL threadpool, void *(*f)(void *), void *arg) {
if ((threadpool->current_threads == 0 || threadpool->busy_threads < threadpool->current_threads) && threadpool->current_threads < threadpool->max_threads) {
int r = pthread_create(&threadpool->pids[threadpool->current_threads], 0, f, arg);
if (r == 0) {
threadpool->current_threads++;
threadpool_set_thread_busy(threadpool);
}
}
}
void threadpool_set_thread_busy(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, 1);
#else
threadpool->busy_threads++;
#endif
}
void threadpool_set_thread_idle(THREADPOOL threadpool) {
#if DO_ATOMIC_FETCH_AND_ADD
(void) __sync_fetch_and_add(&threadpool->busy_threads, -1);
#else
threadpool->busy_threads--;
#endif
}
int threadpool_get_current_threads(THREADPOOL threadpool) {
return threadpool->current_threads;
}
// A threadpool is a limited set of threads that can be used to apply a
// function to work contained in a work queue. The work queue is outside
// of the scope of the threadpool; the threadpool merely provides
// mechanisms to grow the number of threads in the threadpool on demand.
typedef struct threadpool *THREADPOOL;
// Create a new threadpool
// Effects: a new threadpool is allocated and initialized. the number of
// threads in the threadpool is limited to max_threads. initially, there
// are no threads in the pool.
// Returns: if there are no errors, the threadpool is set and zero is returned.
// Otherwise, an error number is returned.
int threadpool_create(THREADPOOL *threadpoolptr, int max_threads);
// Destroy a threadpool
// Effects: the calling thread joins with all of the threads in the threadpool.
// Effects: the threadpool memory is freed.
// Returns: the threadpool is set to null.
void threadpool_destroy(THREADPOOL *threadpoolptr);
// Maybe add a thread to the threadpool.
// Effects: the number of threads in the threadpool is expanded by 1 as long
// as the current number of threads in the threadpool is less than the max
// and there are no idle threads.
// Effects: if the thread is create, it calls the function f with argument arg
// Expects: external serialization on this function; only one thread may
// execute this function
void threadpool_maybe_add(THREADPOOL theadpool, void *(*f)(void *), void *arg);
// Set the current thread busy
// Effects: the threadpool keeps a count of the number of idle threads. It
// uses this count to control the creation of additional threads.
void threadpool_set_thread_busy(THREADPOOL);
// Set the current thread idle
void threadpool_set_thread_idle(THREADPOOL);
// get the current number of threads
int threadpool_get_current_threads(THREADPOOL);
CFLAGS = -Wall -W -Werror -g
pma: LDFLAGS=-lm
pma:
pma.o:
......@@ -21,8 +21,7 @@ CFLAGS += -Wbad-function-cast -Wcast-align -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS += -L../ -L../../range_tree
CPPFLAGS += -I. -I../ -I../../range_tree -I../../../newbrt -I../../../include
LDFLAGS += -lz
LDFLAGS = -lpthread -lz
SRCS = $(wildcard *.c)
......
......@@ -19,6 +19,7 @@ CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS = -I. -I../../include -I../../newbrt
CPPFLAGS += -D_GNU_SOURCE -D_THREAD_SAFE -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
CFLAGS += $(VISIBILITY) $(PROF_FLAGS)
LDFLAGS += -lpthread
ifneq ($(OSX),)
CFLAGS+=-fno-common
......
......@@ -17,6 +17,7 @@ CFLAGS = -W -Wall -Wextra -Werror $(OPTFLAGS) -g3 -ggdb3 $(GCOV_FLAGS)
CFLAGS += -Wbad-function-cast -Wcast-align -Wconversion -Waggregate-return
CFLAGS += -Wmissing-noreturn -Wmissing-format-attribute
CPPFLAGS += -I../ -I../../../newbrt -I../../../include
LDFLAGS = -lpthread
SRCS = $(wildcard *.c)
......@@ -99,7 +100,7 @@ LDFLAGS = -lz
%.lin: %.c $(HEADERS) $(LINEAR_BINS)
$(CC) -DDIR=\"dir.$<.lin\" $(CFLAGS) $(CPPFLAGS) $< -o $@ $(LINEAR_BINS) $(LDFLAGS)
%.tlog: %.c $(HEADERS) $(TLOG_BINS)
$(CC) -DDIR=\"dir.$<.log\" $(CFLAGS) $(CPPFLAGS) $< -o $@ $(TLOG_BINS) -DTOKU_RT_NOOVERLAPS $(LDFLAGS)
$(CC) -DDIR=\"dir.$<.log\" $(CFLAGS) $(CPPFLAGS) $< -o $@ $(TLOG_BINS) -DTOKU_RT_NOOVERLAPS $(LDFLAGS)
%.log: %.c $(HEADERS) $(LOG_BINS)
$(CC) -DDIR=\"dir.$<.log\" $(CFLAGS) $(CPPFLAGS) $< -o $@ $(LOG_BINS) $(LDFLAGS)
......
......@@ -2593,20 +2593,27 @@ static int toku_db_delboth_noassociate(DB *db, DB_TXN *txn, DBT *key, DBT *val,
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
u_int32_t suppress_missing = flags&DB_DELETE_ANY;
u_int32_t delete_any = flags&DB_DELETE_ANY;
flags &= ~DB_DELETE_ANY;
if (flags!=0) return EINVAL;
//DB_DELETE_ANY supresses the DB_NOTFOUND return value indicating that the key was not found prior to the delete
//TODO: Speed up the DB_DELETE_ANY version by implementing it at the BRT layer.
if (delete_any) {
if (db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)) {
DB_TXN* txn_anc = toku_txn_ancestor(txn);
if ((r = toku_txn_add_lt(txn_anc, db->i->lt))) goto any_cleanup;
TXNID id_anc = toku_txn_get_txnid(txn_anc->i->tokutxn);
r = toku_lt_acquire_write_lock(db->i->lt, db, id_anc, key, val);
if (r!=0) goto any_cleanup;
}
r = toku_brt_delete_both(db->i->brt, key, val, txn ? txn->i->tokutxn : NULL);
any_cleanup:
return r;
}
DBC *dbc;
if ((r = toku_db_cursor(db, txn, &dbc, 0, 0))) goto cursor_cleanup;
r = toku_c_get_noassociate(dbc, key, val, DB_GET_BOTH);
if (r!=0) {
if (suppress_missing && r==DB_NOTFOUND) r = 0;
goto cursor_cleanup;
}
if ((r = toku_c_get_noassociate(dbc, key, val, DB_GET_BOTH))) goto cursor_cleanup;
r = toku_c_del_noassociate(dbc, lock_flags);
cursor_cleanup:;
int r2 = toku_c_close(dbc);
......
......@@ -30,7 +30,7 @@ LDFLAGS = -L../lib -ltokudb -lpthread $(TDB_LOADLIBES) -lz
# vars to compile bins that handle tokudb using libtokudb.a
STATIC_CPPFLAGS = -I../include
STATIC_LDFLAGS = ../lib/libtokudb.a -lz
STATIC_LDFLAGS = ../lib/libtokudb.a -lz -lpthread
# vars to compile bins that handle bdb
BDB_CPPFLAGS = -I$(BDBDIR)/include
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment