Commit a82c6dd9 authored by Sergey Vojtovich's avatar Sergey Vojtovich

Fixup: one background thread for all caches

In reply to:
There is one thread per cache. Isn't that a bit overkill as we will have 3-4 caches?
parent 5bdb2ae3
......@@ -150,7 +150,6 @@ int open_cache(PMEM_APPEND_CACHE *cache, PMEM_APPEND_CACHE_DIRECTORY *dir,
cache->buffer_size= cache_end - cache_start -
sizeof(PMEM_APPEND_CACHE_HEADER) -
cache->header->file_name_length;
cache->stop_flusher= 0;
cache->flushed_eof= cache->header->flushed_eof;
cache->cached_eof= cache->header->cached_eof;
cache->reserved_eof= cache->cached_eof;
......@@ -223,16 +222,36 @@ static int flush_cache(PMEM_APPEND_CACHE *cache)
static void *flusher_thread(void *arg)
{
PMEM_APPEND_CACHE *cache= (PMEM_APPEND_CACHE*) arg;
while (!my_atomic_load64_explicit(&cache->stop_flusher,
MY_MEMORY_ORDER_RELAXED))
PMEM_APPEND_CACHE_DIRECTORY *dir= (PMEM_APPEND_CACHE_DIRECTORY*) arg;
pthread_mutex_lock(&dir->mutex);
while (!dir->stop_flusher)
{
if (flush_cache(cache))
abort();
for (LIST *list= dir->caches; list; list= list->next)
{
if (flush_cache((PMEM_APPEND_CACHE*) list->data))
abort();
}
pthread_mutex_unlock(&dir->mutex);
my_sleep(1000);
pthread_mutex_lock(&dir->mutex);
}
pthread_mutex_unlock(&dir->mutex);
return 0;
}
static int init_directory(PMEM_APPEND_CACHE_DIRECTORY *dir)
{
pthread_mutex_init(&dir->mutex, 0);
dir->caches= 0;
dir->stop_flusher= false;
if (pthread_create(&dir->flusher_thread, 0, flusher_thread, dir))
{
pthread_mutex_destroy(&dir->mutex);
pmem_unmap(dir->header, dir->mapped_length);
dir->header= 0;
return -1;
}
if (flush_cache(cache))
abort();
return 0;
}
......@@ -367,7 +386,7 @@ int pmem_append_cache_create(const char *path, uint64_t size,
PMEM_APPEND_CACHE_DIRECTORY dir;
int res= create_directory(&dir, path, size, n_caches);
if (!res && (res= pmem_append_cache_close(&dir)))
if (!res && (res= pmem_unmap(dir.header, dir.mapped_length)))
my_delete(path, MYF(MY_WME));
return res;
}
......@@ -389,6 +408,9 @@ int pmem_append_cache_open(PMEM_APPEND_CACHE_DIRECTORY *dir, const char *path)
if (!(dir->header= pmem_map_file(path, 0, 0, 0, &dir->mapped_length, 0)))
return -1;
if (init_directory(dir))
return -1;
if (dir->mapped_length < sizeof(PMEM_APPEND_CACHE_DIRECTORY_HEADER) ||
dir->header->magic != pmem_append_cache_magic ||
!dir->header->n_caches ||
......@@ -423,6 +445,12 @@ int pmem_append_cache_close(PMEM_APPEND_CACHE_DIRECTORY *dir)
{
int res= pmem_unmap(dir->header, dir->mapped_length);
dir->header= 0;
DBUG_ASSERT(!dir->caches);
pthread_mutex_lock(&dir->mutex);
dir->stop_flusher= true;
pthread_mutex_unlock(&dir->mutex);
pthread_join(dir->flusher_thread, 0);
pthread_mutex_destroy(&dir->mutex);
return res;
}
return 0;
......@@ -498,6 +526,7 @@ int pmem_append_cache_flush(PMEM_APPEND_CACHE_DIRECTORY *dir)
int pmem_append_cache_init(PMEM_APPEND_CACHE_DIRECTORY *dir, const char *path,
uint64_t size, uint32_t n_caches)
{
int res;
if (!path)
{
dir->header= 0;
......@@ -520,7 +549,9 @@ int pmem_append_cache_init(PMEM_APPEND_CACHE_DIRECTORY *dir, const char *path,
return -1;
}
create:
return create_directory(dir, path, size, n_caches);
if (!(res= create_directory(dir, path, size, n_caches)))
res= init_directory(dir);
return res;
}
......@@ -533,8 +564,7 @@ int pmem_append_cache_init(PMEM_APPEND_CACHE_DIRECTORY *dir, const char *path,
@param file_fd file descriptor
@param file_name name of file
Upon successful completion cache becomes usable, background flusher thread is
started.
Upon successful completion cache becomes usable.
It is only possible to attach to detached cache slot, that is only if
file_name_length is 0.
......@@ -594,9 +624,11 @@ int pmem_append_cache_attach(PMEM_APPEND_CACHE *cache,
cache->buffer+= file_name_length;
cache->buffer_size-= file_name_length;
if (pthread_create(&cache->flusher_thread, 0, flusher_thread, cache))
return -1;
cache->element.data= cache;
cache->dir= dir;
pthread_mutex_lock(&dir->mutex);
dir->caches= list_add(dir->caches, &cache->element);
pthread_mutex_unlock(&dir->mutex);
return 0;
}
......@@ -606,8 +638,8 @@ int pmem_append_cache_attach(PMEM_APPEND_CACHE *cache,
@param cache cache descriptor
Flushes cached data, stops background thread, marks directory slot free by
resetting file_name_length, cache becomes unusable.
Flushes cached data, marks directory slot free by resetting file_name_length,
cache becomes unusable.
Cache must not be accessed concurrently for the duration of this function
call.
......@@ -627,17 +659,15 @@ int pmem_append_cache_detach(PMEM_APPEND_CACHE *cache)
if (cache->write == no_cache_write)
return 0;
my_atomic_store32_explicit(&cache->stop_flusher, 1, MY_MEMORY_ORDER_RELAXED);
if (!(res= pthread_join(cache->flusher_thread, 0)))
pthread_mutex_lock(&cache->dir->mutex);
cache->dir->caches= list_delete(cache->dir->caches, &cache->element);
pthread_mutex_unlock(&cache->dir->mutex);
if (!(res= flush_cache(cache)))
{
if (cache->flushed_eof == cache->cached_eof)
{
cache->header->file_name_length= 0;
pmem_persist(&cache->header->file_name_length,
sizeof(cache->header->file_name_length));
}
else
res= -1;
cache->header->file_name_length= 0;
pmem_persist(&cache->header->file_name_length,
sizeof(cache->header->file_name_length));
}
return res;
}
......@@ -21,6 +21,7 @@
#include <stdint.h>
#include <stdbool.h>
#include <my_global.h>
#include <my_list.h>
#ifdef __cplusplus
......@@ -77,9 +78,13 @@ typedef struct st_pmem_append_cache_directory_header
/** In-memory cache directory descriptor. */
typedef struct st_pmem_append_cache_directory
{
LIST *caches;
pthread_t flusher_thread;
pthread_mutex_t mutex;
PMEM_APPEND_CACHE_DIRECTORY_HEADER *header;
uint64_t *start_offsets;
size_t mapped_length;
bool stop_flusher;
} PMEM_APPEND_CACHE_DIRECTORY;
......@@ -95,13 +100,13 @@ typedef struct st_pmem_append_cache_header
/** In-memory append cache descriptor. */
typedef struct st_pmem_append_cache
{
pthread_t flusher_thread;
LIST element;
PMEM_APPEND_CACHE_DIRECTORY *dir;
PMEM_APPEND_CACHE_HEADER *header;
char *file_name;
void *buffer;
uint64_t buffer_size;
File file_fd;
int32 stop_flusher;
size_t (*write)(struct st_pmem_append_cache *cache, const void *data,
size_t length, myf flags);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment