Commit 18271587 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:4909], remove tabs from cachetable.c

git-svn-id: file:///svn/toku/tokudb@43728 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9162f7d0
/* -*- mode: C; c-basic-offset: 4 -*- */
/* -*- mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: expandtab:ts=8:sw=4:softtabstop=4:
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
......@@ -60,9 +61,9 @@ static CACHETABLE_STATUS_S ct_status;
// Initialize one entry of the cachetable status array: record the key's
// name (stringified enum), its value type, and a human-readable legend.
// NOTE: the diff scrape duplicated each assignment (old tab-indented line +
// new space-indented line); each field must be assigned exactly once.
#define STATUS_INIT(k,t,l) { \
    ct_status.status[k].keyname = #k; \
    ct_status.status[k].type    = t;  \
    ct_status.status[k].legend  = "cachetable: " l; \
}
static void
......@@ -265,7 +266,7 @@ struct cachetable {
void
toku_cachetable_get_status(CACHETABLE ct, CACHETABLE_STATUS statp) {
if (!ct_status.initialized)
status_init();
status_init();
STATUS_VALUE(CT_MISS) = cachetable_miss;
STATUS_VALUE(CT_MISSTIME) = cachetable_misstime;
......@@ -444,7 +445,7 @@ checkpoint_thread (void *cachetable_v)
int r = toku_checkpoint(ct, ct->logger, NULL, NULL, NULL, NULL, SCHEDULED_CHECKPOINT);
if (r) {
fprintf(stderr, "%s:%d Got error %d while doing checkpoint\n", __FILE__, __LINE__, r);
abort(); // Don't quite know what to do with these errors.
abort(); // Don't quite know what to do with these errors.
}
return r;
}
......@@ -571,10 +572,10 @@ int toku_cachefile_of_iname_in_env (CACHETABLE ct, const char *iname_in_env, CAC
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (extant->fname_in_env &&
!strcmp(extant->fname_in_env, iname_in_env)) {
*cf = extant;
r = 0;
*cf = extant;
r = 0;
break;
}
}
}
cachefiles_unlock(ct);
return r;
......@@ -589,12 +590,12 @@ int toku_cachefile_of_filenum (CACHETABLE ct, FILENUM filenum, CACHEFILE *cf) {
int r = ENOENT;
*cf = NULL;
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) {
if (extant->filenum.fileid==filenum.fileid) {
assert(!extant->is_closing);
*cf = extant;
*cf = extant;
r = 0;
break;
}
}
}
cachefiles_unlock(ct);
return r;
......@@ -732,8 +733,8 @@ toku_cachetable_unreserve_filenum (CACHETABLE ct, FILENUM reserved_filenum) {
}
int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd,
const char *fname_in_env,
BOOL with_filenum, FILENUM filenum, BOOL reserved) {
const char *fname_in_env,
BOOL with_filenum, FILENUM filenum, BOOL reserved) {
int r;
CACHEFILE extant;
struct fileid fileid;
......@@ -750,17 +751,17 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
cachetable_lock(ct);
cachefiles_lock(ct);
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
if (memcmp(&extant->fileid, &fileid, sizeof(fileid))==0) {
//File is already open (and in cachetable as extant)
assert(!extant->is_closing);
assert(!is_filenum_reserved(ct, extant->filenum));
r = close(fd); // no change for t:2444
r = close(fd); // no change for t:2444
assert(r == 0);
// re-use pre-existing cachefile
*cfptr = extant;
r = 0;
goto exit;
}
// re-use pre-existing cachefile
*cfptr = extant;
r = 0;
goto exit;
}
}
//File is not open. Make a new cachefile.
......@@ -797,13 +798,13 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
}
}
{
// create a new cachefile entry in the cachetable
// create a new cachefile entry in the cachetable
CACHEFILE XCALLOC(newcf);
newcf->cachetable = ct;
newcf->filenum.fileid = with_filenum ? filenum.fileid : next_filenum_to_use.fileid++;
cachefile_init_filenum(newcf, fd, fname_in_env, fileid);
newcf->next = ct->cachefiles;
ct->cachefiles = newcf;
newcf->next = ct->cachefiles;
ct->cachefiles = newcf;
rwlock_init(&newcf->fdlock);
newcf->most_recent_global_checkpoint_that_finished_early = ZERO_LSN;
......@@ -812,8 +813,8 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
toku_cond_init(&newcf->background_wait, NULL);
toku_list_init(&newcf->pairs_for_cachefile);
*cfptr = newcf;
r = 0;
*cfptr = newcf;
r = 0;
}
exit:
cachefiles_unlock(ct);
......@@ -864,8 +865,8 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname_in_env) {
close(cf->fd); // no change for t:2444
cf->fd = -1;
if (cf->fname_in_env) {
toku_free(cf->fname_in_env);
cf->fname_in_env = NULL;
toku_free(cf->fname_in_env);
cf->fname_in_env = NULL;
}
//It is safe to have the name repeated since this is a ft-only test function.
//There isn't an environment directory so its both env/cwd.
......@@ -918,10 +919,10 @@ toku_cachefile_truncate (CACHEFILE cf, toku_off_t new_size) {
// Remove cachefile cf from the singly-linked list 'list' and return the new
// list head.  Recursive: each frame relinks one node.  Caller must hold the
// cachefiles lock (hence "_locked").  If cf is not present, the list is
// returned unchanged.
// Fix: the diff scrape duplicated the 'return list->next;' and the
// relink/return pair, leaving unreachable dead statements after each return;
// each statement appears exactly once here.
static CACHEFILE remove_cf_from_list_locked (CACHEFILE cf, CACHEFILE list) {
    if (list==0) return 0;
    else if (list==cf) {
        return list->next;
    } else {
        list->next = remove_cf_from_list_locked(cf, list->next);
        return list;
    }
}
......@@ -951,45 +952,45 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
assert(!cf->for_checkpoint);
assert(!cf->is_closing);
cf->is_closing = TRUE; //Mark this cachefile so that no one will re-use it.
int r;
// cachetable_flush_cachefile() may release and retake cachetable_lock,
// allowing another thread to get into either/both of
int r;
// cachetable_flush_cachefile() may release and retake cachetable_lock,
// allowing another thread to get into either/both of
// - toku_cachetable_openfd()
// - toku_cachefile_of_iname_and_add_reference()
cachetable_flush_cachefile(ct, cf);
cachetable_flush_cachefile(ct, cf);
if (0) {
error:
remove_cf_from_cachefiles_list(cf);
error:
remove_cf_from_cachefiles_list(cf);
assert(!cf->next_in_checkpoint); //checkpoint cannot run on a closing file
assert(!cf->for_checkpoint); //checkpoint cannot run on a closing file
if (cf->fname_in_env) toku_free(cf->fname_in_env);
if (cf->fname_in_env) toku_free(cf->fname_in_env);
rwlock_write_lock(&cf->fdlock, ct->mutex);
if ( !toku_cachefile_is_dev_null_unlocked(cf) ) {
int r3 = toku_file_fsync_without_accounting(cf->fd); //t:2444
if (r3!=0) fprintf(stderr, "%s:%d During error handling, could not fsync file r=%d errno=%d\n", __FILE__, __LINE__, r3, errno);
}
int r2 = close(cf->fd);
if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno);
//assert(r == 0);
int r2 = close(cf->fd);
if (r2!=0) fprintf(stderr, "%s:%d During error handling, could not close file r=%d errno=%d\n", __FILE__, __LINE__, r2, errno);
//assert(r == 0);
rwlock_write_unlock(&cf->fdlock);
rwlock_destroy(&cf->fdlock);
assert(toku_list_empty(&cf->pairs_for_cachefile));
toku_free(cf);
cachetable_unlock(ct);
return r;
toku_free(cf);
cachetable_unlock(ct);
return r;
}
if (cf->close_userdata) {
if (cf->close_userdata) {
rwlock_prefer_read_lock(&cf->fdlock, ct->mutex);
r = cf->close_userdata(cf, cf->fd, cf->userdata, error_string, oplsn_valid, oplsn);
rwlock_read_unlock(&cf->fdlock);
if (r!=0) goto error;
}
cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->begin_checkpoint_userdata = NULL;
cf->end_checkpoint_userdata = NULL;
cf->userdata = NULL;
}
cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->begin_checkpoint_userdata = NULL;
cf->end_checkpoint_userdata = NULL;
cf->userdata = NULL;
remove_cf_from_cachefiles_list(cf);
toku_cond_destroy(&cf->background_wait);
rwlock_write_lock(&cf->fdlock, ct->mutex); //Just make sure we can get it.
......@@ -1006,13 +1007,13 @@ int toku_cachefile_close (CACHEFILE *cfp, char **error_string, BOOL oplsn_valid,
assert(toku_list_empty(&cf->pairs_for_cachefile));
cachetable_unlock(ct);
r = close(cf->fd);
assert(r == 0);
r = close(cf->fd);
assert(r == 0);
cf->fd = -1;
if (cf->fname_in_env) toku_free(cf->fname_in_env);
toku_free(cf);
return r;
if (cf->fname_in_env) toku_free(cf->fname_in_env);
toku_free(cf);
return r;
}
}
......@@ -1080,13 +1081,13 @@ static void cachetable_rehash (CACHETABLE ct, u_int32_t newtable_size) {
ct->table_size=newtable_size;
for (i=0; i<newtable_size; i++) newtable[i]=0;
for (i=0; i<oldtable_size; i++) {
PAIR p;
while ((p=ct->table[i])!=0) {
unsigned int h = p->fullhash&(newtable_size-1);
ct->table[i] = p->hash_chain;
p->hash_chain = newtable[h];
newtable[h] = p;
}
PAIR p;
while ((p=ct->table[i])!=0) {
unsigned int h = p->fullhash&(newtable_size-1);
ct->table[i] = p->hash_chain;
p->hash_chain = newtable[h];
newtable[h] = p;
}
}
toku_free(ct->table);
// printf("Freed\n");
......@@ -1160,13 +1161,13 @@ static PAIR remove_from_hash_chain (PAIR remove_me, PAIR list) {
// Unlink PAIR p from the cachetable's doubly-linked list of pairs pending
// checkpoint.  Fixes up the neighbors' links, updates ct->pending_head when
// p was the head, and clears p's own pending pointers.
// Fix: the diff scrape duplicated each relink assignment (old tab-indented
// line + new space-indented line); each assignment appears exactly once here.
static void
pending_pairs_remove (CACHETABLE ct, PAIR p) {
    if (p->pending_next) {
        p->pending_next->pending_prev = p->pending_prev;
    }
    if (p->pending_prev) {
        p->pending_prev->pending_next = p->pending_next;
    }
    else if (ct->pending_head==p) {
        // p had no predecessor, so it can only be the list head.
        ct->pending_head = p->pending_next;
    }
    p->pending_prev = p->pending_next = NULL;
}
......@@ -2024,32 +2025,32 @@ int toku_cachetable_put_with_dep_pairs(
maybe_flush_some(ct, attr.size);
int rval;
{
BEGIN_CRITICAL_REGION; // checkpoint may not begin inside critical region, detect and crash if one begins
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
rval = cachetable_put_internal(
cachefile,
*key,
*fullhash,
value,
attr,
write_callback
);
//
// now that we have inserted the row, let's checkpoint the
// dependent nodes, if they need checkpointing
//
checkpoint_dependent_pairs(
ct,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_dirty
);
END_CRITICAL_REGION; // checkpoint after this point would no longer cause a threadsafety bug
BEGIN_CRITICAL_REGION; // checkpoint may not begin inside critical region, detect and crash if one begins
get_key_and_fullhash(key, fullhash, get_key_and_fullhash_extra);
rval = cachetable_put_internal(
cachefile,
*key,
*fullhash,
value,
attr,
write_callback
);
//
// now that we have inserted the row, let's checkpoint the
// dependent nodes, if they need checkpointing
//
checkpoint_dependent_pairs(
ct,
num_dependent_pairs,
dependent_cfs,
dependent_keys,
dependent_fullhash,
dependent_dirty
);
END_CRITICAL_REGION; // checkpoint after this point would no longer cause a threadsafety bug
}
cachetable_unlock(ct);
return rval;
......@@ -2233,7 +2234,7 @@ static void cachetable_fetch_pair(
assert(!toku_cachefile_is_dev_null_unlocked(cf));
r = fetch_callback(cf, cf->fd, key, fullhash, &toku_value, &disk_data, &attr, &dirty, read_extraargs);
if (dirty)
p->dirty = CACHETABLE_DIRTY;
p->dirty = CACHETABLE_DIRTY;
cachetable_lock(ct);
rwlock_read_unlock(&cf->fdlock);
......@@ -2469,8 +2470,8 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean
p->dirty &&
nb_mutex_users(&p->value_nb_mutex) == 0
......@@ -2484,7 +2485,7 @@ int toku_cachetable_maybe_get_and_pin (CACHEFILE cachefile, CACHEKEY key, u_int3
//printf("%s:%d cachetable_maybe_get_and_pin(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
}
}
cachetable_unlock(ct);
return r;
......@@ -2501,8 +2502,8 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
cachetable_lock(ct);
cachetable_maybe_get_and_pins++;
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
if (!p->checkpoint_pending && //If checkpoint pending, we would need to first write it, which would make it clean (if the pin would be used for writes. If would be used for read-only we could return it, but that would increase complexity)
nb_mutex_users(&p->value_nb_mutex) == 0
) {
......@@ -2514,7 +2515,7 @@ int toku_cachetable_maybe_get_and_pin_clean (CACHEFILE cachefile, CACHEKEY key,
//printf("%s:%d cachetable_maybe_get_and_pin_clean(%lld)--> %p\n", __FILE__, __LINE__, key, *value);
}
break;
}
}
}
cachetable_unlock(ct);
return r;
......@@ -2532,9 +2533,9 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
//assert(fullhash == toku_cachetable_hash(cachefile, key));
if (!have_ct_lock) cachetable_lock(ct);
for (PAIR p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->value_nb_mutex)>0);
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
assert(nb_mutex_writers(&p->value_nb_mutex)>0);
// this is a client thread that is unlocking the PAIR
// That is, a cleaner, flusher, or get_and_pin thread
// So, there must not be a completion queue lying around
......@@ -2545,22 +2546,22 @@ cachetable_unpin_internal(CACHEFILE cachefile, CACHEKEY key, u_int32_t fullhash,
// exist
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
if (dirty) p->dirty = CACHETABLE_DIRTY;
if (dirty) p->dirty = CACHETABLE_DIRTY;
if (attr.is_valid) {
PAIR_ATTR old_attr = p->attr;
PAIR_ATTR new_attr = attr;
cachetable_change_pair_attr(ct, old_attr, new_attr);
p->attr = attr;
}
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{
WHEN_TRACE_CT(printf("[count=%lld]\n", p->pinned));
{
if (flush) {
maybe_flush_some(ct, 0);
}
}
}
r = 0; // we found one
break;
}
}
}
if (!have_ct_lock) cachetable_unlock(ct);
return r;
......@@ -2615,8 +2616,8 @@ int toku_cachetable_get_and_pin_nonblocking (
int count = 0;
PAIR p;
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cf) {
count++;
if (p->key.b==key.b && p->cachefile==cf) {
//
// In Doofenshmirtz, we keep the root to leaf path pinned
......@@ -2852,12 +2853,12 @@ int toku_cachefile_prefetch(CACHEFILE cf, CACHEKEY key, u_int32_t fullhash,
*doing_prefetch = TRUE;
}
}
else {
else {
// sanity check, we already have an assert
// before locking the PAIR
assert(!p->cq);
nb_mutex_write_unlock(&p->value_nb_mutex);
}
}
}
cachetable_unlock(ct);
return 0;
......@@ -2956,45 +2957,45 @@ void toku_cachetable_verify (CACHETABLE ct) {
// First clear all the verify flags by going through the hash chains
{
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
p->verify_flag=0;
}
}
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
p->verify_flag=0;
}
}
}
// Now go through the clock chain, make sure everything in the LRU chain is hashed, and set the verify flag.
{
PAIR p;
PAIR p;
BOOL is_first = TRUE;
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
for (p=ct->clock_head; ct->clock_head!=NULL && (p!=ct->clock_head || is_first); p=p->clock_next) {
is_first=FALSE;
assert(p->verify_flag==0);
PAIR p2;
u_int32_t fullhash = p->fullhash;
//assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
for (p2=ct->table[fullhash&(ct->table_size-1)]; p2; p2=p2->hash_chain) {
if (p2==p) {
/* found it */
goto next;
}
}
fprintf(stderr, "Something in the clock chain is not hashed\n");
assert(0);
next:
p->verify_flag = 1;
}
assert(p->verify_flag==0);
PAIR p2;
u_int32_t fullhash = p->fullhash;
//assert(fullhash==toku_cachetable_hash(p->cachefile, p->key));
for (p2=ct->table[fullhash&(ct->table_size-1)]; p2; p2=p2->hash_chain) {
if (p2==p) {
/* found it */
goto next;
}
}
fprintf(stderr, "Something in the clock chain is not hashed\n");
assert(0);
next:
p->verify_flag = 1;
}
}
// Now make sure everything in the hash chains has the verify_flag set to 1.
{
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(p->verify_flag);
}
}
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(p->verify_flag);
}
}
}
cachetable_unlock(ct);
......@@ -3113,7 +3114,7 @@ static void cachetable_flush_cachefile(CACHETABLE ct, CACHEFILE cf) {
//If 'already_removed' is set, then we should release our reference
//and go to the next entry.
for (i=0; i < num_pairs; i++) {
PAIR p = list[i];
PAIR p = list[i];
if (!p->already_removed) {
assert(cf == 0 || p->cachefile==cf);
nfound++;
......@@ -3201,7 +3202,7 @@ toku_cachetable_close (CACHETABLE *ctp) {
cachetable_flush_cachefile(ct, NULL);
u_int32_t i;
for (i=0; i<ct->table_size; i++) {
if (ct->table[i]) return -1;
if (ct->table[i]) return -1;
}
assert(ct->size_evicting == 0);
rwlock_destroy(&ct->pending_lock);
......@@ -3237,9 +3238,9 @@ int toku_cachetable_unpin_and_remove (
u_int32_t fullhash = toku_cachetable_hash(cachefile, key);
for (p=ct->table[fullhash&(ct->table_size-1)]; p; p=p->hash_chain) {
count++;
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(nb_mutex_writers(&p->value_nb_mutex));
if (p->key.b==key.b && p->cachefile==cachefile) {
p->dirty = CACHETABLE_CLEAN; // clear the dirty bit. We're just supposed to remove it.
assert(nb_mutex_writers(&p->value_nb_mutex));
// grab disk_nb_mutex to ensure any background thread writing
// out a cloned value completes
nb_mutex_lock(&p->disk_nb_mutex, ct->mutex);
......@@ -3416,39 +3417,39 @@ log_open_txn (OMTVALUE txnv, u_int32_t UU(index), void *UU(extra)) {
case TOKUTXN_LIVE:
case TOKUTXN_COMMITTING:
case TOKUTXN_ABORTING: {
int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
int r = toku_log_xstillopen(logger, NULL, 0,
toku_txn_get_txnid(txn),
toku_txn_get_txnid(toku_logger_txn_parent(txn)),
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
}
case TOKUTXN_PREPARING: {
TOKU_XA_XID xa_xid;
toku_txn_get_prepared_xa_xid(txn, &xa_xid);
int r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
&xa_xid,
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
TOKU_XA_XID xa_xid;
toku_txn_get_prepared_xa_xid(txn, &xa_xid);
int r = toku_log_xstillopenprepared(logger, NULL, 0,
toku_txn_get_txnid(txn),
&xa_xid,
txn->rollentry_raw_count,
open_filenums,
txn->force_fsync_on_commit,
txn->num_rollback_nodes,
txn->num_rollentries,
txn->spilled_rollback_head,
txn->spilled_rollback_tail,
txn->current_rollback);
assert(r==0);
return 0;
}
case TOKUTXN_RETIRED:
return 0;
return 0;
}
// default is an error
assert(0);
......@@ -3468,10 +3469,10 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
{
unsigned i;
cachetable_lock(ct);
//Initialize accountability counters
ct->checkpoint_num_files = 0;
ct->checkpoint_num_txns = 0;
cachetable_lock(ct);
//Initialize accountability counters
ct->checkpoint_num_files = 0;
ct->checkpoint_num_txns = 0;
//Make list of cachefiles to be included in checkpoint.
{
......@@ -3496,48 +3497,48 @@ toku_cachetable_begin_checkpoint (CACHETABLE ct, TOKULOGGER logger) {
cachefiles_unlock(ct);
}
if (logger) {
// The checkpoint must be performed after the lock is acquired.
{
LSN begin_lsn={.lsn=-1}; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
assert(r==0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
}
// Log all the open files
{
if (logger) {
// The checkpoint must be performed after the lock is acquired.
{
LSN begin_lsn={.lsn=-1}; // we'll need to store the lsn of the checkpoint begin in all the trees that are checkpointed.
int r = toku_log_begin_checkpoint(logger, &begin_lsn, 0, 0);
assert(r==0);
ct->lsn_of_checkpoint_in_progress = begin_lsn;
}
// Log all the open files
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_fassociate_during_checkpoint) {
int r = cf->log_fassociate_during_checkpoint(cf, cf->userdata);
ct->checkpoint_num_files++;
ct->checkpoint_num_files++;
assert(r==0);
}
}
}
cachefiles_unlock(ct);
}
// Log all the open transactions MUST BE AFTER OPEN FILES
{
}
// Log all the open transactions MUST BE AFTER OPEN FILES
{
ct->checkpoint_num_txns = toku_omt_size(logger->live_txns);
int r = toku_omt_iterate(logger->live_txns, log_open_txn, NULL);
assert(r==0);
}
// Log rollback suppression for all the open files MUST BE AFTER TXNS
{
assert(r==0);
}
// Log rollback suppression for all the open files MUST BE AFTER TXNS
{
//Must loop through ALL open files (even if not included in checkpoint).
CACHEFILE cf;
CACHEFILE cf;
cachefiles_lock(ct);
for (cf = ct->cachefiles; cf; cf=cf->next) {
for (cf = ct->cachefiles; cf; cf=cf->next) {
if (cf->log_suppress_rollback_during_checkpoint) {
int r = cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata);
assert(r==0);
}
}
}
cachefiles_unlock(ct);
}
}
}
}
unsigned int npending = 0;
//
......@@ -3812,14 +3813,14 @@ int toku_cachetable_assert_all_unpinned (CACHETABLE ct) {
int some_pinned=0;
cachetable_lock(ct);
for (i=0; i<ct->table_size; i++) {
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(nb_mutex_writers(&p->value_nb_mutex)>=0);
if (nb_mutex_writers(&p->value_nb_mutex)) {
//printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
some_pinned=1;
}
}
PAIR p;
for (p=ct->table[i]; p; p=p->hash_chain) {
assert(nb_mutex_writers(&p->value_nb_mutex)>=0);
if (nb_mutex_writers(&p->value_nb_mutex)) {
//printf("%s:%d pinned: %"PRId64" (%p)\n", __FILE__, __LINE__, p->key.b, p->value_data);
some_pinned=1;
}
}
}
cachetable_unlock(ct);
return some_pinned;
......@@ -3874,14 +3875,14 @@ void toku_cachetable_get_state (CACHETABLE ct, int *num_entries_ptr, int *hash_s
}
int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, void **value_ptr,
int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
int *dirty_ptr, long long *pin_ptr, long *size_ptr) {
PAIR p;
int count = 0;
int r = -1;
u_int32_t fullhash = toku_cachetable_hash(cf, key);
cachetable_lock(ct);
for (p = ct->table[fullhash&(ct->table_size-1)]; p; p = p->hash_chain) {
count++;
count++;
if (p->key.b == key.b && p->cachefile == cf) {
if (value_ptr)
*value_ptr = p->value_data;
......@@ -3901,12 +3902,12 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
void
toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
void *userdata,
int (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
int (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, int, void*, char**, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*),
int (*begin_checkpoint_userdata)(LSN, void*),
int (*close_userdata)(CACHEFILE, int, void*, char**, BOOL, LSN),
int (*checkpoint_userdata)(CACHEFILE, int, void*),
int (*begin_checkpoint_userdata)(LSN, void*),
int (*end_checkpoint_userdata)(CACHEFILE, int, void*),
int (*note_pin_by_checkpoint)(CACHEFILE, void*),
int (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment