Commit d6ab1513 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the tokudb.1665 branch into the main line. Fixes #1665.

{{{
svn merge -r 11185:11261 https://svn.tokutek.com/tokudb/toku/tokudb.1665
}}}


git-svn-id: file:///svn/toku/tokudb@11264 c7de825b-a66e-492c-adef-691d508d4ae1
parent faa3df9e
...@@ -9,6 +9,7 @@ extern "C" { ...@@ -9,6 +9,7 @@ extern "C" {
#endif #endif
#include <pthread.h> #include <pthread.h>
#include <time.h>
typedef pthread_attr_t toku_pthread_attr_t; typedef pthread_attr_t toku_pthread_attr_t;
typedef pthread_t toku_pthread_t; typedef pthread_t toku_pthread_t;
...@@ -18,6 +19,7 @@ typedef pthread_condattr_t toku_pthread_condattr_t; ...@@ -18,6 +19,7 @@ typedef pthread_condattr_t toku_pthread_condattr_t;
typedef pthread_cond_t toku_pthread_cond_t; typedef pthread_cond_t toku_pthread_cond_t;
typedef pthread_rwlock_t toku_pthread_rwlock_t; typedef pthread_rwlock_t toku_pthread_rwlock_t;
typedef pthread_rwlockattr_t toku_pthread_rwlockattr_t; typedef pthread_rwlockattr_t toku_pthread_rwlockattr_t;
typedef struct timespec toku_timespec_t;
static inline int static inline int
toku_pthread_rwlock_init(toku_pthread_rwlock_t *__restrict rwlock, const toku_pthread_rwlockattr_t *__restrict attr) { toku_pthread_rwlock_init(toku_pthread_rwlock_t *__restrict rwlock, const toku_pthread_rwlockattr_t *__restrict attr) {
...@@ -125,6 +127,11 @@ int toku_pthread_cond_wait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mute ...@@ -125,6 +127,11 @@ int toku_pthread_cond_wait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mute
return pthread_cond_wait(cond, mutex); return pthread_cond_wait(cond, mutex);
} }
static inline
int toku_pthread_cond_timedwait(toku_pthread_cond_t *cond, toku_pthread_mutex_t *mutex, toku_timespec_t *wakeup_at) {
return pthread_cond_timedwait(cond, mutex, wakeup_at);
}
static inline static inline
int toku_pthread_cond_signal(toku_pthread_cond_t *cond) { int toku_pthread_cond_signal(toku_pthread_cond_t *cond) {
return pthread_cond_signal(cond); return pthread_cond_signal(cond);
......
...@@ -54,6 +54,7 @@ BRT_SOURCES = \ ...@@ -54,6 +54,7 @@ BRT_SOURCES = \
log_code \ log_code \
memarena \ memarena \
mempool \ mempool \
minicron \
omt \ omt \
recover \ recover \
roll \ roll \
......
...@@ -17,6 +17,8 @@ ...@@ -17,6 +17,8 @@
#include "rwlock.h" #include "rwlock.h"
#include "toku_worker.h" #include "toku_worker.h"
#include "log_header.h" #include "log_header.h"
#include "checkpoint.h"
#include "minicron.h"
#if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER) #if !defined(TOKU_CACHETABLE_DO_EVICT_FROM_WRITER)
#error #error
...@@ -142,6 +144,7 @@ struct cachetable { ...@@ -142,6 +144,7 @@ struct cachetable {
LSN lsn_of_checkpoint_in_progress; LSN lsn_of_checkpoint_in_progress;
PAIR pending_head; // list of pairs marked with checkpoint_pending PAIR pending_head; // list of pairs marked with checkpoint_pending
struct rwlock pending_lock; // multiple writer threads, single checkpoint thread struct rwlock pending_lock; // multiple writer threads, single checkpoint thread
struct minicron checkpointer; // the periodic checkpointing thread
}; };
// Lock the cachetable // Lock the cachetable
...@@ -188,6 +191,32 @@ struct cachefile { ...@@ -188,6 +191,32 @@ struct cachefile {
int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function. int (*end_checkpoint_userdata)(CACHEFILE cf, void *userdata); // after checkpointing cachefiles call this function.
}; };
static int
checkpoint_thread (void *cachetable_v)
// Effect: If checkpoint_period>0 thn periodically run a checkpoint.
// If someone changes the checkpoint_period (calling toku_set_checkpoint_period), then the checkpoint will run sooner or later.
// If someone sets the checkpoint_shutdown boolean , then this thread exits.
// This thread notices those changes by waiting on a condition variable.
{
char *error_string;
CACHETABLE ct = cachetable_v;
printf("%s:%d Checkpointing\n", __FILE__, __LINE__);
int r = toku_checkpoint(ct, ct->logger, &error_string);
if (r) {
if (error_string) {
fprintf(stderr, "%s:%d Got error %d while doing: %s\n", __FILE__, __LINE__, r, error_string);
} else {
fprintf(stderr, "%s:%d Got error %d while doing checkpoint\n", __FILE__, __LINE__, r);
}
abort(); // Don't quite know what to do with these errors.
}
return r;
}
int toku_set_checkpoint_period (CACHETABLE ct, u_int32_t new_period) {
return toku_minicron_change_period(&ct->checkpointer, new_period);
}
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) { int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_lsn), TOKULOGGER logger) {
#if defined __linux__ #if defined __linux__
{ {
...@@ -208,6 +237,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l ...@@ -208,6 +237,7 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN UU(initial_l
ct->logger = logger; ct->logger = logger;
toku_init_workers(&ct->wq, &ct->threadpool); toku_init_workers(&ct->wq, &ct->threadpool);
ct->mutex = workqueue_lock_ref(&ct->wq); ct->mutex = workqueue_lock_ref(&ct->wq);
toku_minicron_setup(&ct->checkpointer, 0, checkpoint_thread, ct); // default is no checkpointing
*result = ct; *result = ct;
return 0; return 0;
} }
...@@ -1271,6 +1301,10 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) { ...@@ -1271,6 +1301,10 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
/* Require that it all be flushed. */ /* Require that it all be flushed. */
int toku_cachetable_close (CACHETABLE *ctp) { int toku_cachetable_close (CACHETABLE *ctp) {
CACHETABLE ct=*ctp; CACHETABLE ct=*ctp;
{
int r = toku_minicron_shutdown(&ct->checkpointer);
assert(r==0);
}
int r; int r;
cachetable_lock(ct); cachetable_lock(ct);
if ((r=cachetable_flush_cachefile(ct, 0))) { if ((r=cachetable_flush_cachefile(ct, 0))) {
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "$Id:$"
#include <errno.h>
#include <string.h>
#include "toku_assert.h"
#include "brttypes.h"
#include "minicron.h"
#include "toku_portability.h"
static void
toku_gettime (struct timespec *a) {
struct timeval tv;
gettimeofday(&tv, 0);
a->tv_sec = tv.tv_sec;
a->tv_nsec = tv.tv_usec * 1000LL;
}
static int
timespec_compare (struct timespec *a, struct timespec *b) {
if (a->tv_sec > b->tv_sec) return 1;
if (a->tv_sec < b->tv_sec) return -1;
if (a->tv_nsec > b->tv_nsec) return 1;
if (a->tv_nsec < b->tv_nsec) return -1;
return 0;
}
// Implementation notes:
// When calling do_shutdown or change_period, the mutex is obtained, the variables in the minicron struct are modified, and
// the condition variable is signalled. Possibly the minicron thread will miss the signal. To avoid this problem, whenever
// the minicron thread acquires the mutex, it must check to see what the variables say to do (e.g., should it shut down?).
static void*
minicron_do (void *pv)
{
struct minicron *p = pv;
int r = toku_pthread_mutex_lock(&p->mutex);
assert(r==0);
while (1) {
if (p->do_shutdown) {
r = toku_pthread_mutex_unlock(&p->mutex);
assert(r==0);
return 0;
}
if (p->period_in_seconds==0) {
// if we aren't supposed to do it then just do an untimed wait.
r = toku_pthread_cond_wait(&p->condvar, &p->mutex);
assert(r==0);
} else {
// Recompute the wakeup time every time (instead of once per call to f) in case the period changges.
struct timespec wakeup_at = p->time_of_last_call_to_f;
wakeup_at.tv_sec += p->period_in_seconds;
struct timespec now;
toku_gettime(&now);
//printf("wakeup at %.6f (after %d seconds) now=%.6f\n", wakeup_at.tv_sec + wakeup_at.tv_nsec*1e-9, p->period_in_seconds, now.tv_sec + now.tv_nsec*1e-9);
r = toku_pthread_cond_timedwait(&p->condvar, &p->mutex, &wakeup_at);
if (r!=0 && r!=ETIMEDOUT) fprintf(stderr, "%s:%d r=%d (%s)", __FILE__, __LINE__, r, strerror(r));
assert(r==0 || r==ETIMEDOUT);
}
// Now we woke up, and we should figure out what to do
if (p->do_shutdown) {
r = toku_pthread_mutex_unlock(&p->mutex);
assert(r==0);
return 0;
}
if (p->period_in_seconds >0) {
// maybe do a checkpoint
struct timespec now;
toku_gettime(&now);
struct timespec time_to_call = p->time_of_last_call_to_f;
time_to_call.tv_sec += p->period_in_seconds;
int compare = timespec_compare(&time_to_call, &now);
//printf("compare(%.6f, %.6f)=%d\n", time_to_call.tv_sec + time_to_call.tv_nsec*1e-9, now.tv_sec+now.tv_nsec*1e-9, compare);
if (compare <= 0) {
r = toku_pthread_mutex_unlock(&p->mutex);
assert(r==0);
r = p->f(p->arg);
assert(r==0);
r = toku_pthread_mutex_lock(&p->mutex);
assert(r==0);
toku_gettime(&p->time_of_last_call_to_f); // the period is measured between calls to f.
}
}
}
}
int
toku_minicron_setup(struct minicron *p, u_int32_t period_in_seconds, int(*f)(void *), void *arg)
{
p->f = f;
p->arg = arg;
toku_gettime(&p->time_of_last_call_to_f);
//printf("now=%.6f", p->time_of_last_call_to_f.tv_sec + p->time_of_last_call_to_f.tv_nsec*1e-9);
p->period_in_seconds = period_in_seconds;
p->do_shutdown = FALSE;
{ int r = toku_pthread_mutex_init(&p->mutex, 0); assert(r==0); }
{ int r = toku_pthread_cond_init (&p->condvar, 0); assert(r==0); }
//printf("%s:%d setup period=%d\n", __FILE__, __LINE__, period_in_seconds);
return toku_pthread_create(&p->thread, 0, minicron_do, p);
}
int
toku_minicron_change_period(struct minicron *p, u_int32_t new_period)
{
int r = toku_pthread_mutex_lock(&p->mutex); assert(r==0);
p->period_in_seconds = new_period;
r = toku_pthread_cond_signal(&p->condvar); assert(r==0);
r = toku_pthread_mutex_unlock(&p->mutex); assert(r==0);
return 0;
}
int
toku_minicron_shutdown(struct minicron *p) {
int r = toku_pthread_mutex_lock(&p->mutex); assert(r==0);
p->do_shutdown = TRUE;
//printf("%s:%d signalling\n", __FILE__, __LINE__);
r = toku_pthread_cond_signal(&p->condvar); assert(r==0);
r = toku_pthread_mutex_unlock(&p->mutex); assert(r==0);
void *returned_value;
//printf("%s:%d joining\n", __FILE__, __LINE__);
r = toku_pthread_join(p->thread, &returned_value);
if (r!=0) fprintf(stderr, "%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
assert(r==0); assert(returned_value==0);
r = toku_pthread_cond_destroy(&p->condvar); assert(r==0);
//printf("%s:%d shutdowned\n", __FILE__, __LINE__);
return 0;
}
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved."
#ident "$Id:$"
#include <toku_pthread.h>
#include <toku_time.h>
#include "brttypes.h"
// Specification:
// A minicron is a miniature cron job for executing a job periodically inside a pthread.
// To create a minicron,
// 1) allocate a "struct minicron" somewhere.
// Rationale: This struct can be stored inside another struct (such as the cachetable), avoiding a malloc/free pair.
// 2) call toku_minicron_setup, specifying a period (in seconds), a function, and some arguments.
// If the period is positive then the function is called periodically (with the period specified)
// Note: The period is measured from when the previous call to f finishes to when the new call starts.
// Thus, if the period is 5 minutes, and it takes 8 minutes to run f, then the actual periodicity is 13 minutes.
// Rationale: If f always takes longer than f to run, then it will get "behind". This module makes getting behind explicit.
// 3) When finished, call toku_minicron_shutdown.
// 4) If you want to change the period, then call toku_minicron_change_period. The time since f finished is applied to the new period
// and the call is rescheduled. (If the time since f finished is more than the new period, then f is called immediately).
struct minicron {
pthread_t thread;
struct timespec time_of_last_call_to_f;
pthread_mutex_t mutex;
pthread_cond_t condvar;
int (*f)(void*);
void *arg;
u_int32_t period_in_seconds;
BOOL do_shutdown;
};
int toku_minicron_setup (struct minicron *s, u_int32_t period_in_seconds, int(*f)(void *), void *arg);
int toku_minicron_change_period(struct minicron *p, u_int32_t new_period);
int toku_minicron_shutdown(struct minicron *p);
...@@ -84,6 +84,7 @@ REGRESSION_TESTS_RAW = \ ...@@ -84,6 +84,7 @@ REGRESSION_TESTS_RAW = \
log-test6 \ log-test6 \
log-test7 \ log-test7 \
memtest \ memtest \
minicron-test \
omt-cursor-test \ omt-cursor-test \
omt-test \ omt-test \
shortcut \ shortcut \
......
#include "minicron.h"
#include <unistd.h>
#include <assert.h>
static double
tdiff (struct timeval *a, struct timeval *b) {
return (a->tv_sec-b->tv_sec) + (a->tv_usec-b->tv_usec)*1e-6;
}
struct timeval starttime;
static double elapsed (void) {
struct timeval now;
gettimeofday(&now, 0);
return tdiff(&now, &starttime);
}
static int __attribute__((__noreturn__))
never_run (void *a) {
assert(a==0);
assert(0);
}
// Can we start something with period=0 (the function should never run) and shut it down.
static void*
test1 (void* v)
{
struct minicron m;
int r = toku_minicron_setup(&m, 0, never_run, 0); assert(r==0);
sleep(1);
r = toku_minicron_shutdown(&m); assert(r==0);
return v;
}
// Can we start something with period=10 and shut it down after 2 seconds (the function should never run) .
static void*
test2 (void* v)
{
struct minicron m;
int r = toku_minicron_setup(&m, 10, never_run, 0); assert(r==0);
sleep(2);
r = toku_minicron_shutdown(&m); assert(r==0);
return v;
}
struct tenx {
struct timeval tv;
int counter;
};
static int
run_5x (void *v) {
struct tenx *tx=v;
struct timeval now;
gettimeofday(&now, 0);
double diff = tdiff(&now, &tx->tv);
printf("T=%f\n", diff);
assert(diff>0.5 + tx->counter);
assert(diff<1.5 + tx->counter);
tx->counter++;
return 0;
}
// Start something with period=1 and run it a few times
static void*
test3 (void* v)
{
struct minicron m;
struct tenx tx;
gettimeofday(&tx.tv, 0);
tx.counter=0;
int r = toku_minicron_setup(&m, 1, run_5x, &tx); assert(r==0);
sleep(5);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(tx.counter>=4 && tx.counter<=5); // after 5 seconds it could have run 4 or 5 times.
return v;
}
static int
run_3sec (void *v) {
printf("start3sec at %.6f\n", elapsed());
int *counter = v;
(*counter)++;
sleep(3);
printf("end3sec at %.6f\n", elapsed());
return 0;
}
// make sure that if f is really slow that it doesn't run too many times
static void*
test4 (void *v) {
struct minicron m;
int counter = 0;
int r = toku_minicron_setup(&m, 2, run_3sec, &counter); assert(r==0);
sleep(9);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(counter==2);
return v;
}
static void*
test5 (void *v) {
struct minicron m;
int counter = 0;
int r = toku_minicron_setup(&m, 10, run_3sec, &counter); assert(r==0);
r = toku_minicron_change_period(&m, 2); assert(r==0);
sleep(9);
r = toku_minicron_shutdown(&m); assert(r==0);
assert(counter==2);
return v;
}
static void*
test6 (void *v) {
struct minicron m;
int r = toku_minicron_setup(&m, 5, never_run, 0); assert(r==0);
r = toku_minicron_change_period(&m, 0); assert(r==0);
sleep(7);
r = toku_minicron_shutdown(&m); assert(r==0);
return v;
}
typedef void*(*ptf)(void*);
int
main (int argc __attribute__((__unused__)), const char *argv[] __attribute__((__unused__)))
{
gettimeofday(&starttime, 0);
ptf testfuns[] = {test1, test2, test3,
test4,
test5,
test6
};
#define N (sizeof(testfuns)/sizeof(testfuns[0]))
pthread_t tests[N];
unsigned int i;
for (i=0; i<N; i++) {
int r=toku_pthread_create(tests+i, 0, testfuns[i], 0);
assert(r==0);
}
for (i=0; i<N; i++) {
void *v;
int r=toku_pthread_join(tests[i], &v);
assert(r==0);
assert(v==0);
}
return 0;
}
...@@ -54,9 +54,11 @@ else ...@@ -54,9 +54,11 @@ else
TDB_TESTS = $(patsubst %.c,%.tdb$(BINSUF),$(SRCS)) TDB_TESTS = $(patsubst %.c,%.tdb$(BINSUF),$(SRCS))
endif endif
# For diskfull.bdb: db-4.6 seems OK, but db-4.3 segfaults
BDB_DONTRUN_TESTS = \ BDB_DONTRUN_TESTS = \
bug1381 \ bug1381 \
bug627 \ bug627 \
diskfull \
test_abort1 \ test_abort1 \
test_abort4 \ test_abort4 \
test_abort5 \ test_abort5 \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment