Commit 11e52586 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Log and recover for note splits works a little better (recovery runs, but the...

Log and recover for note splits works a little better (recovery runs, but the file is wrong for test_log5).  Addresses #27

git-svn-id: file:///svn/tokudb@1692 c7de825b-a66e-492c-adef-691d508d4ae1
parent d0e64cf7
......@@ -72,7 +72,7 @@ recover: LDFLAGS+=-lz
recover.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h
recover: recover.o log_code.o memory.o log.o brt-serialize.o fifo.o pma.o ybt.o fingerprint.o mempool.o primes.o cachetable.o brt.o brt-verify.o key.o roll.o
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h pma.h
log_code.o: log_header.h wbuf.h log-internal.h
log_code.c log_header.h: logformat
......
......@@ -107,7 +107,7 @@ struct kv_pair_tag {
#if PMA_USE_MEMPOOL
/* allocate a kv pair from the pma kv memory pool */
static struct kv_pair *kv_pair_malloc_mempool(void *key, int keylen, void *val, int vallen, struct mempool *mp) {
static struct kv_pair *kv_pair_malloc_mempool(const void *key, int keylen, const void *val, int vallen, struct mempool *mp) {
struct kv_pair *kv = toku_mempool_malloc(mp, sizeof (struct kv_pair) + keylen + vallen, 4);
if (kv)
kv_pair_init(kv, key, keylen, val, vallen);
......@@ -146,7 +146,7 @@ static int pma_compress_kvspace(PMA pma) {
/* malloc space for a kv pair from the pma memory pool and initialize it.
if the allocation fails, try to compress the memory pool and try again. */
static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), void *k, int ksize, void *v, int vsize) {
static struct kv_pair *pma_malloc_kv_pair(PMA pma __attribute__((unused)), const void *k, int ksize, const void *v, int vsize) {
#if PMA_USE_MEMPOOL
struct kv_pair *kv = kv_pair_malloc_mempool(k, ksize, v, vsize, &pma->kvspace);
if (kv == 0) {
......@@ -1600,9 +1600,34 @@ int toku_pma_clear_at_index (PMA pma, unsigned int idx) {
return 0;
}
// Move from a to b
static void pma_move (PMA pmaa, int idxa, u_int32_t randa, u_int32_t *fingerprinta, u_int32_t *n_in_bufa,
PMA pmab, int idxb, u_int32_t randb, u_int32_t *fingerprintb, u_int32_t *n_in_bufb) {
if (pmaa==pmab) {
assert(pmab->pairs[idxb]==0);
pmab->pairs[idxb] = pmaa->pairs[idxa];
pmaa->pairs[idxa] = 0;
} else {
struct kv_pair *pair = pmaa->pairs[idxa];
u_int32_t fdiff = toku_calccrc32_kvpair(kv_pair_key_const(pair), kv_pair_keylen(pair), kv_pair_val_const(pair), kv_pair_vallen(pair));
u_int32_t sizediff = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(pair) + kv_pair_vallen(pair);
*fingerprinta -= randa*fdiff; *fingerprintb += randb*fdiff;
*n_in_bufa -= sizediff; *n_in_bufb += sizediff;
pmab->pairs[idxb] = pma_malloc_kv_pair(pmab, kv_pair_key_const(pair), kv_pair_keylen(pair), kv_pair_val_const(pair), kv_pair_vallen(pair));
pma_mfree_kv_pair(pmaa, pair);
pmaa->pairs[idxa] = 0;
pmaa->n_pairs_present--;
pmab->n_pairs_present++;
}
}
// assume no cursors
// Move stuff from pmaa to pmab
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto) {
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto,
u_int32_t rand_from, u_int32_t *fingerprint_from,
u_int32_t rand_to, u_int32_t *fingerprint_to,
u_int32_t *n_in_buf_from, u_int32_t *n_in_buf_to
) {
u_int32_t i;
for (i=0; i<fromto.size; i++) {
// First handle the case for sliding something left. We can simply move it.
......@@ -1611,9 +1636,8 @@ int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto) {
int b=fromto.array[i].b;
if (b==a) continue;
if (b<a) {
assert(pma_to->pairs[b]==0);
pma_to->pairs[b] = pma_from->pairs[a];
pma_from->pairs[a] = 0;
pma_move(pma_from, a, rand_from, fingerprint_from, n_in_buf_from,
pma_to, b, rand_to, fingerprint_to, n_in_buf_to);
continue;
}
}
......@@ -1631,9 +1655,8 @@ int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto) {
int a=fromto.array[jdown].a;
int b=fromto.array[jdown].b;
if (a!=b) {
assert(pma_to->pairs[b]==0);
pma_to->pairs[b] = pma_from->pairs[a];
pma_from->pairs[a] = 0;
pma_move(pma_from, a, rand_from, fingerprint_from, n_in_buf_from,
pma_to, b, rand_to, fingerprint_to, n_in_buf_to);
}
if (i==jdown) break; // Do it this way so everything can be unsigned and we won't try to go negative.
}
......@@ -1652,10 +1675,18 @@ static void reverse_fromto (INTPAIRARRAY fromto) {
}
}
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto) {
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto,
u_int32_t rand_backto, u_int32_t *fingerprint_backto,
u_int32_t rand_backfrom, u_int32_t *fingerprint_backfrom,
u_int32_t *n_in_buf_backto, u_int32_t *n_in_buf_backfrom
) {
int r;
reverse_fromto(fromto);
r = toku_pma_move_indices(pma_backfrom, pma_backto, fromto);
r = toku_pma_move_indices(pma_backfrom, pma_backto, fromto,
rand_backfrom, fingerprint_backfrom,
rand_backto, fingerprint_backto,
n_in_buf_backfrom, n_in_buf_backto
);
reverse_fromto(fromto);
return r;
}
......@@ -155,9 +155,17 @@ int toku_pma_clear_at_index (PMA, unsigned int /*index*/); // If the index is wr
// Requires: No open cursors on the pma.
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto); // Return nonzero if the indices are somehow wrong.
// Return nonzero if the indices are somehow wrong.
int toku_pma_move_indices (PMA pma_from, PMA pma_to, INTPAIRARRAY fromto,
u_int32_t rand_from, u_int32_t *fingerprint_from,
u_int32_t rand_to, u_int32_t *fingerprint_to,
u_int32_t *n_in_buf_from, u_int32_t *n_in_buf_to);
// Move things backwards according to fromto.
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto);
int toku_pma_move_indices_back (PMA pma_backto, PMA pma_backfrom, INTPAIRARRAY fromto,
u_int32_t rand_backto, u_int32_t *fingerprint_backto,
u_int32_t rand_backfrom, u_int32_t *fingerprint_backfrom,
u_int32_t *n_in_buf_backto, u_int32_t *n_in_buf_backfrom
);
void toku_pma_show_stats (void);
......
......@@ -302,8 +302,12 @@ void toku_recover_pmadistribute (struct logtype_pmadistribute *c) {
assert(c->fromto.array[i].b < toku_pma_index_limit(nodeb->u.l.buffer));
}
}
r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, c->fromto);
// The bytes in bufer and fingerprint shouldn't change
r = toku_pma_move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, c->fromto,
nodea->rand4fingerprint, &nodea->local_fingerprint,
nodeb->rand4fingerprint, &nodeb->local_fingerprint,
&nodea->u.l.n_bytes_in_buffer, &nodeb->u.l.n_bytes_in_buffer
);
// The bytes in buffer and fingerprint shouldn't change
VERIFY_COUNTS(nodea);
VERIFY_COUNTS(nodeb);
......@@ -338,7 +342,11 @@ int toku_rollback_pmadistribute (struct logtype_pmadistribute *le, TOKUTXN txn)
}
BRTNODE nodea = node_va;
BRTNODE nodeb = node_vb;
r = toku_pma_move_indices_back(nodea->u.l.buffer, nodeb->u.l.buffer, le->fromto);
r = toku_pma_move_indices_back(nodea->u.l.buffer, nodeb->u.l.buffer, le->fromto,
nodeb->rand4fingerprint, &nodeb->local_fingerprint,
nodea->rand4fingerprint, &nodea->local_fingerprint,
&nodeb->u.l.n_bytes_in_buffer, &nodea->u.l.n_bytes_in_buffer
);
if (r!=0) goto died1;
r = toku_cachetable_unpin(cf, le->old_diskoff, 1, toku_serialize_brtnode_size(nodea));
r = toku_cachetable_unpin(cf, le->new_diskoff, 1, toku_serialize_brtnode_size(nodeb));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment