Commit ee060abf authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Get abort to run a little bit. Addresses #253.

git-svn-id: file:///svn/tokudb@1608 c7de825b-a66e-492c-adef-691d508d4ae1
parent bfaef940
......@@ -63,4 +63,7 @@ typedef struct intpairarray {
} *array;
} INTPAIRARRAY;
typedef struct cachetable *CACHETABLE;
typedef struct cachefile *CACHEFILE;
#endif
......@@ -88,6 +88,16 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
return 0;
}
// What cachefile goes with particular fd?
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT *brt) {
CACHEFILE extant;
for (extant = t->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) { *cf = extant; return 0; }
}
*brt=0; // This is wrong. But the tests will notice right away.
return ENOENT;
}
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
int r;
CACHEFILE extant;
......
......@@ -9,8 +9,6 @@
/* Implement the cache table. */
typedef long long CACHEKEY;
typedef struct cachetable *CACHETABLE;
typedef struct cachefile *CACHEFILE;
/* Maintain a cache mapping from cachekeys to values (void*)
* Some of the keys can be pinned. Don't pin too many or for too long.
......@@ -84,4 +82,8 @@ void toku_cachetable_verify (CACHETABLE t); // Slow...
TOKULOGGER toku_cachefile_logger (CACHEFILE);
FILENUM toku_cachefile_filenum (CACHEFILE);
// What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT *brt);
#endif
......@@ -20,6 +20,7 @@ struct tokulogger {
LSN lsn;
char buf[LOGGER_BUF_SIZE];
int n_in_buf;
CACHETABLE ct;
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......
......@@ -74,6 +74,11 @@ int toku_logger_create (TOKULOGGER *resultp) {
return 0;
}
void toku_logger_set_cachetable (TOKULOGGER tl, CACHETABLE ct) {
tl->ct = ct;
}
int toku_logger_open (const char *directory, TOKULOGGER logger) {
if (logger->is_open) return EINVAL;
if (logger->is_panicked) return EINVAL;
......
......@@ -3,12 +3,14 @@
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include <errno.h>
#include "../include/db.h"
#include "brttypes.h"
#include "kv-pair.h"
#include <errno.h>
int toku_logger_create(TOKULOGGER */*resultp*/);
void toku_logger_set_cachetable (TOKULOGGER, CACHETABLE);
int toku_logger_open(const char */*directory*/, TOKULOGGER);
int toku_logger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
int toku_logger_close(TOKULOGGER *logger);
......
......@@ -1577,6 +1577,15 @@ int toku_pma_set_at_index (PMA pma, unsigned int idx, DBT *key, DBT *value) {
return 0;
}
int toku_pma_clear_at_index (PMA pma, unsigned int idx) {
if (idx>=pma->N) return -1;
if (!kv_pair_inuse(pma->pairs[idx])) return -1;
pma_mfree_kv_pair(pma, pma->pairs[idx]);
pma->pairs[idx]=0;
pma->n_pairs_present++;
return 0;
}
// assume no cursors
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto) {
u_int32_t i;
......
......@@ -151,6 +151,8 @@ void toku_pma_verify_fingerprint (PMA pma, u_int32_t rand4fingerprint, u_int32_t
int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize);
int toku_pma_set_at_index (PMA, unsigned int /*index*/, DBT */*key*/, DBT */*value*/); // If the index is wrong or there is a value already, return nonzero
int toku_pma_clear_at_index (PMA, unsigned int /*index*/); // If the index is wrong or there is a value already, return nonzero
// Requires: No open cursors on the pma.
int toku_pma_move_indices (PMA pma, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
......
......@@ -215,7 +215,23 @@ void toku_recover_insertinleaf (struct logtype_insertinleaf *c) {
toku_free(c->data.data);
}
int toku_rollback_insertinleaf (struct logtype_insertinleaf *le, TOKUTXN txn) ABORTIT
int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
CACHEFILE cf;
BRT brt;
void *node_v;
int r = toku_cachefile_of_filenum(txn->logger->ct, c->filenum, &cf, &brt);
assert(r==0);
r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
if (r!=0) return r;
BRTNODE node = node_v;
r = toku_pma_clear_at_index(node->u.l.buffer, c->pmaidx);
if (r!=0) return r;
node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len,c->data.data, c->data.len);
node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
VERIFY_COUNTS(node);
r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
return r;
}
// a newbrtnode should have been done before this
......
......@@ -381,6 +381,9 @@ static int toku_db_env_open(DB_ENV * env, const char *home, u_int32_t flags, int
r = toku_brt_create_cachetable(&env->i->cachetable, env->i->cachetable_size, ZERO_LSN, env->i->logger);
if (r!=0) goto died2;
toku_logger_set_cachetable(env->i->logger, env->i->cachetable);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment