Commit 61a2a36b authored by Yoni Fogel's avatar Yoni Fogel

Addresses #1730 Put in attempted fix for race condition.

Closign and opening a cachefile is now threadsafe (race condition when trying to open during a close).
If race condition hits, it will stall the open till the close is done.
All opens are now single threaded.  This can be improved if it causes a bottleneck.

Added comments for possible future race conditions after lock refinement.

git-svn-id: file:///svn/toku/tokudb@11644 c7de825b-a66e-492c-adef-691d508d4ae1
parent 84cf6125
...@@ -110,6 +110,7 @@ struct ctpair { ...@@ -110,6 +110,7 @@ struct ctpair {
static void * const zero_value = 0; static void * const zero_value = 0;
static int const zero_size = 0; static int const zero_size = 0;
static inline void static inline void
ctpair_add_ref(PAIR p) { ctpair_add_ref(PAIR p) {
assert(!p->already_removed); assert(!p->already_removed);
...@@ -138,13 +139,14 @@ struct cachetable { ...@@ -138,13 +139,14 @@ struct cachetable {
long size_limit; // the limit to the sum of the pair sizes long size_limit; // the limit to the sum of the pair sizes
long size_writing; // the sum of the sizes of the pairs being written long size_writing; // the sum of the sizes of the pairs being written
TOKULOGGER logger; TOKULOGGER logger;
toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pair's toku_pthread_mutex_t *mutex; // coarse lock that protects the cachetable, the cachefiles, and the pairs
struct workqueue wq; // async work queue struct workqueue wq; // async work queue
THREADPOOL threadpool; // pool of worker threads THREADPOOL threadpool; // pool of worker threads
LSN lsn_of_checkpoint_in_progress; LSN lsn_of_checkpoint_in_progress;
PAIR pending_head; // list of pairs marked with checkpoint_pending PAIR pending_head; // list of pairs marked with checkpoint_pending
struct rwlock pending_lock; // multiple writer threads, single checkpoint thread struct rwlock pending_lock; // multiple writer threads, single checkpoint thread
struct minicron checkpointer; // the periodic checkpointing thread struct minicron checkpointer; // the periodic checkpointing thread
toku_pthread_mutex_t openfd_mutex; // make toku_cachetable_openfd() single-threaded
}; };
// Lock the cachetable // Lock the cachetable
...@@ -189,6 +191,7 @@ struct cachefile { ...@@ -189,6 +191,7 @@ struct cachefile {
int (*begin_checkpoint_userdata)(CACHEFILE cf, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function. int (*begin_checkpoint_userdata)(CACHEFILE cf, LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function. int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function. int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function.
toku_pthread_cond_t openfd_wait; // openfd must wait until file is fully closed (purged from cachetable) if file is opened and closed simultaneously
}; };
static int static int
...@@ -240,6 +243,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l ...@@ -240,6 +243,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
ct->logger = logger; ct->logger = logger;
toku_init_workers(&ct->wq, &ct->threadpool); toku_init_workers(&ct->wq, &ct->threadpool);
ct->mutex = workqueue_lock_ref(&ct->wq); ct->mutex = workqueue_lock_ref(&ct->wq);
int r = toku_pthread_mutex_init(&ct->openfd_mutex, NULL); assert(r == 0);
toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
*result = ct; *result = ct;
return 0; return 0;
...@@ -273,27 +277,40 @@ cachefile_refup (CACHEFILE cf) { ...@@ -273,27 +277,40 @@ cachefile_refup (CACHEFILE cf) {
cf->refcount++; cf->refcount++;
} }
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. // If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname) { int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char *fname) {
int r; int r;
CACHEFILE extant; CACHEFILE extant;
struct fileid fileid; struct fileid fileid;
r = toku_os_get_unique_file_id(fd, &fileid); r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) { if (r != 0) {
r=errno; close(fd); r=errno; close(fd);
return r; return r;
} }
r = toku_pthread_mutex_lock(&ct->openfd_mutex); // purpose is to make this function single-threaded
assert(r==0);
cachetable_lock(ct); cachetable_lock(ct);
for (extant = ct->cachefiles; extant; extant=extant->next) { for (extant = ct->cachefiles; extant; extant=extant->next) {
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) { if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
//File is already open (and in cachetable as extant)
cachefile_refup(extant);
if (extant->refcount==1) {
// if another thread is closing this file, wait until the close is fully complete
r = toku_pthread_cond_wait(&extant->openfd_wait, ct->mutex);
assert(r == 0);
break; // other thread has closed this file, go create a new cachefile
}
r = close(fd); r = close(fd);
assert(r == 0); assert(r == 0);
cachefile_refup(extant); // re-use pre-existing cachefile
*cfptr = extant; *cfptr = extant;
cachetable_unlock(ct); r = 0;
return 0; goto exit;
} }
} }
//File is not open. Make a new cachefile.
try_again: try_again:
for (extant = ct->cachefiles; extant; extant=extant->next) { for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) { if (next_filenum_to_use.fileid==extant->filenum.fileid) {
...@@ -302,6 +319,7 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char ...@@ -302,6 +319,7 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char
} }
} }
{ {
// create a new cachefile entry in the cachetable
CACHEFILE XCALLOC(newcf); CACHEFILE XCALLOC(newcf);
newcf->cachetable = ct; newcf->cachetable = ct;
newcf->filenum.fileid = next_filenum_to_use.fileid++; newcf->filenum.fileid = next_filenum_to_use.fileid++;
...@@ -309,17 +327,17 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char ...@@ -309,17 +327,17 @@ int toku_cachetable_openfd (CACHEFILE *cfptr, CACHETABLE ct, int fd, const char
newcf->refcount = 1; newcf->refcount = 1;
newcf->next = ct->cachefiles; newcf->next = ct->cachefiles;
ct->cachefiles = newcf; ct->cachefiles = newcf;
r = toku_pthread_cond_init(&newcf->openfd_wait, NULL); assert(r == 0);
newcf->userdata = 0;
newcf->close_userdata = 0;
newcf->checkpoint_userdata = 0;
newcf->begin_checkpoint_userdata = 0;
newcf->end_checkpoint_userdata = 0;
*cfptr = newcf; *cfptr = newcf;
cachetable_unlock(ct); r = 0;
return 0;
} }
exit:
{
int rm = toku_pthread_mutex_unlock(&ct->openfd_mutex);
assert (rm == 0);
}
cachetable_unlock(ct);
return r;
} }
//TEST_ONLY_FUNCTION //TEST_ONLY_FUNCTION
...@@ -410,14 +428,27 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -410,14 +428,27 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
assert(!cf->next_in_checkpoint); assert(!cf->next_in_checkpoint);
assert(!cf->for_checkpoint); assert(!cf->for_checkpoint);
int r; int r;
// cachetable_flush_cachefile() may release and retake cachetable_lock,
// allowing another thread to get into toku_cachetable_openfd()
if ((r = cachetable_flush_cachefile(ct, cf))) { if ((r = cachetable_flush_cachefile(ct, cf))) {
error: error:
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
if (cf->refcount > 0) {
int rs;
assert(cf->refcount == 1); // toku_cachetable_openfd() is single-threaded
rs = toku_pthread_cond_signal(&cf->openfd_wait); assert(rs == 0);
}
// we can destroy the condition variable because if there was another thread waiting, it was already signalled
{
int rd = toku_pthread_cond_destroy(&cf->openfd_wait);
assert(rd == 0);
}
if (cf->fname) toku_free(cf->fname); if (cf->fname) toku_free(cf->fname);
int r2 = close(cf->fd); int r2 = close(cf->fd);
if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno); if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno);
//assert(r == 0); //assert(r == 0);
toku_free(cf); toku_free(cf);
*cfp = NULL;
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
...@@ -430,6 +461,18 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -430,6 +461,18 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
cf->end_checkpoint_userdata = NULL; cf->end_checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
// refcount could be non-zero if another thread is trying to open this cachefile,
// but is blocked in toku_cachetable_openfd() waiting for us to finish closing it.
if (cf->refcount > 0) {
int rs;
assert(cf->refcount == 1); // toku_cachetable_openfd() is single-threaded
rs = toku_pthread_cond_signal(&cf->openfd_wait); assert(rs == 0);
}
// we can destroy the condition variable because if there was another thread waiting, it was already signalled
{
int rd = toku_pthread_cond_destroy(&cf->openfd_wait);
assert(rd == 0);
}
cachetable_unlock(ct); cachetable_unlock(ct);
r = close(cf->fd); r = close(cf->fd);
assert(r == 0); assert(r == 0);
...@@ -441,11 +484,11 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string ...@@ -441,11 +484,11 @@ int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string
} }
if (cf->fname) toku_free(cf->fname); if (cf->fname) toku_free(cf->fname);
toku_free(cf); toku_free(cf);
*cfp=0; *cfp=NULL;
return r; return r;
} else { } else {
cachetable_unlock(ct); cachetable_unlock(ct);
*cfp=0; *cfp=NULL;
return 0; return 0;
} }
} }
...@@ -1226,8 +1269,9 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf ...@@ -1226,8 +1269,9 @@ static void assert_cachefile_is_flushed_and_removed (CACHETABLE ct, CACHEFILE cf
} }
} }
// Flush all of the pairs that belong to a cachefile (or all pairs if // Flush (write to disk) all of the pairs that belong to a cachefile (or all pairs if
// the cachefile is NULL. // the cachefile is NULL.
// Must be holding cachetable lock on entry.
static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
unsigned nfound = 0; unsigned nfound = 0;
struct workqueue cq; struct workqueue cq;
...@@ -1268,15 +1312,14 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -1268,15 +1312,14 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//and go to the next entry. //and go to the next entry.
for (i=0; i < num_pairs; i++) { for (i=0; i < num_pairs; i++) {
PAIR p = list[i]; PAIR p = list[i];
if (p->already_removed) { if (!p->already_removed) {
ctpair_destroy(p); //Release our reference
continue;
}
assert(cf == 0 || p->cachefile==cf); assert(cf == 0 || p->cachefile==cf);
nfound++; nfound++;
p->cq = &cq; p->cq = &cq;
if (p->state == CTPAIR_IDLE) if (p->state == CTPAIR_IDLE)
flush_and_maybe_remove(ct, p, TRUE); flush_and_maybe_remove(ct, p, TRUE); //TODO: 1485 If this is being removed, why is it counted in nfound?
//TODO: 1485 How are things being added to the queue?
}
ctpair_destroy(p); //Release our reference ctpair_destroy(p); //Release our reference
} }
toku_free(list); toku_free(list);
...@@ -1285,15 +1328,20 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -1285,15 +1328,20 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
for (i=0; i<nfound; i++) { for (i=0; i<nfound; i++) {
cachetable_unlock(ct); cachetable_unlock(ct);
WORKITEM wi = 0; WORKITEM wi = 0;
//This workqueue's mutex is NOT the cachetable lock.
//You must not be holding the cachetable lock during the dequeue.
int r = workqueue_deq(&cq, &wi, 1); assert(r == 0); int r = workqueue_deq(&cq, &wi, 1); assert(r == 0);
cachetable_lock(ct); cachetable_lock(ct);
PAIR p = workitem_arg(wi); PAIR p = workitem_arg(wi);
p->cq = 0; p->cq = 0;
if (p->state == CTPAIR_READING) { if (p->state == CTPAIR_READING) { //TODO: 1485 Doesn't this mean someone ELSE is holding a lock?
rwlock_write_unlock(&p->rwlock); rwlock_write_unlock(&p->rwlock); //TODO: 1485 When did we grab a write lock? (the other person who grabbed the read (write) lock?
cachetable_maybe_remove_and_free_pair(ct, p); //Does this mean no one has a pin? since it never finished...
} else if (p->state == CTPAIR_WRITING) { // POSSIBLE CAUSE
cachetable_complete_write_pair(ct, p, TRUE); cachetable_maybe_remove_and_free_pair(ct, p); //TODO: 1485 MUST be removed. Can't be 'maybe_remove'
} else if (p->state == CTPAIR_WRITING) { //TODO: 1485 This could mean WE or SOMEONE ELSE is holding a lock, right? Can't be both.
//Someone else could have a PIN! This could be the cause.
cachetable_complete_write_pair(ct, p, TRUE); //TODO: 1485 MUST be removed. Can't be 'maybe_remove'
} else if (p->state == CTPAIR_INVALID) { } else if (p->state == CTPAIR_INVALID) {
abort_fetch_pair(p); abort_fetch_pair(p);
} else } else
...@@ -1336,6 +1384,7 @@ toku_cachetable_close (CACHETABLE *ctp) { ...@@ -1336,6 +1384,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
} }
assert(ct->size_writing == 0); assert(ct->size_writing == 0);
rwlock_destroy(&ct->pending_lock); rwlock_destroy(&ct->pending_lock);
r = toku_pthread_mutex_destroy(&ct->openfd_mutex); assert(r == 0);
cachetable_unlock(ct); cachetable_unlock(ct);
toku_destroy_workers(&ct->wq, &ct->threadpool); toku_destroy_workers(&ct->wq, &ct->threadpool);
toku_free(ct->table); toku_free(ct->table);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment