Commit 1ee57421 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

start making db-benchmark-test recoverable. Addresses #27.

git-svn-id: file:///svn/tokudb@1893 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9f672292
......@@ -86,6 +86,8 @@ We wanted a mode in which matching data is overwritten, however
``matching'' is defined. Thus for non-duplicates, a matching pair is
one with the same key. For sorted duplicates, a matching pair is one
with the same key and data.
For high performance, use @code{DB_YESOVERWRITE}.
@end table
@c man end
......
......@@ -8,7 +8,7 @@
# GCOV_FLAGS = -fprofile-arcs -ftest-coverage
# PROF_FLAGS = -pg
OPTFLAGS = -O2
# OPTFLAGS = -O2
ifeq ($(VERBOSE),2)
VERBVERBOSE=-v
......
......@@ -213,68 +213,6 @@ int toku_logger_finish (TOKULOGGER logger, struct wbuf *wbuf) {
return toku_logger_log_bytes(logger, wbuf->ndone, wbuf->buf);
}
// Log an insertion of a key-value pair into a particular node of the tree.
// NOTE(review): this routine is currently stubbed out -- it prints a trace
// line and returns 0 without emitting a log record.  The original body
// contained ~19 lines of serialization code after the early `return 0;`
// that could never execute; that unreachable code has been removed.
// Restore the serialization from history if this record type is re-enabled.
// Returns 0, or EINVAL if the logger has panicked.
int toku_logger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
                                                  TXNID txnid,
                                                  FILENUM fileid,
                                                  DISKOFF diskoff,
                                                  unsigned char *key,
                                                  int keylen,
                                                  unsigned char *val,
                                                  int vallen) {
    if (logger->is_panicked) return EINVAL;
    printf("%s:%d\n", __FILE__, __LINE__);  // debug trace left by the stub
    (void)txnid; (void)fileid; (void)diskoff;  // unused while stubbed out
    (void)key; (void)keylen; (void)val; (void)vallen;
    return 0;
}
// Log a physical add or delete of a key/value pair in a leaf node.
// Only deletes are currently supported: the assert below forces is_add==0,
// so the LT_INSERT_WITH_NO_OVERWRITE arm of the ternary is dead in
// asserting builds.  Returns 0 on success (or when there is no txn),
// EINVAL if the logger has panicked.
int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF diskoff, int is_add, const struct kv_pair *pair) {
    assert(is_add==0);
    if (txn==0) return 0;                      // no transaction: nothing to log
    if (txn->logger->is_panicked) return EINVAL;
    assert(db);
    int keylen = pair->keylen;
    int vallen = pair->vallen;
    // Worst-case record size; the wbuf tracks how much is actually written.
    const int buflen=(keylen+vallen+4+4 // the key and value
                      +1 // log command
                      +8 // lsn
                      +8 // txnid
                      +8 // fileid  -- NOTE(review): sibling routines budget 4 bytes for FILENUM; over-budget here is harmless but confirm
                      +8 // diskoff
                      +8 // crc & len
                      );
    unsigned char buf[buflen];
    struct wbuf wbuf;
    wbuf_init(&wbuf, buf, buflen) ;
    wbuf_char(&wbuf, is_add ? LT_INSERT_WITH_NO_OVERWRITE : LT_DELETE);
    wbuf_LSN (&wbuf, txn->logger->lsn);
    txn->logger->lsn.lsn++;                    // consume one log sequence number
    wbuf_TXNID(&wbuf, txn->txnid64);
    wbuf_FILENUM(&wbuf, db->i->fileid);
    wbuf_DISKOFF(&wbuf, diskoff);
    wbuf_bytes(&wbuf, kv_pair_key_const(pair), keylen);
    wbuf_bytes(&wbuf, kv_pair_val_const(pair), vallen);
    return toku_logger_finish(txn->logger, &wbuf);
}
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn, txn->txnid64, nosync);
......@@ -314,31 +252,6 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid
return 0;
}
// Emit an LT_BLOCK_RENAME record describing a block of a file moving from
// one disk offset to another under a given parent/child slot.
// Returns 0 on success, EINVAL if the logger has panicked.
int toku_logger_log_block_rename (TOKULOGGER logger, FILENUM fileid, DISKOFF olddiskoff, DISKOFF newdiskoff, DISKOFF parentdiskoff, int childnum) {
    if (logger->is_panicked) return EINVAL;
    // Record layout: command byte, LSN, file number, the three disk
    // offsets, the child index, plus trailing crc+length.
    const int record_size = ( 1   // log command
                            + 8   // lsn
                            + 8   // fileid
                            + 8   // olddiskoff
                            + 8   // newdiskoff
                            + 8   // parentdiskoff
                            + 4   // childnum
                            + 8   // crc & len
                            );
    unsigned char space[record_size];
    struct wbuf w;
    wbuf_init (&w, space, record_size);
    wbuf_char (&w, LT_BLOCK_RENAME);
    wbuf_LSN  (&w, logger->lsn);
    logger->lsn.lsn++;
    wbuf_FILENUM(&w, fileid);
    wbuf_DISKOFF(&w, olddiskoff);
    wbuf_DISKOFF(&w, newdiskoff);
    wbuf_DISKOFF(&w, parentdiskoff);
    wbuf_int  (&w, childnum);
    return toku_logger_finish(logger, &w);
}
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
......@@ -359,24 +272,6 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
}
// Log the unlink (deletion) of file fname.
// Bugfix (review): every other log routine in this file writes the LSN
// immediately after the command byte and advances logger->lsn; this one
// wrote neither, producing a record the reader could not sequence.  The
// LSN field is now emitted and budgeted.  Also removed the stray ';'
// after the function body.
// Returns 0 on success (or when there is no txn), EINVAL on panic.
int toku_logger_log_unlink (TOKUTXN txn, const char *fname) {
    if (txn==0) return 0;
    if (txn->logger->is_panicked) return EINVAL;
    const int fnamelen = strlen(fname);
    const int buflen = (+1 // log command
                        +8 // lsn
                        +4 // length of fname
                        +fnamelen
                        +8 // crc & len
                        );
    unsigned char buf[buflen];
    struct wbuf wbuf;
    wbuf_init (&wbuf, buf, buflen);
    wbuf_char (&wbuf, LT_UNLINK);
    wbuf_LSN  (&wbuf, txn->logger->lsn);
    txn->logger->lsn.lsn++;
    wbuf_bytes(&wbuf, fname, fnamelen);
    return toku_logger_finish(txn->logger, &wbuf);
}
int toku_logger_log_header (TOKUTXN txn, FILENUM filenum, struct brt_header *h) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
......
......@@ -23,16 +23,12 @@ int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF
int toku_logger_commit (TOKUTXN txn, int no_sync);
int toku_logger_log_block_rename (TOKULOGGER /*logger*/, FILENUM /*fileid*/, DISKOFF /*olddiskoff*/, DISKOFF /*newdiskoff*/, DISKOFF /*parentdiskoff*/, int /*childnum*/);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID /*txnid64*/, TOKULOGGER /*logger*/);
int toku_logger_log_fcreate (TOKUTXN, const char */*fname*/, int /*mode*/);
int toku_logger_log_fopen (TOKUTXN, const char * /*fname*/, FILENUM);
int toku_logger_log_unlink (TOKUTXN, const char */*fname*/);
int toku_logger_log_header (TOKUTXN, FILENUM, struct brt_header *);
int toku_logger_log_newbrtnode (TOKUTXN txn, FILENUM filenum, DISKOFF offset, u_int32_t height, u_int32_t nodesize, char is_dup_sort_mode, u_int32_t rand4fingerprint);
......@@ -66,7 +62,6 @@ int toku_read_and_print_logmagic (FILE *f, u_int32_t *version);
TXNID toku_txn_get_txnid (TOKUTXN);
LSN toku_txn_get_last_lsn (TOKUTXN);
static inline int toku_copy_FILENUM(FILENUM *target, FILENUM val) { *target = val; return 0; }
static inline void toku_free_FILENUM(FILENUM val __attribute__((__unused__))) {}
......
......@@ -112,6 +112,13 @@ const struct logtype logtypes[] = {
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"deleteinleaf", 'd', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
{"u_int32_t", "pmaidx", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"resizepma", 'R', FA{{"TXNID", "txnid", 0},
{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0},
......
......@@ -525,12 +525,21 @@ int toku_resize_pma_exactly (PMA pma, int oldsize, int newsize) {
return 0;
}
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
// Resize the PMA's backing array to accommodate asksize elements without
// writing a log record.  Elements are zeroed starting at index startz and
// the PMA's internal parameters are recomputed.  The prior and new
// capacities are reported through *oldn and *newn so the caller can log
// the resize itself.  Returns 0 on success, else the resize error code.
static int pma_resize_array_nolog(PMA pma, int asksize, int startz, unsigned int *oldn, unsigned int *newn) {
    unsigned int prev_capacity = pma->N;
    unsigned int new_capacity  = pma_array_size(pma, asksize);
    int r = toku_resize_pma_exactly(pma, startz, new_capacity);
    if (r!=0) return r;
    toku_pmainternal_calculate_parameters(pma);
    *oldn = prev_capacity;
    *newn = new_capacity;
    return 0;
}
static int pma_resize_array(TOKUTXN txn, FILENUM filenum, DISKOFF offset, PMA pma, int asksize, int startz, LSN *node_lsn) {
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (txn, toku_txn_get_txnid(txn), filenum, offset, oldN, n);
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
return 0;
......@@ -839,8 +848,12 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
struct kv_pair *kv = pma->pairs[idx];
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
r=toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, kv);
if (r!=0) return r;
{
const BYTESTRING deletedkey = { kv->keylen, kv_pair_key(kv) };
const BYTESTRING deleteddata = { kv->vallen, kv_pair_val(kv) };
r=toku_log_deleteinleaf(txn, toku_txn_get_txnid(txn), pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (txn && node_lsn) *node_lsn = toku_txn_get_last_lsn(txn);
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
......@@ -849,10 +862,8 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
pma->pairs[idx] = pma_malloc_kv_pair(pma, k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
}
r = toku_logger_log_phys_add_or_delete_in_leaf(pma->db, txn, diskoff, 0, pma->pairs[idx]);
if (txn && node_lsn && r==0) *node_lsn = toku_txn_get_last_lsn(txn);
*fingerprint += rand4fingerprint*toku_calccrc32_kvpair(k->data, k->size, v->data, v->size);
return r;
/* idx is live here */
goto logit_and_update_fingerprint;
}
if (kv_pair_inuse(pma->pairs[idx])) {
unsigned int newidx;
......@@ -867,6 +878,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
pma->n_pairs_present++;
*replaced_v_size = -1;
//printf("%s:%d txn=%p\n", __FILE__, __LINE__, txn);
logit_and_update_fingerprint:
{
const struct kv_pair *pair = pma->pairs[idx];
const BYTESTRING key = { pair->keylen, (char*)kv_pair_key_const(pair) };
......@@ -1018,12 +1030,20 @@ int toku_pma_split(TOKUTXN txn, FILENUM filenum,
/* put the first half of pairs into the left pma */
n = spliti;
error = pma_resize_array(txn, filenum, diskoff, pma, n + n/4, 0, lsn); // zeros the elements
// Since the new array is smaller than the old one, during recovery we need to do the resize after moving the elements.
// But we must actually do the resize first here so we can determine the size.
unsigned int oldn_for_logging, newn_for_logging;
error = pma_resize_array_nolog(pma, n + n/4, 0, // zeros the elements
&oldn_for_logging, &newn_for_logging);
assert(error == 0);
distribute_data(pma->pairs, toku_pma_index_limit(pma), &pairs[0], n, pma);
{
int r = pma_log_distribute(txn, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(txn, toku_txn_get_txnid(txn), filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (txn && lsn) *lsn = toku_txn_get_last_lsn(txn);
}
// Don't have to relocate kvpairs, because these ones are still there.
pma->n_pairs_present = spliti;
......
......@@ -43,7 +43,7 @@ int main (int argc, char *argv[]) {
}
if (r!=EOF) {
if (r==DB_BADFORMAT) {
fprintf(stderr, "Bad log format\n");
fprintf(stderr, "Bad log format at record %d\n", entrycount);
exit(1);
} else {
fprintf(stderr, "Huh? %s\n", strerror(r));
......
......@@ -230,7 +230,7 @@ void toku_recover_addchild (struct logtype_addchild *le) {
assert(node->height>0);
assert(le->childnum <= (unsigned)node->u.n.n_children);
unsigned int i;
for (i=node->u.n.n_children; i+1>le->childnum; i--) {
for (i=node->u.n.n_children; i>le->childnum; i--) {
node->u.n.childinfos[i]=node->u.n.childinfos[i-1];
BRTNODE_CHILD_DISKOFF(node,i) = BRTNODE_CHILD_DISKOFF(node, i-1);
node->u.n.buffers[i] = node->u.n.buffers[i-1];
......@@ -371,6 +371,51 @@ int toku_rollback_insertinleaf (struct logtype_insertinleaf *c, TOKUTXN txn) {
}
// Recovery action for a deleteinleaf record: re-apply the delete's
// bookkeeping (fingerprint and byte-count adjustments) to the leaf node
// identified by (filenum, diskoff), stamp the node with the record's LSN,
// and release the record's key/data buffers.
void toku_recover_deleteinleaf (struct logtype_deleteinleaf *c) {
    struct cf_pair *pair;
    int r = find_cachefile(c->filenum, &pair);
    assert(r==0);
    void *node_v;
    assert(pair->brt);
    r = toku_cachetable_get_and_pin(pair->cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, pair->brt);
    assert(r==0);
    BRTNODE node = node_v;
    assert(node->height==0);
    VERIFY_COUNTS(node);
    // Bugfix: a second, redundant toku_cachetable_get_and_pin on the same
    // node was removed here -- it pinned the node twice while the single
    // unpin below released only one reference, leaking a pin.
    node->local_fingerprint -= node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len, c->data.data, c->data.len);
    node->u.l.n_bytes_in_buffer -= PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
    VERIFY_COUNTS(node);
    node->log_lsn = c->lsn;   // node is now up to date through this record
    r = toku_cachetable_unpin(pair->cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
    assert(r==0);
    toku_free(c->key.data);
    toku_free(c->data.data);
}
// Rollback of a deleteinleaf record: reinsert the deleted pair at its
// recorded PMA index and restore the node's fingerprint and byte counts.
// Returns 0 on success, else the first failing call's error code.
int toku_rollback_deleteinleaf (struct logtype_deleteinleaf *c, TOKUTXN txn) {
    CACHEFILE cf;
    BRT brt;
    void *node_v;
    int r = toku_cachefile_of_filenum(txn->logger->ct, c->filenum, &cf, &brt);
    assert(r==0);
    r = toku_cachetable_get_and_pin(cf, c->diskoff, &node_v, NULL, toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt);
    if (r!=0) return r;
    BRTNODE node = node_v;
    DBT key,data;
    r = toku_pma_set_at_index(node->u.l.buffer, c->pmaidx, toku_fill_dbt(&key, c->key.data, c->key.len), toku_fill_dbt(&data, c->data.data, c->data.len));
    if (r!=0) {
        // Bugfix: the original returned here with the node still pinned,
        // leaking a pin reference.  Unpin (not dirty) before bailing out.
        toku_cachetable_unpin(cf, c->diskoff, 0, toku_serialize_brtnode_size(node));
        return r;
    }
    node->local_fingerprint += node->rand4fingerprint*toku_calccrc32_kvpair(c->key.data, c->key.len, c->data.data, c->data.len);
    node->u.l.n_bytes_in_buffer += PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + c->key.len + c->data.len;
    VERIFY_COUNTS(node);
    node->log_lsn = c->lsn;
    r = toku_cachetable_unpin(cf, c->diskoff, 1, toku_serialize_brtnode_size(node));
    return r;
}
// a newbrtnode should have been done before this
void toku_recover_resizepma (struct logtype_resizepma *c) {
struct cf_pair *pair;
......
......@@ -133,7 +133,6 @@ static void newmain (int count) {
int i;
u_int32_t version;
int r = toku_read_and_print_logmagic(stdin, &version);
assert(r==0);
for (i=0; i!=count; i++) {
r = toku_logprint_one_record(stdout, stdin);
if (r==EOF) break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment