Commit 07591bd1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

{{{test-recover3.tdb}}} doesn't crash during recovery now. (It crashes...

{{{test-recover3.tdb}}} doesn't crash during recovery now.  (It crashes because it recovered to the wrong state.)  Addresses #558.

git-svn-id: file:///svn/tokudb@3254 c7de825b-a66e-492c-adef-691d508d4ae1
parent 9e82871d
...@@ -668,10 +668,10 @@ int toku_gpma_split (GPMA pma, GPMA newpma, u_int32_t overhead, ...@@ -668,10 +668,10 @@ int toku_gpma_split (GPMA pma, GPMA newpma, u_int32_t overhead,
//toku_verify_gpma(pma); //toku_verify_gpma(pma);
//toku_verify_gpma(newpma); //toku_verify_gpma(newpma);
r = rcall(n_left, leftfroms, lefttos, leftitems, old_N, pma->N, extra);
if (r!=0) { goto L6; }
r = rcall_across_pmas(n_right, rightfroms, righttos, rightitems, 0, newpma->N, extra); r = rcall_across_pmas(n_right, rightfroms, righttos, rightitems, 0, newpma->N, extra);
if (r!=0) { goto L6; } if (r!=0) { goto L6; }
r = rcall(n_left, leftfroms, lefttos, leftitems, old_N, pma->N, extra);
if (r!=0) { goto L6; }
r=0; r=0;
goto L6; // free all that stuff goto L6; // free all that stuff
} }
......
...@@ -20,8 +20,8 @@ typedef int (*gpma_renumber_callback_t)(u_int32_t nitems, // How many things mo ...@@ -20,8 +20,8 @@ typedef int (*gpma_renumber_callback_t)(u_int32_t nitems, // How many things mo
u_int32_t *froms, // An array of indices indicating where things moved from u_int32_t *froms, // An array of indices indicating where things moved from
u_int32_t *tos, // An array of indices indicating where thigns moved to u_int32_t *tos, // An array of indices indicating where thigns moved to
struct gitem *items, // The actual items that were moved struct gitem *items, // The actual items that were moved
u_int32_t old_N, // The old size of the array u_int32_t old_N, // The old size of the target array
u_int32_t new_N, // The new size of teh array u_int32_t new_N, // The new size of the target array
void *extra); // Context void *extra); // Context
typedef void (*gpma_free_callback_t)(u_int32_t len, void*freeme, void*extra); typedef void (*gpma_free_callback_t)(u_int32_t len, void*freeme, void*extra);
...@@ -96,7 +96,7 @@ int toku_gpma_move_inside_pma_by_renumbering (GPMA, ...@@ -96,7 +96,7 @@ int toku_gpma_move_inside_pma_by_renumbering (GPMA,
int toku_gpma_split (GPMA pma, GPMA newpma, u_int32_t overhead, int toku_gpma_split (GPMA pma, GPMA newpma, u_int32_t overhead,
int (*realloc_data)(u_int32_t len, void *odata, void **ndata, void *extra), int (*realloc_data)(u_int32_t len, void *odata, void **ndata, void *extra),
gpma_renumber_callback_t rcall, gpma_renumber_callback_t rcall,
gpma_renumber_callback_t rcall_across_pmas, // This one is called for everything that moved gpma_renumber_callback_t rcall_across_pmas, // This one is called for everything that moved. It is called first (before the rcall)
void *extra); void *extra);
void toku_verify_gpma (GPMA pma); void toku_verify_gpma (GPMA pma);
......
...@@ -17,7 +17,7 @@ struct kv_pair { ...@@ -17,7 +17,7 @@ struct kv_pair {
}; };
/* return the size of a kv pair */ /* return the size of a kv pair */
static inline int kv_pair_size(struct kv_pair *pair) { static inline unsigned int kv_pair_size(struct kv_pair *pair) {
return sizeof (struct kv_pair) + pair->keylen + pair->vallen; return sizeof (struct kv_pair) + pair->keylen + pair->vallen;
} }
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <unistd.h> #include <unistd.h>
//#define DO_VERIFY_COUNTS #define DO_VERIFY_COUNTS
#ifdef DO_VERIFY_COUNTS #ifdef DO_VERIFY_COUNTS
#define VERIFY_COUNTS(n) toku_verify_counts(n) #define VERIFY_COUNTS(n) toku_verify_counts(n)
#else #else
...@@ -159,9 +159,10 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t ...@@ -159,9 +159,10 @@ void toku_recover_newbrtnode (LSN lsn, FILENUM filenum,DISKOFF diskoff,u_int32_t
assert(r==0); assert(r==0);
n->u.l.n_bytes_in_buffer=0; n->u.l.n_bytes_in_buffer=0;
{ {
void *mp = toku_malloc(n->nodesize); u_int32_t mpsize = n->nodesize + n->nodesize/4;
void *mp = toku_malloc(mpsize);
assert(mp); assert(mp);
toku_mempool_init(&n->u.l.buffer_mempool, mp, n->nodesize); toku_mempool_init(&n->u.l.buffer_mempool, mp, mpsize);
} }
} else { } else {
...@@ -470,10 +471,15 @@ void toku_recover_resizepma (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_ ...@@ -470,10 +471,15 @@ void toku_recover_resizepma (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_
assert(r==0); assert(r==0);
} }
int move_indices (GPMA from, GPMA to, INTPAIRARRAY fromto, int move_indices (GPMA from, struct mempool *from_mempool,
GPMA to, struct mempool *to_mempool,
INTPAIRARRAY fromto,
u_int32_t a_rand, u_int32_t *a_fp, u_int32_t a_rand, u_int32_t *a_fp,
u_int32_t b_rand, u_int32_t *b_fp, u_int32_t b_rand, u_int32_t *b_fp,
u_int32_t *a_nbytes, u_int32_t *b_nbytes) { u_int32_t *a_nbytes, u_int32_t *b_nbytes,
u_int32_t new_N) {
toku_verify_gpma(from);
toku_verify_gpma(to);
struct gitem *MALLOC_N(fromto.size, items); struct gitem *MALLOC_N(fromto.size, items);
if (items==0) return errno; if (items==0) return errno;
u_int32_t i; u_int32_t i;
...@@ -486,15 +492,42 @@ int move_indices (GPMA from, GPMA to, INTPAIRARRAY fromto, ...@@ -486,15 +492,42 @@ int move_indices (GPMA from, GPMA to, INTPAIRARRAY fromto,
from->items[idx].data = 0; from->items[idx].data = 0;
fp += toku_crc32(toku_null_crc, item.data, item.len); fp += toku_crc32(toku_null_crc, item.data, item.len);
sizediff += PMA_ITEM_OVERHEAD + item.len; sizediff += PMA_ITEM_OVERHEAD + item.len;
assert(kv_pair_size(item.data)==item.len);
} }
from->n_items_present -= fromto.size;
if (new_N!=toku_gpma_index_limit(to)) {
int r = toku_resize_gpma_exactly(to, new_N);
assert(r==0);
}
for (i=0; i<fromto.size; i++) { for (i=0; i<fromto.size; i++) {
to->items[fromto.array[i].b] = items[i]; int to_idx = fromto.array[i].b;
assert(to->items[to_idx].data==0);
if (from==to) {
to->items[to_idx] = items[i];
} else {
void *new_data = toku_mempool_malloc(to_mempool, items[i].len, 4);
if (new_data==0) {
int r = toku_brtnode_compress_kvspace(to, to_mempool);
assert(r==0);
new_data = toku_mempool_malloc(to_mempool, items[i].len, 4);
assert(new_data);
}
memcpy(new_data, items[i].data, items[i].len);
to->items[to_idx] = (struct gitem){items[i].len, new_data};
toku_mempool_mfree(from_mempool, items[i].data, items[i].len);
}
assert(kv_pair_size(to->items[to_idx].data)==to->items[to_idx].len);
} }
to->n_items_present += fromto.size;
*a_fp -= a_rand * fp; *a_fp -= a_rand * fp;
*b_fp += b_rand * fp; *b_fp += b_rand * fp;
*a_nbytes -= sizediff; *a_nbytes -= sizediff;
*b_nbytes += sizediff; *b_nbytes += sizediff;
toku_free(items); toku_free(items);
toku_verify_gpma(from);
toku_verify_gpma(to);
return 0; return 0;
} }
...@@ -510,24 +543,25 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff, ...@@ -510,24 +543,25 @@ void toku_recover_pmadistribute (LSN lsn, FILENUM filenum, DISKOFF old_diskoff,
assert(r==0); assert(r==0);
BRTNODE nodea = node_va; assert(nodea->height==0); BRTNODE nodea = node_va; assert(nodea->height==0);
BRTNODE nodeb = node_vb; assert(nodeb->height==0); BRTNODE nodeb = node_vb; assert(nodeb->height==0);
if (new_N!=toku_gpma_index_limit(nodeb->u.l.buffer)) {
r = toku_resize_gpma_exactly(nodeb->u.l.buffer, new_N);
assert(r==0);
}
{ {
unsigned int i; unsigned int i;
//printf("{"); //printf("{");
for (i=0; i<fromto.size; i++) { for (i=0; i<fromto.size; i++) {
//printf(" {%d %d}", fromto.array[i].a, fromto.array[i].b); //printf(" {%d %d}", fromto.array[i].a, fromto.array[i].b);
assert(fromto.array[i].a < toku_gpma_index_limit(nodea->u.l.buffer)); assert(fromto.array[i].a < toku_gpma_index_limit(nodea->u.l.buffer));
assert(fromto.array[i].b < toku_gpma_index_limit(nodeb->u.l.buffer)); assert(fromto.array[i].b < new_N);
} }
//printf("}\n"); //printf("}\n");
} }
r = move_indices (nodea->u.l.buffer, nodeb->u.l.buffer, fromto, VERIFY_COUNTS(nodea);
r = move_indices (nodea->u.l.buffer, &nodea->u.l.buffer_mempool,
nodeb->u.l.buffer, &nodeb->u.l.buffer_mempool,
fromto,
nodea->rand4fingerprint, &nodea->local_fingerprint, nodea->rand4fingerprint, &nodea->local_fingerprint,
nodeb->rand4fingerprint, &nodeb->local_fingerprint, nodeb->rand4fingerprint, &nodeb->local_fingerprint,
&nodea->u.l.n_bytes_in_buffer, &nodeb->u.l.n_bytes_in_buffer &nodea->u.l.n_bytes_in_buffer, &nodeb->u.l.n_bytes_in_buffer,
new_N
); );
// The bytes in buffer and fingerprint shouldn't change // The bytes in buffer and fingerprint shouldn't change
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment