Commit e944ee50 authored by Yoni Fogel's avatar Yoni Fogel

[t:2312] All paths to write/pwrite go through portability layer.

full_os_(p)write check for partial reads, and fixable errors.


git-svn-id: file:///svn/toku/tokudb@17078 c7de825b-a66e-492c-adef-691d508d4ae1
parent 15ffb96f
...@@ -8,36 +8,35 @@ ...@@ -8,36 +8,35 @@
//Print any necessary errors //Print any necessary errors
//Return whether we should try the write again. //Return whether we should try the write again.
static int static void
try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) { try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
int try_again = 0; int try_again = 0;
if (r_write==-1) { assert(r_write < 0);
int errno_write = errno; int errno_write = errno;
assert(errno_write != 0); assert(errno_write != 0);
switch (errno_write) { switch (errno_write) {
case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7). case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd); snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
perror(err_msg); perror(err_msg);
fflush(stderr); fflush(stderr);
try_again = 1; try_again = 1;
break; break;
}
case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
} }
errno = errno_write; case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
} }
return try_again; assert(try_again);
errno = errno_write;
} }
static ssize_t (*t_pwrite)(int, const void *, size_t, off_t) = 0; static ssize_t (*t_pwrite)(int, const void *, size_t, off_t) = 0;
...@@ -50,16 +49,23 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) { ...@@ -50,16 +49,23 @@ toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
void void
toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) { toku_os_full_pwrite (int fd, const void *buf, size_t len, off_t off) {
ssize_t r; while (len > 0) {
again: ssize_t r;
if (t_pwrite) { if (t_pwrite) {
r = t_pwrite(fd, buf, len, off); r = t_pwrite(fd, buf, len, off);
} else { } else {
r = pwrite(fd, buf, len, off); r = pwrite(fd, buf, len, off);
}
if (r > 0) {
len -= r;
buf += r;
off += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
} }
if (try_again_after_handling_write_error(fd, len, r)) assert(len == 0);
goto again;
assert(r==(ssize_t)len);
} }
static ssize_t (*t_write)(int, const void *, size_t) = 0; static ssize_t (*t_write)(int, const void *, size_t) = 0;
...@@ -72,16 +78,22 @@ toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) { ...@@ -72,16 +78,22 @@ toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
void void
toku_os_full_write (int fd, const void *buf, size_t len) { toku_os_full_write (int fd, const void *buf, size_t len) {
ssize_t r; while (len > 0) {
again: ssize_t r;
if (t_write) { if (t_write) {
r = t_write(fd, buf, len); r = t_write(fd, buf, len);
} else { } else {
r = write(fd, buf, len); r = write(fd, buf, len);
}
if (r > 0) {
len -= r;
buf += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
} }
if (try_again_after_handling_write_error(fd, len, r)) assert(len == 0);
goto again;
assert(r==(ssize_t)len);
} }
static uint64_t get_tnow(void) { static uint64_t get_tnow(void) {
......
...@@ -151,15 +151,8 @@ int toku_logger_shutdown(TOKULOGGER logger) { ...@@ -151,15 +151,8 @@ int toku_logger_shutdown(TOKULOGGER logger) {
// On error: Return negative with errno set. // On error: Return negative with errno set.
// On success return nbytes. // On success return nbytes.
static int write_it (int fd, const void *bufv, int nbytes) { static int write_it (int fd, const void *bufv, int nbytes) {
int org_nbytes=nbytes; toku_os_full_write(fd, bufv, nbytes);
const char *buf=bufv; return nbytes;
while (nbytes>0) {
int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
} }
// close the current file, and open the next one. // close the current file, and open the next one.
......
...@@ -206,15 +206,8 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -206,15 +206,8 @@ int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
// NOTE : duplicated from logger.c - FIX THIS!!! // NOTE : duplicated from logger.c - FIX THIS!!!
static int write_it (int fd, const void *bufv, int nbytes) { static int write_it (int fd, const void *bufv, int nbytes) {
int org_nbytes=nbytes; toku_os_full_write(fd, bufv, nbytes);
const char *buf=bufv; return nbytes;
while (nbytes>0) {
int r = write(fd, buf, nbytes);
if (r<0 || errno!=EAGAIN) return r;
buf+=r;
nbytes-=r;
}
return org_nbytes;
} }
int toku_maybe_spill_rollbacks (TOKUTXN txn) { int toku_maybe_spill_rollbacks (TOKUTXN txn) {
......
...@@ -45,10 +45,10 @@ test (int seed) { ...@@ -45,10 +45,10 @@ test (int seed) {
Bytef compressed_buf[compressed_size]; Bytef compressed_buf[compressed_size];
{ int r = compress2(compressed_buf, &compressed_size, (Bytef*)(buf[i]), sizes[i], 1); assert(r==Z_OK); } { int r = compress2(compressed_buf, &compressed_size, (Bytef*)(buf[i]), sizes[i], 1); assert(r==Z_OK); }
u_int32_t compressed_size_n = toku_htod32(compressed_size); u_int32_t compressed_size_n = toku_htod32(compressed_size);
{ int r = write(fd, &compressed_size_n, 4); assert(r==4); } { toku_os_full_write(fd, &compressed_size_n, 4); }
{ int r = write(fd, compressed_buf, compressed_size); assert(r==(int)compressed_size); } { toku_os_full_write(fd, compressed_buf, compressed_size); }
{ int r = write(fd, &sizesn[i], 4); assert(r==4); } // the uncompressed size { toku_os_full_write(fd, &sizesn[i], 4); } // the uncompressed size
{ int r = write(fd, &compressed_size_n, 4); assert(r==4); } { toku_os_full_write(fd, &compressed_size_n, 4); }
} }
{ int r=close(fd); assert(r==0); } { int r=close(fd); assert(r==0); }
} }
......
...@@ -18,8 +18,7 @@ static void f_flush (CACHEFILE f, ...@@ -18,8 +18,7 @@ static void f_flush (CACHEFILE f,
BOOL for_checkpoint __attribute__((__unused__))) { BOOL for_checkpoint __attribute__((__unused__))) {
assert(size==BLOCKSIZE); assert(size==BLOCKSIZE);
if (write_me) { if (write_me) {
int r = pwrite(toku_cachefile_fd(f), value, BLOCKSIZE, key.b); toku_os_full_pwrite(toku_cachefile_fd(f), value, BLOCKSIZE, key.b);
assert(r==BLOCKSIZE);
} }
if (!keep_me) { if (!keep_me) {
toku_free(value); toku_free(value);
......
...@@ -52,22 +52,18 @@ assert(sizeof(buf) == N_BIGINTS * BIGINT_SIZE); ...@@ -52,22 +52,18 @@ assert(sizeof(buf) == N_BIGINTS * BIGINT_SIZE);
} }
{ {
u_int32_t v = toku_htod32(compressed_len); u_int32_t v = toku_htod32(compressed_len);
ssize_t r = write(fd, &v, sizeof(v)); toku_os_full_write(fd, &v, sizeof(v));
assert(r==sizeof(v));
} }
{ {
ssize_t r = write(fd, compressed_buf, compressed_len); toku_os_full_write(fd, compressed_buf, compressed_len);
assert(r==(ssize_t)compressed_len);
} }
{ {
u_int32_t v = toku_htod32(sizeof(buf)); u_int32_t v = toku_htod32(sizeof(buf));
ssize_t r = write(fd, &v, sizeof(v)); toku_os_full_write(fd, &v, sizeof(v));
assert(r==sizeof(v));
} }
{ {
u_int32_t v = toku_htod32(compressed_len); u_int32_t v = toku_htod32(compressed_len);
ssize_t r = write(fd, &v, sizeof(v)); toku_os_full_write(fd, &v, sizeof(v));
assert(r==sizeof(v));
} }
} }
{ int r = close(fd); assert(r==0); } { int r = close(fd); assert(r==0); }
......
...@@ -27,8 +27,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_ ...@@ -27,8 +27,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute_
memset(buf, 0, sizeof(buf)); memset(buf, 0, sizeof(buf));
u_int64_t i; u_int64_t i;
for (i=0; i<(1LL<<32); i+=BUFSIZE) { for (i=0; i<(1LL<<32); i+=BUFSIZE) {
int r = write(fd, buf, BUFSIZE); toku_os_full_write(fd, buf, BUFSIZE);
assert(r==BUFSIZE);
} }
} }
int64_t file_size; int64_t file_size;
......
...@@ -52,6 +52,9 @@ ...@@ -52,6 +52,9 @@
toku_fstat; toku_fstat;
toku_dup2; toku_dup2;
toku_os_full_write;
toku_os_full_pwrite;
local: *; local: *;
}; };
...@@ -446,6 +446,7 @@ libs: ...@@ -446,6 +446,7 @@ libs:
diskfull.tdb$(BINSUF): CPPFLAGS+=-DDONT_DEPRECATE_WRITES
test_db_curs4.tdb$(BINSUF): trace.h test_db_curs4.tdb$(BINSUF): trace.h
test_db_curs4.bdb$(BINSUF): trace.h test_db_curs4.bdb$(BINSUF): trace.h
# a bunch of little tests designed to run in parallel # a bunch of little tests designed to run in parallel
......
...@@ -14,8 +14,8 @@ static void mkfile (const char *fname) { ...@@ -14,8 +14,8 @@ static void mkfile (const char *fname) {
int fd = open(fname, O_WRONLY | O_CREAT | O_BINARY, mode); int fd = open(fname, O_WRONLY | O_CREAT | O_BINARY, mode);
if (fd<0) perror("opening"); if (fd<0) perror("opening");
assert(fd>=0); assert(fd>=0);
int r = write(fd, "hello\n", 6); assert(r==6); toku_os_full_write(fd, "hello\n", 6);
r = close(fd); assert(r==0); int r = close(fd); assert(r==0);
} }
static void static void
......
...@@ -91,6 +91,9 @@ typedef int64_t toku_off_t; ...@@ -91,6 +91,9 @@ typedef int64_t toku_off_t;
# pragma poison pthread_condattr_t pthread_cond_t # pragma poison pthread_condattr_t pthread_cond_t
# pragma poison pthread_rwlockattr_t pthread_rwlock_t # pragma poison pthread_rwlockattr_t pthread_rwlock_t
# pragma poison timespec # pragma poison timespec
# ifndef DONT_DEPRECATE_WRITES
# pragma poison write pwrite
# endif
# ifndef DONT_DEPRECATE_MALLOC # ifndef DONT_DEPRECATE_MALLOC
# pragma deprecated (malloc, free, realloc) # pragma deprecated (malloc, free, realloc)
# endif # endif
...@@ -114,6 +117,10 @@ int _dup2(int fd, int fd2) __attribute__((__deprecated_ ...@@ -114,6 +117,10 @@ int _dup2(int fd, int fd2) __attribute__((__deprecated_
char* strdup(const char *) __attribute__((__deprecated__)); char* strdup(const char *) __attribute__((__deprecated__));
#undef __strdup #undef __strdup
char* __strdup(const char *) __attribute__((__deprecated__)); char* __strdup(const char *) __attribute__((__deprecated__));
# ifndef DONT_DEPRECATE_WRITES
ssize_t write(int, const void *, size_t) __attribute__((__deprecated__));
ssize_t pwrite(int, const void *, size_t, off_t) __attribute__((__deprecated__));
#endif
# ifndef DONT_DEPRECATE_MALLOC # ifndef DONT_DEPRECATE_MALLOC
void *malloc(size_t) __attribute__((__deprecated__)); void *malloc(size_t) __attribute__((__deprecated__));
void free(void*) __attribute__((__deprecated__)); void free(void*) __attribute__((__deprecated__));
...@@ -126,8 +133,8 @@ void *os_malloc(size_t); ...@@ -126,8 +133,8 @@ void *os_malloc(size_t);
void *os_realloc(void*,size_t); void *os_realloc(void*,size_t);
void os_free(void*); void os_free(void*);
// full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. */ // full_pwrite and full_write performs a pwrite, and checks errors. It doesn't return unless all the data was written. */
void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off); void toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) __attribute__((__visibility__("default")));
void toku_os_full_write (int fd, const void *buf, size_t len); void toku_os_full_write (int fd, const void *buf, size_t len) __attribute__((__visibility__("default")));
// wrapper around fsync // wrapper around fsync
int toku_file_fsync(int fd); int toku_file_fsync(int fd);
......
...@@ -26,7 +26,8 @@ $(TARGET): $(OBJS) ...@@ -26,7 +26,8 @@ $(TARGET): $(OBJS)
check: $(TARGET) check: $(TARGET)
cd tests && $(MAKE) check cd tests && $(MAKE) check
$(OBJS): CFLAGS += -DTOKU_WINDOWS_ALLOW_DEPRECATED $(OBJS): CPPFLAGS += -DTOKU_WINDOWS_ALLOW_DEPRECATED
file.obj: CPPFLAGS += -DDONT_DEPRECATE_WRITES
clean: clean:
rm -rf $(TARGET) $(LIBPORTABILITY) $(PTHREAD_LIB) rm -rf $(TARGET) $(LIBPORTABILITY) $(PTHREAD_LIB)
......
...@@ -94,9 +94,62 @@ int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) { ...@@ -94,9 +94,62 @@ int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
return 0; return 0;
} }
//Print any necessary errors
//Return whether we should try the write again.
static void
try_again_after_handling_write_error(int fd, size_t len, ssize_t r_write) {
int try_again = 0;
assert(r_write < 0);
int errno_write = errno;
assert(errno_write != 0);
switch (errno_write) {
case EINTR: { //The call was interrupted by a signal before any data was written; see signal(7).
char err_msg[sizeof("Write of [] bytes to fd=[] interrupted. Retrying.") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Write of [%"PRIu64"] bytes to fd=[%d] interrupted. Retrying.", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
try_again = 1;
break;
}
case ENOSPC: {
char err_msg[sizeof("Failed write of [] bytes to fd=[].") + 20+10]; //64 bit is 20 chars, 32 bit is 10 chars
snprintf(err_msg, sizeof(err_msg), "Failed write of [%"PRIu64"] bytes to fd=[%d].", (uint64_t)len, fd);
perror(err_msg);
fflush(stderr);
int out_of_disk_space = 1;
assert(!out_of_disk_space); //Give an error message that might be useful if this is the only one that survives.
}
default:
break;
}
assert(try_again);
errno = errno_write;
}
void void
toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) toku_os_full_pwrite (int fd, const void *org_buf, size_t len, toku_off_t off)
{
const uint8_t *buf = org_buf;
while (len > 0) {
ssize_t r;
if (t_pwrite) {
r = t_pwrite(fd, buf, len, off);
} else {
r = pwrite(fd, buf, len, off);
}
if (r > 0) {
len -= r;
buf += r;
off += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
assert(len == 0);
}
/*
{ {
ssize_t r; ssize_t r;
if (t_pwrite) { if (t_pwrite) {
...@@ -114,6 +167,27 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off) ...@@ -114,6 +167,27 @@ toku_os_full_pwrite (int fd, const void *buf, size_t len, toku_off_t off)
} }
assert(r==len); assert(r==len);
} }
*/
void
toku_os_full_write (int fd, const void *org_buf, size_t len) {
const uint8_t *buf = org_buf;
while (len > 0) {
ssize_t r;
if (t_write) {
r = t_write(fd, buf, len);
} else {
r = write(fd, buf, len);
}
if (r > 0) {
len -= r;
buf += r;
}
else {
try_again_after_handling_write_error(fd, len, r);
}
}
assert(len == 0);
}
// t_fsync exists for testing purposes only // t_fsync exists for testing purposes only
static int (*t_fsync)(int) = 0; static int (*t_fsync)(int) = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment