Commit c3bb6b55 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#3344 speedup cachefile_flush_cachefile by maintaining a list of pairs for...

#3344 speedup cachefile_flush_cachefile by maintaining a list of pairs for each cachefile refs[t:3344]

git-svn-id: file:///svn/toku/tokudb@29770 c7de825b-a66e-492c-adef-691d508d4ae1
parent 02b729ac
......@@ -100,11 +100,12 @@ struct ctpair {
PAIR pending_next;
PAIR pending_prev;
struct rwlock rwlock; // multiple get's, single writer
struct rwlock rwlock; // multiple get's, single writer
struct workqueue *cq; // writers sometimes return ctpair's using this queue
struct workitem asyncwork; // work item for the worker threads
u_int32_t refs; //References that prevent descruction
int already_removed; //If a pair is removed from the cachetable, but cannot be freed because refs>0, this is set.
u_int32_t refs; // References that prevent destruction
int already_removed; // If a pair is removed from the cachetable, but cannot be freed because refs>0, this is set.
struct toku_list next_for_cachefile; // link in the cachefile list
};
static void * const zero_value = 0;
......@@ -207,6 +208,7 @@ enum cachefile_checkpoint_state {
struct cachefile {
CACHEFILE next;
CACHEFILE next_in_checkpoint;
struct toku_list pairs_for_cachefile; // list of pairs for this cachefile
BOOL for_checkpoint; //True if part of the in-progress checkpoint
u_int64_t refcount; /* CACHEFILEs are shared. Use a refcount to decide when to really close it.
* The reference count is one for every open DB.
......@@ -621,6 +623,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
r = toku_pthread_cond_init(&newcf->openfd_wait, NULL); resource_assert_zero(r);
r = toku_pthread_cond_init(&newcf->closefd_wait, NULL); resource_assert_zero(r);
toku_list_init(&newcf->pairs_for_cachefile);
*cfptr = newcf;
r = 0;
}
......@@ -849,6 +852,7 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
toku_free(cf);
*cfp = NULL;
cachetable_unlock(ct);
......@@ -900,6 +904,7 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
rwlock_write_lock(&cf->checkpoint_lock, ct->mutex); //Just to make sure we can get it
rwlock_write_unlock(&cf->checkpoint_lock);
rwlock_destroy(&cf->checkpoint_lock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
cachetable_unlock(ct);
r = close(cf->fd);
......@@ -1048,6 +1053,7 @@ pending_pairs_remove (CACHETABLE ct, PAIR p) {
static void cachetable_remove_pair (CACHETABLE ct, PAIR p) {
lru_remove(ct, p);
pending_pairs_remove(ct, p);
toku_list_remove(&p->next_for_cachefile);
assert(ct->n_in_table>0);
ct->n_in_table--;
......@@ -1307,6 +1313,7 @@ static PAIR cachetable_insert_at(CACHETABLE ct,
rwlock_init(&p->rwlock);
p->cq = 0;
lru_add_to_list(ct, p);
toku_list_push(&cachefile->pairs_for_cachefile, &p->next_for_cachefile);
u_int32_t h = fullhash & (ct->table_size-1);
p->hash_chain = ct->table[h];
ct->table[h] = p;
......@@ -1885,19 +1892,30 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//Make a list of pairs that belong to this cachefile.
//Add a reference to them.
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p = p->hash_chain) {
if (cf == 0 || p->cachefile==cf) {
ctpair_add_ref(p);
list[num_pairs] = p;
num_pairs++;
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
if (cf == NULL) {
for (i=0; i < ct->table_size; i++) {
PAIR p;
for (p = ct->table[i]; p; p = p->hash_chain) {
if (cf == 0 || p->cachefile==cf) {
ctpair_add_ref(p);
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
}
list[num_pairs++] = p;
}
}
}
}
} else {
for (struct toku_list *next_pair = cf->pairs_for_cachefile.next; next_pair != &cf->pairs_for_cachefile; next_pair = next_pair->next) {
PAIR p = toku_list_struct(next_pair, struct ctpair, next_for_cachefile);
ctpair_add_ref(p);
if (num_pairs == list_size) {
list_size *= 2;
XREALLOC_N(list_size, list);
}
list[num_pairs++] = p;
}
}
//Loop through the list.
//It is safe to access the memory (will not have been freed).
......@@ -1939,7 +1957,10 @@ static int cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
assert(0);
}
workqueue_destroy(&cq);
assert_cachefile_is_flushed_and_removed(ct, cf);
if (cf)
assert(toku_list_empty(&cf->pairs_for_cachefile));
else
assert_cachefile_is_flushed_and_removed(ct, cf);
if ((4 * ct->n_in_table < ct->table_size) && (ct->table_size>4))
cachetable_rehash(ct, ct->table_size/2);
......@@ -1965,7 +1986,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
}
int r;
cachetable_lock(ct);
if ((r=cachetable_flush_cachefile(ct, 0))) {
if ((r=cachetable_flush_cachefile(ct, NULL))) {
cachetable_unlock(ct);
return r;
}
......@@ -2433,19 +2454,17 @@ int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
}
int toku_cachefile_count_pinned (CACHEFILE cf, int print_them) {
u_int32_t i;
assert(cf != NULL);
int n_pinned=0;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(rwlock_readers(&p->rwlock)>=0);
if (rwlock_readers(&p->rwlock) && (cf==0 || p->cachefile==cf)) {
if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
n_pinned++;
}
}
for (struct toku_list *next_pair = cf->pairs_for_cachefile.next; next_pair != &cf->pairs_for_cachefile; next_pair = next_pair->next) {
PAIR p = toku_list_struct(next_pair, struct ctpair, next_for_cachefile);
assert(rwlock_readers(&p->rwlock) >= 0);
if (rwlock_readers(&p->rwlock)) {
if (print_them) printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value);
n_pinned++;
}
}
cachetable_unlock(ct);
return n_pinned;
......
......@@ -207,6 +207,7 @@ BDB_DONTRUN_TESTS = \
root_fifo_2 \
root_fifo_32 \
root_fifo_41 \
shutdown-3344 \
stat64 stat64_flatten \
stress-gc \
test1324 \
......
// measure the cost of closing db's with a full cache table
// create db 0 with txn 0
// create db's 1..N-1 with auto txn1
// fill the cache table with blocks for db 0
// close db 1..N-1 (these should be fast)
// close db 0
// abort txn 0
#include "test.h"
#include <byteswap.h>
static long htonl64(long x) {
#if BYTE_ORDER == LITTLE_ENDIAN
return bswap_64(x);
#else
#error
#endif
}
static inline float tdiff (struct timeval *a, struct timeval *b) {
return (a->tv_sec - b->tv_sec) +1e-6*(a->tv_usec - b->tv_usec);
}
static void
insert_row(DB_ENV *env UU(), DB_TXN *txn, DB *db, uint64_t rowi) {
int r;
// generate the key
char key_buffer[8];
uint64_t k = htonl64(rowi);
memcpy(key_buffer, &k, sizeof k);
// generate the val
char val_buffer[1024];
memset(val_buffer, 0, sizeof val_buffer);
DBT key = { .data = key_buffer, .size = sizeof key_buffer };
DBT value = { .data = val_buffer, .size = sizeof val_buffer };
//uint32_t put_flags = DB_YESOVERWRITE | (txn ? (DB_PRELOCKED_FILE_READ | DB_PRELOCKED_WRITE) : 0);
uint32_t put_flags = DB_YESOVERWRITE;
r = db->put(db, txn, &key, &value, put_flags); assert(r == 0);
}
static void
populate(DB_ENV *env, DB_TXN *txn, DB *db, uint64_t nrows) {
int r;
struct timeval tstart;
r = gettimeofday(&tstart, NULL); assert(r == 0);
struct timeval tlast = tstart;
for (uint64_t rowi = 0; rowi < nrows; rowi++) {
insert_row(env, txn, db, rowi);
// maybe report performance
uint64_t rows_per_report = 1000;
if (((rowi + 1) % rows_per_report) == 0) {
struct timeval tnow;
r = gettimeofday(&tnow, NULL); assert(r == 0);
float last_time = tdiff(&tnow, &tlast);
float total_time = tdiff(&tnow, &tstart);
if (verbose) {
fprintf(stderr, "%ld %.3f %.0f/s %.0f/s\n", rowi + 1, last_time, rows_per_report/last_time, rowi/total_time); fflush(stderr);
}
tlast = tnow;
}
}
}
static void
run_test(DB_ENV *env, int ndbs, int do_txn, uint32_t pagesize, uint64_t nrows) {
int r;
DB_TXN *txn0 = NULL;
if (do_txn) {
r = env->txn_begin(env, NULL, &txn0, 0); assert(r == 0);
}
DB *dbs[ndbs];
int i = 0;
{
DB *db = NULL;
if (verbose) fprintf(stderr, "creating %d\n", i);
r = db_create(&db, env, 0); assert(r == 0);
if (pagesize) {
r = db->set_pagesize(db, pagesize); assert(r == 0);
}
char db_filename[32]; sprintf(db_filename, "test%d", i);
r = db->open(db, txn0, db_filename, NULL, DB_BTREE, DB_CREATE, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
if (do_txn) {
r = db->pre_acquire_table_lock(db, txn0); assert(r == 0);
}
dbs[i] = db;
}
for (i = 1; i < ndbs; i++) {
DB *db = NULL;
if (verbose) fprintf(stderr, "creating %d\n", i);
r = db_create(&db, env, 0); assert(r == 0);
if (pagesize) {
r = db->set_pagesize(db, pagesize); assert(r == 0);
}
DB_TXN *txn1 = NULL;
if (do_txn) {
r = env->txn_begin(env, NULL, &txn1, 0); assert(r == 0);
}
char db_filename[32]; sprintf(db_filename, "test%d", i);
r = db->open(db, txn1, db_filename, NULL, DB_BTREE, DB_CREATE, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
if (do_txn) {
r = txn1->commit(txn1, 0); assert(r == 0);
}
dbs[i] = db;
}
if (verbose) fprintf(stderr, "populating\n");
populate(env, txn0, dbs[0], nrows);
for (i = 1; i < ndbs; i++) {
DB *db = dbs[i];
if (verbose) fprintf(stderr, "closing %d\n", i);
r = db->close(db, 0); assert(r == 0);
}
if (verbose) fprintf(stderr, "closing %d\n", 0);
r = dbs[0]->close(dbs[0], 0); assert(r == 0);
if (do_txn) {
if (verbose) fprintf(stderr, "abort txn0\n");
r = txn0->abort(txn0); assert(r == 0);
}
}
int
test_main(int argc, char * const argv[]) {
char *env_dir = "dir.shutdown.ca";
int ndbs = 1;
int do_txn = 1;
u_int32_t pagesize = 4096;
u_int64_t cachesize = 1000000000;
u_int64_t nrows = 500000;
for (int i = 1; i < argc ; i++) {
char * const arg = argv[i];
if (strcmp(arg, "-v") == 0) {
verbose++;
continue;
}
if (strcmp(arg, "-q") == 0) {
if (verbose > 0) verbose--;
continue;
}
if (strcmp(arg, "--txn") == 0 && i+1 < argc) {
do_txn = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--ndbs") == 0 && i+1 < argc) {
ndbs = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--pagesize") == 0 && i+1 < argc) {
pagesize = atoi(argv[++i]);
continue;
}
if (strcmp(arg, "--cachesize") == 0 && i+1 < argc) {
cachesize = atol(argv[++i]);
continue;
}
if (strcmp(arg, "--rows") == 0 && i+1 < argc) {
nrows = atol(argv[++i]);
continue;
}
assert(0);
}
// create clean env dir
char rm_cmd[strlen(env_dir) + strlen("rm -rf ") + 1];
snprintf(rm_cmd, sizeof(rm_cmd), "rm -rf %s", env_dir);
int r;
r = system(rm_cmd); assert(r == 0);
r = toku_os_mkdir(env_dir, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH); assert(r == 0);
DB_ENV *env = NULL;
r = db_env_create(&env, 0); assert(r == 0);
if (cachesize) {
const u_int64_t gig = 1 << 30;
r = env->set_cachesize(env, cachesize / gig, cachesize % gig, 1); assert(r == 0);
}
int env_open_flags = DB_CREATE | DB_PRIVATE | DB_INIT_MPOOL | DB_INIT_TXN | DB_INIT_LOCK | DB_INIT_LOG;
if (!do_txn)
env_open_flags &= ~(DB_INIT_TXN | DB_INIT_LOG);
r = env->open(env, env_dir, env_open_flags, S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); assert(r == 0);
run_test(env, ndbs, do_txn, pagesize, nrows);
if (verbose) fprintf(stderr, "closing env\n");
r = env->close(env, 0); assert(r == 0);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment