Commit a55bb70d authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Turn on multithreaded writes, and also lock the pwrites and block allocator. ...

Turn on multithreaded writes, and also lock the pwrites and block allocator.  Addresses #1080, #1000.

git-svn-id: file:///svn/tokudb.1131b+1080a@6177 c7de825b-a66e-492c-adef-691d508d4ae1
parent cb729724
...@@ -110,7 +110,8 @@ block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t of ...@@ -110,7 +110,8 @@ block_allocator_alloc_block_at (BLOCK_ALLOCATOR ba, u_int64_t size, u_int64_t of
VALIDATE(ba); VALIDATE(ba);
} }
static u_int64_t align (u_int64_t value, BLOCK_ALLOCATOR ba) { static inline u_int64_t
align (u_int64_t value, BLOCK_ALLOCATOR ba) {
return ((value+ba->alignment-1)/ba->alignment)*ba->alignment; return ((value+ba->alignment-1)/ba->alignment)*ba->alignment;
} }
......
...@@ -10,10 +10,11 @@ ...@@ -10,10 +10,11 @@
#include "kv-pair.h" #include "kv-pair.h"
#include "mempool.h" #include "mempool.h"
#include <arpa/inet.h>
#include <inttypes.h> #include <inttypes.h>
#include <unistd.h> #include <pthread.h>
#include <stdio.h> #include <stdio.h>
#include <arpa/inet.h> #include <unistd.h>
#include <zlib.h> #include <zlib.h>
#if 0 #if 0
...@@ -27,6 +28,62 @@ static u_int64_t ntohll(u_int64_t v) { ...@@ -27,6 +28,62 @@ static u_int64_t ntohll(u_int64_t v) {
} }
#endif #endif
static u_int64_t umin64(u_int64_t a, u_int64_t b) {
if (a<b) return a;
return b;
}
static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b;
}
static void maybe_preallocate_in_file (int fd, u_int64_t size) {
return;
struct stat sbuf;
{
int r = fstat(fd, &sbuf);
assert(r==0);
}
assert(sbuf.st_size >= 0);
if ((size_t)sbuf.st_size < size) {
const int N = umin64(size, 16<<20); // Double the size of the file, or add 16MB, whichever is less.
char *MALLOC_N(N, wbuf);
memset(wbuf, 0, N);
off_t start_write = alignup(sbuf.st_size, 4096);
assert(start_write >= sbuf.st_size);
ssize_t r = pwrite(fd, wbuf, N, start_write);
assert(r==N);
}
}
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
static pthread_mutex_t pwrite_mutex = PTHREAD_MUTEX_INITIALIZER;
static int pwrite_is_locked=0;
static inline void
lock_for_pwrite (void) {
// Locks the pwrite_mutex.
int r = pthread_mutex_lock(&pwrite_mutex);
assert(r==0);
pwrite_is_locked = 1;
}
static inline void
unlock_for_pwrite (void) {
pwrite_is_locked = 0;
int r = pthread_mutex_unlock(&pwrite_mutex);
assert(r==0);
}
ssize_t
toku_pwrite (int fd, const void *buf, size_t count, off_t offset)
// requires that the pwrite has been locked
{
assert(pwrite_is_locked);
maybe_preallocate_in_file(fd, offset+count);
return pwrite(fd, buf, count, offset);
}
// Don't include the compressed data size or the uncompressed data size. // Don't include the compressed data size or the uncompressed data size.
static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf" static const int brtnode_header_overhead = (8+ // magic "tokunode" or "tokuleaf"
...@@ -253,6 +310,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt ...@@ -253,6 +310,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
{ {
lock_for_pwrite();
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0); assert(blocknum.b>=0);
//printf("%s:%d brt=%p\n", __FILE__, __LINE__, brt); //printf("%s:%d brt=%p\n", __FILE__, __LINE__, brt);
...@@ -282,9 +340,10 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt ...@@ -282,9 +340,10 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, BRT brt
block_allocator_alloc_block(brt->h->block_allocator, n_to_write, &offset); block_allocator_alloc_block(brt->h->block_allocator, n_to_write, &offset);
brt->h->block_translation[blocknum.b].diskoff = offset; brt->h->block_translation[blocknum.b].diskoff = offset;
brt->h->block_translation[blocknum.b].size = n_to_write; brt->h->block_translation[blocknum.b].size = n_to_write;
ssize_t r=pwrite(fd, compressed_buf, n_to_write, offset); ssize_t r=toku_pwrite(fd, compressed_buf, n_to_write, offset);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno); if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write); assert(r==(ssize_t)n_to_write);
unlock_for_pwrite();
} }
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size); //printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
...@@ -643,6 +702,7 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h) ...@@ -643,6 +702,7 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
} }
int toku_serialize_brt_header_to (int fd, struct brt_header *h) { int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
lock_for_pwrite();
{ {
struct wbuf w; struct wbuf w;
unsigned int size = toku_serialize_brt_header_size (h); unsigned int size = toku_serialize_brt_header_size (h);
...@@ -650,7 +710,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -650,7 +710,7 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int r=toku_serialize_brt_header_to_wbuf(&w, h); int r=toku_serialize_brt_header_to_wbuf(&w, h);
assert(r==0); assert(r==0);
assert(w.ndone==size); assert(w.ndone==size);
ssize_t nwrote = pwrite(fd, w.buf, w.ndone, 0); ssize_t nwrote = toku_pwrite(fd, w.buf, w.ndone, 0);
if (nwrote<0) perror("pwrite"); if (nwrote<0) perror("pwrite");
assert((size_t)nwrote==w.ndone); assert((size_t)nwrote==w.ndone);
toku_free(w.buf); toku_free(w.buf);
...@@ -668,10 +728,11 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -668,10 +728,11 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
} }
u_int32_t checksum = x1764_finish(&w.checksum); u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_int(&w, checksum); wbuf_int(&w, checksum);
ssize_t nwrote = pwrite(fd, w.buf, size, h->block_translation_address_on_disk); ssize_t nwrote = toku_pwrite(fd, w.buf, size, h->block_translation_address_on_disk);
assert(nwrote==(ssize_t)size); assert(nwrote==(ssize_t)size);
toku_free(w.buf); toku_free(w.buf);
}; };
unlock_for_pwrite();
return 0; return 0;
} }
...@@ -708,6 +769,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade ...@@ -708,6 +769,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
if (h->block_translation_address_on_disk == 0) { if (h->block_translation_address_on_disk == 0) {
h->block_translation = 0; h->block_translation = 0;
} else { } else {
lock_for_pwrite();
block_allocator_alloc_block_at(h->block_allocator, h->block_translation_size_on_disk, h->block_translation_address_on_disk); block_allocator_alloc_block_at(h->block_allocator, h->block_translation_size_on_disk, h->block_translation_address_on_disk);
XMALLOC_N(h->translated_blocknum_limit, h->block_translation); XMALLOC_N(h->translated_blocknum_limit, h->block_translation);
unsigned char *XMALLOC_N(h->block_translation_size_on_disk, tbuf); unsigned char *XMALLOC_N(h->block_translation_size_on_disk, tbuf);
...@@ -737,6 +799,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade ...@@ -737,6 +799,7 @@ int deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_heade
block_allocator_alloc_block_at(h->block_allocator, h->block_translation[i].size, h->block_translation[i].diskoff); block_allocator_alloc_block_at(h->block_allocator, h->block_translation[i].size, h->block_translation[i].diskoff);
//printf("%s:%d %ld %ld\n", __FILE__, __LINE__, h->block_translation[i].diskoff, h->block_translation[i].size); //printf("%s:%d %ld %ld\n", __FILE__, __LINE__, h->block_translation[i].diskoff, h->block_translation[i].size);
} }
unlock_for_pwrite();
toku_free(tbuf); toku_free(tbuf);
} }
if (h->n_named_roots>=0) { if (h->n_named_roots>=0) {
...@@ -815,13 +878,14 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) { ...@@ -815,13 +878,14 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
// and it would be more complex to batch up several writes. // and it would be more complex to batch up several writes.
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) { int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) {
//printf("%s:%d Serializing fifo at %" PRId64 " (count=%d)\n", __FILE__, __LINE__, freeoff, toku_fifo_n_entries(fifo)); //printf("%s:%d Serializing fifo at %" PRId64 " (count=%d)\n", __FILE__, __LINE__, freeoff, toku_fifo_n_entries(fifo));
lock_for_pwrite();
{ {
int size=4; int size=4;
char buf[size]; char buf[size];
struct wbuf w; struct wbuf w;
wbuf_init(&w, buf, size); wbuf_init(&w, buf, size);
wbuf_int(&w, toku_fifo_n_entries(fifo)); wbuf_int(&w, toku_fifo_n_entries(fifo));
ssize_t r = pwrite(fd, w.buf, size, freeoff); ssize_t r = toku_pwrite(fd, w.buf, size, freeoff);
if (r!=size) return errno; if (r!=size) return errno;
freeoff+=size; freeoff+=size;
} }
...@@ -838,12 +902,16 @@ int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) { ...@@ -838,12 +902,16 @@ int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) {
//printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val); //printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val);
wbuf_bytes(&w, val, vallen); wbuf_bytes(&w, val, vallen);
assert(w.ndone==size); assert(w.ndone==size);
ssize_t r = pwrite(fd, w.buf, (size_t)size, freeoff); ssize_t r = toku_pwrite(fd, w.buf, (size_t)size, freeoff);
if (r<0) return errno; if (r<0) {
unlock_for_pwrite();
return errno;
}
assert(r==(ssize_t)size); assert(r==(ssize_t)size);
freeoff+=size; freeoff+=size;
toku_free(buf); toku_free(buf);
})); }));
unlock_for_pwrite();
return 0; return 0;
} }
......
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
#include "cachetable-rwlock.h" #include "cachetable-rwlock.h"
// execute the cachetable callbacks using a writer thread 0->no 1->yes // execute the cachetable callbacks using a writer thread 0->no 1->yes
#define DO_WRITER_THREAD 0 #define DO_WRITER_THREAD 1
#if DO_WRITER_THREAD #if DO_WRITER_THREAD
static void *cachetable_writer(void *); static void *cachetable_writer(void *);
#endif #endif
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment