Commit ba897de1 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge from 2212 branch. Fixes #2212. Fixes[t:2212].

git-svn-id: file:///svn/toku/tokudb@16106 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4d4257da
...@@ -50,10 +50,10 @@ build: $(TARGETS) ...@@ -50,10 +50,10 @@ build: $(TARGETS)
check: check-default check: check-default
check-default: $(TARGET_TDB) check-default: $(TARGET_TDB)
$(VALGRIND) ./$(TARGET_TDB) $(QUIET) $(SUMMARIZE_CMD) $(VGRIND) ./$(TARGET_TDB) $(QUIET) $(SUMMARIZE_CMD)
check-x: $(TARGET_TDB) check-x: $(TARGET_TDB)
$(VALGRIND) ./$(TARGET_TDB) -x $(QUIET) $(SUMMARIZE_CMD) $(VGRIND) ./$(TARGET_TDB) -x $(QUIET) $(SUMMARIZE_CMD)
clean: clean:
rm -rf $(TARGETS) $(BENCHDBS) *.gcno *.gcda *.gcov rm -rf $(TARGETS) $(BENCHDBS) *.gcno *.gcda *.gcov
......
...@@ -48,8 +48,8 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) { ...@@ -48,8 +48,8 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
return 0; return 0;
} }
ssize_t void
toku_os_pwrite (int fd, const void *buf, size_t len, off_t off) { toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) {
ssize_t r; ssize_t r;
again: again:
if (t_pwrite) { if (t_pwrite) {
...@@ -59,7 +59,7 @@ again: ...@@ -59,7 +59,7 @@ again:
} }
if (try_again_after_handling_write_error(fd, len, r)) if (try_again_after_handling_write_error(fd, len, r))
goto again; goto again;
return r; assert(r==(ssize_t)len);
} }
static ssize_t (*t_write)(int, const void *, size_t) = 0; static ssize_t (*t_write)(int, const void *, size_t) = 0;
...@@ -70,8 +70,8 @@ toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) { ...@@ -70,8 +70,8 @@ toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
return 0; return 0;
} }
ssize_t void
toku_os_write (int fd, const void *buf, size_t len) { toku_os_full_write (int fd, const void *buf, size_t len) {
ssize_t r; ssize_t r;
again: again:
if (t_write) { if (t_write) {
...@@ -81,7 +81,7 @@ again: ...@@ -81,7 +81,7 @@ again:
} }
if (try_again_after_handling_write_error(fd, len, r)) if (try_again_after_handling_write_error(fd, len, r))
goto again; goto again;
return r; assert(r==(ssize_t)len);
} }
static uint64_t get_tnow(void) { static uint64_t get_tnow(void) {
......
...@@ -124,42 +124,23 @@ maybe_preallocate_in_file (int fd, u_int64_t size) ...@@ -124,42 +124,23 @@ maybe_preallocate_in_file (int fd, u_int64_t size)
memset(wbuf, 0, N); memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096); toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size); assert(start_write >= file_size);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write); toku_os_full_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf); toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
} }
return 0; return 0;
} }
static int static void
toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote) toku_full_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset)
// requires that the pwrite has been locked // requires that the pwrite has been locked
// Returns 0 on success (and fills in *num_wrote for how many bytes are written) // On failure, this does not return (an assertion fails or something).
// Returns nonzero error number problems.
{ {
assert(pwrite_is_locked); assert(pwrite_is_locked);
{ {
int r = maybe_preallocate_in_file(fd, offset+count); int r = maybe_preallocate_in_file(fd, offset+count);
if (r!=0) { assert(r==0);
*num_wrote = 0;
return r;
}
}
{
*num_wrote = toku_os_pwrite(fd, buf, count, offset);
if (*num_wrote < 0) {
int r = errno;
*num_wrote = 0;
return r;
} else {
return 0;
}
} }
toku_os_full_pwrite(fd, buf, count, offset);
} }
// Don't include the compression header // Don't include the compression header
...@@ -532,7 +513,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -532,7 +513,6 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
} }
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r;
{ {
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
assert(blocknum.b>=0); assert(blocknum.b>=0);
...@@ -546,14 +526,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -546,14 +526,8 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
//h will be dirtied //h will be dirtied
toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset, toku_blocknum_realloc_on_disk(h->blocktable, blocknum, n_to_write, &offset,
h, for_checkpoint); h, for_checkpoint);
ssize_t n_wrote;
lock_for_pwrite(); lock_for_pwrite();
r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote); toku_full_pwrite_extend(fd, compressed_buf, n_to_write, offset);
if (r) {
// fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
} else {
r=0;
}
unlock_for_pwrite(); unlock_for_pwrite();
} }
...@@ -562,7 +536,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b ...@@ -562,7 +536,7 @@ int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct b
toku_free(buf); toku_free(buf);
toku_free(compressed_buf); toku_free(compressed_buf);
node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction. node->dirty = 0; // See #1957. Must set the node to be clean after serializing it so that it doesn't get written again on the next checkpoint or eviction.
return r; return 0;
} }
#define DO_DECOMPRESS_WORKER 1 #define DO_DECOMPRESS_WORKER 1
...@@ -1223,44 +1197,20 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -1223,44 +1197,20 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
assert(w_main.ndone==size_main); assert(w_main.ndone==size_main);
} }
toku_brtheader_unlock(h); toku_brtheader_unlock(h);
char *writing_what;
lock_for_pwrite(); lock_for_pwrite();
{ {
//Actual Write translation table //Actual Write translation table
ssize_t nwrote; toku_full_pwrite_extend(fd, w_translation.buf,
rr = toku_pwrite_extend(fd, w_translation.buf, size_translation, address_translation);
size_translation, address_translation, &nwrote);
if (rr) {
writing_what = "translation";
goto panic;
}
assert(nwrote==size_translation);
} }
{ {
//Actual Write main header
ssize_t nwrote;
//Alternate writing header to two locations: //Alternate writing header to two locations:
// Beginning (0) or BLOCK_ALLOCATOR_HEADER_RESERVE // Beginning (0) or BLOCK_ALLOCATOR_HEADER_RESERVE
toku_off_t main_offset; toku_off_t main_offset;
//TODO: #1623 uncomment next line when ready for 2 headers //TODO: #1623 uncomment next line when ready for 2 headers
main_offset = (h->checkpoint_count & 0x1) ? 0 : BLOCK_ALLOCATOR_HEADER_RESERVE; main_offset = (h->checkpoint_count & 0x1) ? 0 : BLOCK_ALLOCATOR_HEADER_RESERVE;
rr = toku_pwrite_extend(fd, w_main.buf, w_main.ndone, main_offset, &nwrote); toku_full_pwrite_extend(fd, w_main.buf, w_main.ndone, main_offset);
if (rr) {
writing_what = "header";
panic:
if (h->panic==0) {
char *e = strerror(rr);
int l = 200 + strlen(e);
char s[l];
h->panic=rr;
snprintf(s, l-1, "%s:%d: Error writing %s to data file. errno=%d (%s)\n", __FILE__, __LINE__, writing_what, rr, e);
h->panic_string = toku_strdup(s);
}
goto finish;
} }
assert((u_int64_t)nwrote==size_main);
}
finish:
toku_free(w_main.buf); toku_free(w_main.buf);
toku_free(w_translation.buf); toku_free(w_translation.buf);
unlock_for_pwrite(); unlock_for_pwrite();
...@@ -1288,7 +1238,7 @@ serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc) ...@@ -1288,7 +1238,7 @@ serialize_descriptor_contents_to_wbuf(struct wbuf *wb, struct descriptor *desc)
//Descriptors are NOT written during the header checkpoint process. //Descriptors are NOT written during the header checkpoint process.
int int
toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOFF offset) { toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOFF offset) {
int r; int r = 0;
// make the checksum // make the checksum
int64_t size = toku_serialize_descriptor_size(desc)+4; //4 for checksum int64_t size = toku_serialize_descriptor_size(desc)+4; //4 for checksum
struct wbuf w; struct wbuf w;
...@@ -1303,10 +1253,8 @@ toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOF ...@@ -1303,10 +1253,8 @@ toku_serialize_descriptor_contents_to_fd(int fd, struct descriptor *desc, DISKOF
{ {
lock_for_pwrite(); lock_for_pwrite();
//Actual Write translation table //Actual Write translation table
ssize_t nwrote; toku_full_pwrite_extend(fd, w.buf, size, offset);
r = toku_pwrite_extend(fd, w.buf, size, offset, &nwrote);
unlock_for_pwrite(); unlock_for_pwrite();
if (r==0) assert(nwrote==size);
} }
toku_free(w.buf); toku_free(w.buf);
return r; return r;
......
...@@ -119,8 +119,9 @@ void *realloc(void*, size_t) __attribute__((__deprecated__)); ...@@ -119,8 +119,9 @@ void *realloc(void*, size_t) __attribute__((__deprecated__));
void *os_malloc(size_t); void *os_malloc(size_t);
void *os_realloc(void*,size_t); void *os_realloc(void*,size_t);
void os_free(void*); void os_free(void*);
ssize_t toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off); // full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. */
ssize_t toku_os_write (int fd, const void *buf, size_t len); void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off);
void toku_os_full_write (int fd, const void *buf, size_t len);
// wrapper around fsync // wrapper around fsync
int toku_file_fsync(int fd); int toku_file_fsync(int fd);
......
...@@ -98,14 +98,16 @@ int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) { ...@@ -98,14 +98,16 @@ int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
} }
ssize_t void
toku_os_pwrite (int fd, const void *buf, size_t len, toku_off_t off) toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off)
{ {
ssize_t r;
if (t_pwrite) { if (t_pwrite) {
return t_pwrite(fd, buf, len, off); r = t_pwrite(fd, buf, len, off);
} else { } else {
return pwrite(fd, buf, len, off); r = pwrite(fd, buf, len, off);
} }
assert(r==len);
} }
static int (*t_fsync)(int) = 0; static int (*t_fsync)(int) = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment