Commit 001f15c2 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

closes[t:2622] when realloc fails in the brtloader writer, mark the dbuf in...

closes[t:2622] when realloc fails in the brtloader writer, mark the dbuf in error and check the error later

git-svn-id: file:///svn/toku/tokudb@20350 c7de825b-a66e-492c-adef-691d508d4ae1
parent d6dc7aad
...@@ -1752,6 +1752,7 @@ struct dbuf { ...@@ -1752,6 +1752,7 @@ struct dbuf {
unsigned char *buf; unsigned char *buf;
int buflen; int buflen;
int off; int off;
int error;
}; };
struct leaf_buf { struct leaf_buf {
...@@ -1832,10 +1833,12 @@ static void seek_align(struct dbout *out) { ...@@ -1832,10 +1833,12 @@ static void seek_align(struct dbout *out) {
} }
static void dbuf_init (struct dbuf *dbuf) { static void dbuf_init (struct dbuf *dbuf) {
dbuf->buf=0; dbuf->buf = 0;
dbuf->buflen=0; dbuf->buflen = 0;
dbuf->off=0; dbuf->off = 0;
dbuf->error = 0;
} }
static void dbuf_destroy (struct dbuf *dbuf) { static void dbuf_destroy (struct dbuf *dbuf) {
toku_free(dbuf->buf); dbuf->buf = NULL; toku_free(dbuf->buf); dbuf->buf = NULL;
} }
...@@ -1852,6 +1855,7 @@ static int64_t allocate_block (struct dbout *out) ...@@ -1852,6 +1855,7 @@ static int64_t allocate_block (struct dbout *out)
out->n_translations_limit *= 2; out->n_translations_limit *= 2;
} }
REALLOC_N(out->n_translations_limit, out->translation); REALLOC_N(out->n_translations_limit, out->translation);
lazy_assert(out->translation);
} }
out->n_translations++; out->n_translations++;
dbout_unlock(out); dbout_unlock(out);
...@@ -1860,13 +1864,21 @@ static int64_t allocate_block (struct dbout *out) ...@@ -1860,13 +1864,21 @@ static int64_t allocate_block (struct dbout *out)
static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) { static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) {
if (dbuf->off + nbytes > dbuf->buflen) { if (dbuf->off + nbytes > dbuf->buflen) {
void *oldbuf = dbuf->buf;
int oldbuflen = dbuf->buflen;
dbuf->buflen += dbuf->off + nbytes; dbuf->buflen += dbuf->off + nbytes;
dbuf->buflen *= 2; dbuf->buflen *= 2;
REALLOC_N(dbuf->buflen, dbuf->buf); REALLOC_N(dbuf->buflen, dbuf->buf);
lazy_assert(dbuf->buf); if (dbuf->buf == NULL) {
dbuf->error = errno;
dbuf->buf = oldbuf;
dbuf->buflen = oldbuflen;
}
}
if (!dbuf->error) {
memcpy(dbuf->buf + dbuf->off, bytes, nbytes);
dbuf->off += nbytes;
} }
memcpy(dbuf->buf + dbuf->off, bytes, nbytes);
dbuf->off += nbytes;
} }
static void putbuf_int8 (struct dbuf *dbuf, unsigned char v) { static void putbuf_int8 (struct dbuf *dbuf, unsigned char v) {
...@@ -1876,6 +1888,7 @@ static void putbuf_int8 (struct dbuf *dbuf, unsigned char v) { ...@@ -1876,6 +1888,7 @@ static void putbuf_int8 (struct dbuf *dbuf, unsigned char v) {
static void putbuf_int32 (struct dbuf *dbuf, int v) { static void putbuf_int32 (struct dbuf *dbuf, int v) {
putbuf_bytes(dbuf, &v, 4); putbuf_bytes(dbuf, &v, 4);
} }
static void putbuf_int64 (struct dbuf *dbuf, unsigned long long v) { static void putbuf_int64 (struct dbuf *dbuf, unsigned long long v) {
putbuf_int32(dbuf, v>>32); putbuf_int32(dbuf, v>>32);
putbuf_int32(dbuf, v&0xFFFFFFFF); putbuf_int32(dbuf, v&0xFFFFFFFF);
...@@ -1884,13 +1897,21 @@ static void putbuf_int64 (struct dbuf *dbuf, unsigned long long v) { ...@@ -1884,13 +1897,21 @@ static void putbuf_int64 (struct dbuf *dbuf, unsigned long long v) {
static void putbuf_int32_at(struct dbuf *dbuf, int off, int v) { static void putbuf_int32_at(struct dbuf *dbuf, int off, int v) {
const int nbytes = 4; const int nbytes = 4;
if (off+nbytes > dbuf->buflen) { if (off+nbytes > dbuf->buflen) {
void *oldbuf = dbuf->buf;
int oldbuflen = dbuf->buflen;
dbuf->buflen += dbuf->off + nbytes; dbuf->buflen += dbuf->off + nbytes;
dbuf->buflen *= 2; dbuf->buflen *= 2;
REALLOC_N(dbuf->buflen, dbuf->buf); REALLOC_N(dbuf->buflen, dbuf->buf);
lazy_assert(dbuf->buf); if (dbuf->buf == NULL) {
dbuf->error = errno;
dbuf->buf = oldbuf;
dbuf->buflen = oldbuflen;
}
} }
memcpy(dbuf->buf + off, &v, 4); if (!dbuf->error)
memcpy(dbuf->buf + off, &v, 4);
} }
static void putbuf_int64_at(struct dbuf *dbuf, int off, unsigned long long v) { static void putbuf_int64_at(struct dbuf *dbuf, int off, unsigned long long v) {
unsigned int a = v>>32; unsigned int a = v>>32;
unsigned int b = v&0xFFFFFFFF; unsigned int b = v&0xFFFFFFFF;
...@@ -2428,15 +2449,17 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int ...@@ -2428,15 +2449,17 @@ static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int
putbuf_bytes(&lbuf->dbuf, key, keylen); putbuf_bytes(&lbuf->dbuf, key, keylen);
putbuf_bytes(&lbuf->dbuf, val, vallen); putbuf_bytes(&lbuf->dbuf, val, vallen);
int le_len = 1+4+4+keylen+vallen; int le_len = 1+4+4+keylen+vallen;
invariant(le_off + le_len == lbuf->dbuf.off); if (!lbuf->dbuf.error) {
u_int32_t this_x = x1764_memory(lbuf->dbuf.buf + le_off, le_len); invariant(le_off + le_len == lbuf->dbuf.off);
u_int32_t this_prod = lbuf->rand4fingerprint * this_x; u_int32_t this_x = x1764_memory(lbuf->dbuf.buf + le_off, le_len);
lbuf->local_fingerprint += this_prod; u_int32_t this_prod = lbuf->rand4fingerprint * this_x;
if (0) { lbuf->local_fingerprint += this_prod;
printf("%s:%d x1764(buf+%d, %d)=%8x\n", __FILE__, __LINE__, le_off, le_len, this_x); if (0) {
printf("%s:%d rand4fingerprint=%8x\n", __FILE__, __LINE__, lbuf->rand4fingerprint); printf("%s:%d x1764(buf+%d, %d)=%8x\n", __FILE__, __LINE__, le_off, le_len, this_x);
printf("%s:%d this_prod=%8x\n", __FILE__, __LINE__, this_prod); printf("%s:%d rand4fingerprint=%8x\n", __FILE__, __LINE__, lbuf->rand4fingerprint);
printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, lbuf->local_fingerprint); printf("%s:%d this_prod=%8x\n", __FILE__, __LINE__, this_prod);
printf("%s:%d local_fingerprint=%8x\n", __FILE__, __LINE__, lbuf->local_fingerprint);
}
} }
} }
...@@ -2470,83 +2493,86 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr ...@@ -2470,83 +2493,86 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
putbuf_int32_at(&lbuf->dbuf, lbuf->n_in_buf_p, lbuf->n_in_buf); putbuf_int32_at(&lbuf->dbuf, lbuf->n_in_buf_p, lbuf->n_in_buf);
//print_bytestring(lbuf->dbuf.buf, lbuf->dbuf.off, 200); result = lbuf->dbuf.error;
if (result == 0) {
int n_uncompressed_bytes_at_beginning = (8 // tokuleaf
+4 // layout version
+4 // layout version original
);
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
// choose sub block size and number
int sub_block_size, n_sub_blocks;
choose_sub_block_size(uncompressed_len, max_sub_blocks, &sub_block_size, &n_sub_blocks);
int header_len = n_uncompressed_bytes_at_beginning + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
// initialize the sub blocks
// struct sub_block sub_block[n_sub_blocks]; RFP cilk++ dynamic array bug, use malloc instead
struct sub_block *XMALLOC_N(n_sub_blocks, sub_block);
for (int i = 0; i < n_sub_blocks; i++)
sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
// allocate space for the compressed bufer //print_bytestring(lbuf->dbuf.buf, lbuf->dbuf.off, 200);
int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
unsigned char *MALLOC_N(header_len + bound, compressed_buf);
if (compressed_buf == NULL) {
result = errno;
} else {
// compress and checksum the sub blocks int n_uncompressed_bytes_at_beginning = (8 // tokuleaf
int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block, +4 // layout version
(char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning), +4 // layout version original
(char *) (compressed_buf + header_len), 1); );
int uncompressed_len = lbuf->dbuf.off - n_uncompressed_bytes_at_beginning;
// cppy the uncompressed header to the compressed buffer // choose sub block size and number
memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning); int sub_block_size, n_sub_blocks;
choose_sub_block_size(uncompressed_len, max_sub_blocks, &sub_block_size, &n_sub_blocks);
// serialize the sub block header int header_len = n_uncompressed_bytes_at_beginning + sub_block_header_size(n_sub_blocks) + sizeof (uint32_t);
memcpy(compressed_buf+16, &n_sub_blocks, 4);
for (int i = 0; i < n_sub_blocks; i++) { // initialize the sub blocks
memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4); // struct sub_block sub_block[n_sub_blocks]; RFP cilk++ dynamic array bug, use malloc instead
memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4); struct sub_block *XMALLOC_N(n_sub_blocks, sub_block);
memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4); for (int i = 0; i < n_sub_blocks; i++)
} sub_block_init(&sub_block[i]);
set_all_sub_block_sizes(uncompressed_len, sub_block_size, n_sub_blocks, sub_block);
// compute the header checksum and serialize it // allocate space for the compressed bufer
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t)); int bound = get_sum_compressed_size_bound(n_sub_blocks, sub_block);
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4); unsigned char *MALLOC_N(header_len + bound, compressed_buf);
if (compressed_buf == NULL) {
result = errno;
} else {
dbout_lock(out); // compress and checksum the sub blocks
long long off_of_leaf = out->current_off; int compressed_len = compress_all_sub_blocks(n_sub_blocks, sub_block,
int size = header_len + compressed_len; (char *) (lbuf->dbuf.buf + n_uncompressed_bytes_at_beginning),
if (0) { (char *) (compressed_buf + header_len), 1);
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf); // cppy the uncompressed header to the compressed buffer
fprintf(stderr, "compressed bytes are:"); memcpy(compressed_buf, lbuf->dbuf.buf, n_uncompressed_bytes_at_beginning);
//for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i]; // serialize the sub block header
// if (isprint(c)) fprintf(stderr, "%c", c); memcpy(compressed_buf+16, &n_sub_blocks, 4);
// else fprintf(stderr, "\\%03o", compressed_buf[28+i]); for (int i = 0; i < n_sub_blocks; i++) {
//} memcpy(compressed_buf+20+12*i+0, &sub_block[i].compressed_size, 4);
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]); memcpy(compressed_buf+20+12*i+4, &sub_block[i].uncompressed_size, 4);
} memcpy(compressed_buf+20+12*i+8, &sub_block[i].xsum, 4);
}
// compute the header checksum and serialize it
u_int32_t header_xsum = x1764_memory(compressed_buf, header_len - sizeof (u_int32_t));
memcpy(compressed_buf + header_len - sizeof (u_int32_t), &header_xsum, 4);
dbout_lock(out);
long long off_of_leaf = out->current_off;
int size = header_len + compressed_len;
if (0) {
fprintf(stderr, "uncompressed buf size=%d (amount of data compressed)\n", uncompressed_len);
fprintf(stderr, "compressed buf size=%d, off=%lld\n", compressed_len, off_of_leaf);
fprintf(stderr, "compressed bytes are:");
//for (int i=0; i<compressed_len; i++) {
// unsigned char c = compressed_buf[28+i];
// if (isprint(c)) fprintf(stderr, "%c", c);
// else fprintf(stderr, "\\%03o", compressed_buf[28+i]);
//}
fprintf(stderr, "\ntotal bytes written = %d, last byte is \\%o\n", size, compressed_buf[size-1]);
}
result = write_literal(out, compressed_buf, size); result = write_literal(out, compressed_buf, size);
if (result == 0) { if (result == 0) {
//printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf); //printf("translation[%lld].off = %lld\n", lbuf->blocknum, off_of_leaf);
out->translation[lbuf->blocknum].off = off_of_leaf; out->translation[lbuf->blocknum].off = off_of_leaf;
out->translation[lbuf->blocknum].size = size; out->translation[lbuf->blocknum].size = size;
seek_align_locked(out); seek_align_locked(out);
}
dbout_unlock(out);
} }
dbout_unlock(out);
toku_free(sub_block); // RFP cilk++ bug
toku_free(compressed_buf);
} }
toku_free(sub_block); // RFP cilk++ bug
toku_free(compressed_buf);
dbuf_destroy(&lbuf->dbuf); dbuf_destroy(&lbuf->dbuf);
toku_free(lbuf); toku_free(lbuf);
......
...@@ -81,9 +81,11 @@ static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) { ...@@ -81,9 +81,11 @@ static ssize_t bad_pwrite(int fd, const void * bp, size_t len, toku_off_t off) {
static int my_malloc_event = 0; static int my_malloc_event = 0;
static int my_malloc_count = 0, my_big_malloc_count = 0; static int my_malloc_count = 0, my_big_malloc_count = 0;
static int my_realloc_count = 0, my_big_realloc_count = 0;
static void reset_my_malloc_counts(void) { static void reset_my_malloc_counts(void) {
my_malloc_count = my_big_malloc_count = 0; my_malloc_count = my_big_malloc_count = 0;
my_realloc_count = my_big_realloc_count = 0;
} }
static void *my_malloc(size_t n) { static void *my_malloc(size_t n) {
...@@ -109,6 +111,29 @@ static void *my_malloc(size_t n) { ...@@ -109,6 +111,29 @@ static void *my_malloc(size_t n) {
return malloc(n); return malloc(n);
} }
static void *my_realloc(void *p, size_t n) {
void *caller = __builtin_return_address(0);
if (!((void*)toku_realloc <= caller && caller <= (void*)toku_free))
goto skip;
my_realloc_count++;
if (n >= 64*1024) {
my_big_realloc_count++;
if (my_malloc_event) {
caller = __builtin_return_address(1);
if ((void*)toku_xrealloc <= caller && caller <= (void*)toku_malloc_report)
goto skip;
event_count++;
if (event_count == event_count_trigger) {
event_hit();
errno = ENOMEM;
return NULL;
}
}
}
skip:
return realloc(p, n);
}
static int qsort_compare_ints (const void *a, const void *b) { static int qsort_compare_ints (const void *a, const void *b) {
int avalue = *(int*)a; int avalue = *(int*)a;
int bvalue = *(int*)b; int bvalue = *(int*)b;
...@@ -207,6 +232,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -207,6 +232,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
assert(fd>=0); assert(fd>=0);
toku_set_func_malloc(my_malloc); toku_set_func_malloc(my_malloc);
toku_set_func_realloc(my_realloc);
brtloader_set_os_fwrite(bad_fwrite); brtloader_set_os_fwrite(bad_fwrite);
toku_set_func_write(bad_write); toku_set_func_write(bad_write);
toku_set_func_pwrite(bad_pwrite); toku_set_func_pwrite(bad_pwrite);
...@@ -217,6 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_ ...@@ -217,6 +243,7 @@ static void write_dbfile (char *template, int n, char *output_name, BOOL expect_
assert(expect_error ? r != 0 : r == 0); assert(expect_error ? r != 0 : r == 0);
toku_set_func_malloc(NULL); toku_set_func_malloc(NULL);
toku_set_func_realloc(NULL);
brtloader_set_os_fwrite(NULL); brtloader_set_os_fwrite(NULL);
toku_set_func_write(NULL); toku_set_func_write(NULL);
toku_set_func_pwrite(NULL); toku_set_func_pwrite(NULL);
...@@ -289,6 +316,7 @@ int test_main (int argc, const char *argv[]) { ...@@ -289,6 +316,7 @@ int test_main (int argc, const char *argv[]) {
write_dbfile(template, n, output_name, FALSE); write_dbfile(template, n, output_name, FALSE);
if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count); if (verbose) printf("my_malloc_count=%d big_count=%d\n", my_malloc_count, my_big_malloc_count);
if (verbose) printf("my_realloc_count=%d big_count=%d\n", my_realloc_count, my_big_realloc_count);
int event_limit = event_count; int event_limit = event_count;
if (verbose) printf("event_limit=%d\n", event_limit); if (verbose) printf("event_limit=%d\n", event_limit);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment