Commit 67803a97 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:4909], remove tabs from cachetable.c

git-svn-id: file:///svn/toku/tokudb@43728 c7de825b-a66e-492c-adef-691d508d4ae1
parent 25a5623a
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$" #ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved." #ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it." #ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
@@ -60,9 +61,9 @@ static CACHETABLE_STATUS_S ct_status;
#define STATUS_INIT(k,t,l) { \
ct_status.status[k].keyname = #k; \
ct_status.status[k].type = t; \
ct_status.status[k].legend = "cachetable: " l; \
}
static void
@@ -265,7 +266,7 @@ struct cachetable {
void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized)
status_init();
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
@@ -444,7 +445,7 @@ checkpoint_thread (void *cachetable_v)
int r = toku_checkpoint(ct, ct->logger, NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT);
if (r) {
fprintf(stderr, "%s:%d Got error %d while doing checkpoint\n", __FILE__, __LINE__, r);
abort(); // Don't quite know what to do with these errors.
}
return r;
}
@@ -571,10 +572,10 @@ int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CAC
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (extant->fname_in_env &&
!strcmp(extant->fname_in_env, iname_in_env)) {
*cf = extant;
r = 0;
break;
}
}
cachefiles_unlock(ct);
return r;
@@ -589,12 +590,12 @@ int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
int r = ENOENT;
*cf = NULL;
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) {
assert(!extant->is_closing);
*cf = extant;
r = 0;
break;
}
}
cachefiles_unlock(ct);
return r;
@@ -732,8 +733,8 @@ toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum) {
}
int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd,
const char *fname_in_env,
BOOL with_filenum, FILENUM filenum, BOOL reserved) {
int r;
CACHEFILE extant;
struct fileid fileid;
@@ -750,17 +751,17 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
cachetable_lock(ct);
cachefiles_lock(ct);
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
//File is already open (and in cachetable as extant)
assert(!extant->is_closing);
assert(!is_filenum_reserved(ct, extant->filenum));
r = close(fd); // no change for t:2444
assert(r == 0);
// re-use pre-existing cachefile
*cfptr = extant;
r = 0;
goto exit;
}
}
//File is not open. Make a new cachefile.
@@ -797,13 +798,13 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
}
}
{
// create a new cachefile entry in the cachetable
CACHEFILE XCALLOC(newcf);
newcf->cachetable = ct;
newcf->filenum.fileid = with_filenum ? filenum.fileid : next_filenum_to_use.fileid++;
cachefile_init_filenum(newcf, fd, fname_in_env, fileid);
newcf->next = ct->cachefiles;
ct->cachefiles = newcf;
rwlock_init(&newcf->fdlock);
newcf->most_recent_global_checkpoint_that_finished_early = ZERO_LSN;
@@ -812,8 +813,8 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
toku_cond_init(&newcf->background_wait, NULL);
toku_list_init(&newcf->pairs_for_cachefile);
*cfptr = newcf;
r = 0;
}
exit:
cachefiles_unlock(ct);
@@ -864,8 +865,8 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_in_env) {
close(cf->fd); // no change for t:2444
cf->fd = -1;
if (cf->fname_in_env) {
toku_free(cf->fname_in_env);
cf->fname_in_env = NULL;
}
//It is safe to have the name repeated since this is a ft-only test function.
//There isn't an environment directory so its both env/cwd.
@@ -918,10 +919,10 @@ toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size) {
static CACHEFILE remove_cf_from_list_locked (CACHEFILE cf, CACHEFILE list) {
if (list==0) return 0;
else if (list==cf) {
return list->next;
} else {
list->next = remove_cf_from_list_locked(cf, list->next);
return list;
}
}
@@ -951,45 +952,45 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
assert(!cf->for_checkpoint);
assert(!cf->is_closing);
cf->is_closing = TRUE; //Mark this cachefile so that no one will re-use it.
int r;
// cachetable_flush_cachefile() may release and retake cachetable_lock,
// allowing another thread to get into either/both of
//  - toku_cachetable_openfd()
//  - toku_cachefile_of_iname_and_add_reference()
cachetable_flush_cachefile(ct, cf);
if (0) {
error:
remove_cf_from_cachefiles_list(cf);
assert(!cf->next_in_checkpoint); //checkpoint cannot run on a closing file
assert(!cf->for_checkpoint);     //checkpoint cannot run on a closing file
if (cf->fname_in_env) toku_free(cf->fname_in_env);
rwlock_write_lock(&cf->fdlock, ct->mutex);
if ( !toku_cachefile_is_dev_null_unlocked(cf) ) {
int r3 = toku_file_fsync_without_accounting(cf->fd); //t:2444
if (r3!=0) fprintf(stderr, "%s:%d During error handling, could not fsync file r=%d errno=%d\n", __FILE__, __LINE__, r3, errno);
}
int r2 = close(cf->fd);
if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno);
//assert(r == 0);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
toku_free(cf);
cachetable_unlock(ct);
return r;
}
if (cf->close_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
r = cf->close_userdata(cf, cf->fd, cf->userdata, error_string, oplsn_valid, oplsn);
rwlock_read_unlock(&cf->fdlock);
if (r!=0) goto error;
}
cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->begin_checkpoint_userdata = NULL;
cf->end_checkpoint_userdata = NULL;
cf->userdata = NULL;
remove_cf_from_cachefiles_list(cf);
toku_cond_destroy(&cf->background_wait);
rwlock_write_lock(&cf->fdlock, ct->mutex); //Just make sure we can get it.
@@ -1006,13 +1007,13 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
assert(toku_list_empty(&cf->pairs_for_cachefile));
cachetable_unlock(ct);
r = close(cf->fd);
assert(r == 0);
cf->fd = -1;
if (cf->fname_in_env) toku_free(cf->fname_in_env);
toku_free(cf);
return r;
}
}
@@ -1080,13 +1081,13 @@ static void cachetable_rehash (CACHETABLE ct, u_int32_t newtable_size) {
ct->table_size=newtable_size;
for (i=0; i<newtable_size; i++) newtable[i]=0;
for (i=0; i<oldtable_size; i++) {
PAIR p;
while ((p=ct->table[i])!=0) {
unsigned int h = p->fullhash&(newtable_size-1);
ct->table[i] = p->hash_chain;
p->hash_chain = newtable[h];
newtable[h] = p;
}
}
toku_free(ct->table);
// printf("Freed\n");
@@ -1160,13 +1161,13 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
static void
pending_pairs_remove (CACHETABLE ct, PAIR p) {
if (p->pending_next) {
p->pending_next->pending_prev = p->pending_prev;
}
if (p->pending_prev) {
p->pending_prev->pending_next = p->pending_next;
}
else if (ct->pending_head==p) {
ct->pending_head = p->pending_next;
}
p->pending_prev = p->pending_next = NULL;
}
@@ -2024,32 +2025,32 @@ int toku_cachetable_put_with_dep_pairs(
maybe_flush_some(ct, attr.size);
int rval;
{
BEGIN_CRITICAL_REGION;   // checkpoint may not begin inside critical region, detect and crash if one begins
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
rval = cachetable_put_internal(
cachefile,
*key,
*fullhash,
value,
attr,
write_callback
);
//
// now that we have inserted the row, let's checkpoint the
// dependent nodes, if they need checkpointing
//
checkpoint_dependent_pairs(
ct,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_dirty
);
END_CRITICAL_REGION;    // checkpoint after this point would no longer cause a threadsafety bug
}
cachetable_unlock(ct);
return rval;
@@ -2233,7 +2234,7 @@ static void cachetable_fetch_pair(
assert(!toku_cachefile_is_dev_null_unlocked(cf));
r = fetch_callback(cf, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs);
if (dirty)
p->dirty = CACHETABLE_DIRTY;
cachetable_lock(ct);
rwlock_read_unlock(&cf->fdlock);
@@ -2469,8 +2470,8 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending &&  //If checkpoint pending, we would need to first write it, which would make it clean
p->dirty &&
nb_mutex_users(&p->value_nb_mutex) == 0
@@ -2484,7 +2485,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
}
cachetable_unlock(ct);
return r;
@@ -2501,8 +2502,8 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean (if the pin would be used for writes. If would be used for read-only we could return it, but that would increase complexity)
nb_mutex_users(&p->value_nb_mutex) == 0
) {
@@ -2514,7 +2515,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
//printf("%s:%d cachetable_maybe_get_and_pin_clean(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
}
cachetable_unlock(ct);
return r;
@@ -2532,9 +2533,9 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
//assert(fullhash == toku_cachetable_hash(cachefile, key));
if (!have_ct_lock) cachetable_lock(ct);
for (PAIR p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->value_nb_mutex)>0);
// this is a client thread that is unlocking the PAIR
// That is, a cleaner, flusher, or get_and_pin thread
// So, there must not be a completion queue lying around
@@ -2545,22 +2546,22 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
// exist
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
if (dirty) p->dirty = CACHETABLE_DIRTY;
if (attr.is_valid) {
PAIR_ATTR old_attr = p->attr;
PAIR_ATTR new_attr = attr;
cachetable_change_pair_attr(ct, old_attr, new_attr);
p->attr = attr;
}
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{
if (flush) {
maybe_flush_some(ct, 0);
}
}
r = 0; // we found one
break;
}
}
if (!have_ct_lock) cachetable_unlock(ct);
return r;
@@ -2615,8 +2616,8 @@ int toku_cachetable_get_and_pin_nonblocking (
int count = 0;
PAIR p;
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cf) {
//
// In Doofenshmirtz, we keep the root to leaf path pinned
@@ -2852,12 +2853,12 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
*doing_prefetch = TRUE;
}
}
else {
// sanity check, we already have an assert
// before locking the PAIR
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
}
}
cachetable_unlock(ct);
return 0;
@@ -2956,45 +2957,45 @@ void toku_cachetable_verify (CACHETABLE ct) {
// First clear all the verify flags by going through the hash chains
{
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
p->verify_flag=0;
}
}
}
// Now go through the clock chain, make sure everything in the LRU chain is hashed, and set the verify flag.
{
PAIR p;
BOOL is_first = TRUE;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
assert(p->verify_flag==0);
PAIR p2;
u_int32_t fullhash = p->fullhash;
//assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
for (p2=ct->table[fullhash&(ct->table_size-1)]; p2; p2=p2->hash_chain) {
if (p2==p) {
/* found it */
goto next;
}
}
fprintf(stderr, "Something in the clock chain is not hashed\n");
assert(0);
next:
p->verify_flag = 1;
}
}
// Now make sure everything in the hash chains has the verify_flag set to 1.
{
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(p->verify_flag);
}
}
}
cachetable_unlock(ct);
@@ -3113,7 +3114,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//If 'already_removed' is set, then we should release our reference
//and go to the next entry.
for (i=0; i < num_pairs; i++) {
PAIR p = list[i];
if (!p->already_removed) {
assert(cf == 0 || p->cachefile==cf);
nfound++;
@@ -3201,7 +3202,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
cachetable_flush_cachefile(ct, NULL);
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
if (ct->table[i]) return -1;
}
assert(ct->size_evicting == 0);
rwlock_destroy(&ct->pending_lock);
@@ -3237,9 +3238,9 @@ int toku_cachetable_unpin_and_remove (
u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(nb_mutex_writers(&p->value_nb_mutex));
// grab disk_nb_mutex to ensure any background thread writing
// out a cloned value completes
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
@@ -3416,39 +3417,39 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
case TOKUTXN_LIVE:
case TOKUTXN_COMMITTING:
case TOKUTXN_ABORTING: {
int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
}
case TOKUTXN_PREPARING: {
TOKU_XA_XID xa_xid;
toku_txn_get_prepared_xa_xid(txn, &xa_xid);
int r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
&xa_xid,
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
}
case TOKUTXN_RETIRED:
return 0;
}
// default is an error
assert(0);
@@ -3468,10 +3469,10 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
unsigned i;
cachetable_lock(ct);
//Initialize accountability counters
ct->checkpoint_num_files = 0;
ct->checkpoint_num_txns = 0;
//Make list of cachefiles to be included in checkpoint.
{
@@ -3496,48 +3497,48 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
cachefiles_unlock(ct);
}
if (logger) {
// The checkpoint must be performed after the lock is acquired.
{
LSN begin_lsn={.lsn=-1}; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
assert(r==0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
}
// Log all the open files
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_fassociate_during_checkpoint) {
int r = cf->log_fassociate_during_checkpoint(cf, cf->userdata);
ct->checkpoint_num_files++;
assert(r==0);
}
}
cachefiles_unlock(ct);
}
// Log all the open transactions MUST BE AFTER OPEN FILES
{
ct->checkpoint_num_txns = toku_omt_size(logger->live_txns);
int r = toku_omt_iterate(logger->live_txns, log_open_txn, NULL);
assert(r==0);
}
// Log rollback suppression for all the open files MUST BE AFTER TXNS
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_suppress_rollback_during_checkpoint) {
int r = cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata);
assert(r==0);
}
}
cachefiles_unlock(ct);
}
}
unsigned int npending = 0;
//
@@ -3812,14 +3813,14 @@ int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
int some_pinned=0;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(nb_mutex_writers(&p->value_nb_mutex)>=0);
if (nb_mutex_writers(&p->value_nb_mutex)) {
//printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
some_pinned=1;
}
}
}
cachetable_unlock(ct);
return some_pinned;
@@ -3874,14 +3875,14 @@ void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_s
}
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
PAIR p;
int count = 0;
int r = -1;
u_int32_t fullhash = toku_cachetable_hash(cf, key);
cachetable_lock(ct);
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key.b == key.b && p->cachefile == cf) {
if (value_ptr)
*value_ptr = p->value_data;
@@ -3901,12 +3902,12 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
void
toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, char**, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*),
int (*begin_checkpoint_userdata)(LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, int, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
...