Commit 0c9ac61b authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Improve error messages and error handling when writes fail.

 * Fixes #78 ({{{lg_bsize}}} works)
 * Fixes #1323 (write errors)
Did
{{{
svn merge -r8967:9031 https://svn.tokutek.com/tokudb/toku/tokudb.1323a
}}}

(And I'll delete the 1323 branches in a moment.)


git-svn-id: file:///svn/toku/tokudb@9032 c7de825b-a66e-492c-adef-691d508d4ae1
parent f50e6924
...@@ -288,9 +288,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -288,9 +288,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -304,9 +304,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -304,9 +304,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -315,9 +315,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -315,9 +315,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
...@@ -358,9 +358,11 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -358,9 +358,11 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf("int db_env_set_func_fsync (int (*)(int)) %s;\n", VISIBLE); printf("int db_env_set_func_fsync (int (*)(int)) %s;\n", VISIBLE);
printf("int toku_set_trace_file (char *fname) %s;\n", VISIBLE); printf("int toku_set_trace_file (char *fname) %s;\n", VISIBLE);
printf("int toku_close_trace_file (void) %s;\n", VISIBLE); printf("int toku_close_trace_file (void) %s;\n", VISIBLE);
printf("int db_env_set_func_free (void (*)(void*)) %s;\n", VISIBLE);
printf("int db_env_set_func_malloc (void *(*)(size_t)) %s;\n", VISIBLE); printf("int db_env_set_func_malloc (void *(*)(size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE); printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_free (void (*)(void*)) %s;\n", VISIBLE);
printf("#if defined(__cplusplus)\n}\n#endif\n"); printf("#if defined(__cplusplus)\n}\n#endif\n");
printf("#endif\n"); printf("#endif\n");
return 0; return 0;
......
...@@ -81,7 +81,7 @@ static void benchmark_setup (void) { ...@@ -81,7 +81,7 @@ static void benchmark_setup (void) {
int r; int r;
{ {
char unlink_cmd[strlen(dbdir) + strlen("rf -rf ") + 1]; char unlink_cmd[strlen(dbdir) + strlen("rm -rf ") + 1];
snprintf(unlink_cmd, sizeof(unlink_cmd), "rm -rf %s", dbdir); snprintf(unlink_cmd, sizeof(unlink_cmd), "rm -rf %s", dbdir);
//printf("unlink_cmd=%s\n", unlink_cmd); //printf("unlink_cmd=%s\n", unlink_cmd);
system(unlink_cmd); system(unlink_cmd);
......
...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d ...@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default"))); int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default"))); int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default"))); int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default"))); int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif
......
#include <unistd.h>
static ssize_t (*t_pwrite)(int, const void *, size_t, off_t) = 0;
static ssize_t (*t_write)(int, const void *, size_t) = 0;
int toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
t_pwrite = pwrite_fun;
return 0;
}
int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
t_write = write_fun;
return 0;
}
ssize_t
toku_os_pwrite (int fd, const void *buf, size_t len, off_t off)
{
if (t_pwrite) {
return t_pwrite(fd, buf, len, off);
} else {
return pwrite(fd, buf, len, off);
}
}
...@@ -123,6 +123,8 @@ struct block_translation_pair { ...@@ -123,6 +123,8 @@ struct block_translation_pair {
struct brt_header { struct brt_header {
int refcount; int refcount;
int dirty; int dirty;
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
int layout_version; int layout_version;
unsigned int nodesize; unsigned int nodesize;
int n_named_roots; /* -1 if the only one is unnamed */ int n_named_roots; /* -1 if the only one is unnamed */
...@@ -181,7 +183,7 @@ struct brt { ...@@ -181,7 +183,7 @@ struct brt {
}; };
/* serialization code */ /* serialization code */
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h); int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h); int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */ unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len); int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
...@@ -286,7 +288,7 @@ enum brt_layout_version_e { ...@@ -286,7 +288,7 @@ enum brt_layout_version_e {
}; };
void toku_brtheader_free (struct brt_header *h); void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, void *header_v); int toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **error_string);
int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v); int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v);
#define BLOCK_ALLOCATOR_ALIGNMENT 4096 #define BLOCK_ALLOCATOR_ALIGNMENT 4096
......
...@@ -23,9 +23,10 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) { ...@@ -23,9 +23,10 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b; return ((a+b-1)/b)*b;
} }
void int
maybe_preallocate_in_file (int fd, u_int64_t size) maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less. // Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
// Return 0 on success, otherwise an error number.
{ {
int64_t file_size; int64_t file_size;
{ {
...@@ -39,10 +40,16 @@ maybe_preallocate_in_file (int fd, u_int64_t size) ...@@ -39,10 +40,16 @@ maybe_preallocate_in_file (int fd, u_int64_t size)
memset(wbuf, 0, N); memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096); toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size); assert(start_write >= file_size);
ssize_t r = pwrite(fd, wbuf, N, start_write); ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
assert(r==N); if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf); toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
} }
return 0;
} }
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator. // This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
...@@ -72,13 +79,30 @@ unlock_for_pwrite (void) { ...@@ -72,13 +79,30 @@ unlock_for_pwrite (void) {
assert(r==0); assert(r==0);
} }
static ssize_t static int
toku_pwrite (int fd, const void *buf, size_t count, toku_off_t offset) toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote)
// requires that the pwrite has been locked // requires that the pwrite has been locked
// Returns 0 on success (and fills in *num_wrote for how many bytes are written)
// Returns nonzero error number problems.
{ {
assert(pwrite_is_locked); assert(pwrite_is_locked);
maybe_preallocate_in_file(fd, offset+count); {
return pwrite(fd, buf, count, offset); int r = maybe_preallocate_in_file(fd, offset+count);
if (r!=0) {
*num_wrote = 0;
return r;
}
}
{
*num_wrote = toku_os_pwrite(fd, buf, count, offset);
if (*num_wrote < 0) {
int r = errno;
*num_wrote = 0;
return r;
} else {
return 0;
}
}
} }
// Don't include the compressed data size or the uncompressed data size. // Don't include the compressed data size or the uncompressed data size.
...@@ -184,7 +208,7 @@ enum { compression_header_len = (4 // compressed_len ...@@ -184,7 +208,7 @@ enum { compression_header_len = (4 // compressed_len
+4 // uncompressed_len +4 // uncompressed_len
) }; ) };
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) { int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) {
struct wbuf w; struct wbuf w;
int i; int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
...@@ -311,6 +335,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct ...@@ -311,6 +335,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
((int32_t*)(compressed_buf+uncompressed_magic_len))[1] = toku_htonl(uncompressed_len); ((int32_t*)(compressed_buf+uncompressed_magic_len))[1] = toku_htonl(uncompressed_len);
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone); //write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r;
{ {
lock_for_pwrite(); lock_for_pwrite();
// If the node has never been written, then write the whole buffer, including the zeros // If the node has never been written, then write the whole buffer, including the zeros
...@@ -331,9 +356,13 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct ...@@ -331,9 +356,13 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
block_allocator_alloc_block(h->block_allocator, n_to_write, &offset); block_allocator_alloc_block(h->block_allocator, n_to_write, &offset);
h->block_translation[blocknum.b].diskoff = offset; h->block_translation[blocknum.b].diskoff = offset;
h->block_translation[blocknum.b].size = n_to_write; h->block_translation[blocknum.b].size = n_to_write;
ssize_t r=toku_pwrite(fd, compressed_buf, n_to_write, offset); ssize_t n_wrote;
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno); r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote);
assert(r==(ssize_t)n_to_write); if (r) {
fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
} else {
r=0;
}
unlock_for_pwrite(); unlock_for_pwrite();
} }
...@@ -341,10 +370,12 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct ...@@ -341,10 +370,12 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
assert(w.ndone==calculated_size); assert(w.ndone==calculated_size);
toku_free(buf); toku_free(buf);
toku_free(compressed_buf); toku_free(compressed_buf);
return r;
} }
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) { int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b); if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic;
assert(0 <= blocknum.b && (u_int64_t)blocknum.b < h->translated_blocknum_limit); assert(0 <= blocknum.b && (u_int64_t)blocknum.b < h->translated_blocknum_limit);
DISKOFF offset = h->block_translation[blocknum.b].diskoff; DISKOFF offset = h->block_translation[blocknum.b].diskoff;
TAGMALLOC(BRTNODE, result); TAGMALLOC(BRTNODE, result);
...@@ -697,18 +728,31 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h) ...@@ -697,18 +728,31 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
} }
int toku_serialize_brt_header_to (int fd, struct brt_header *h) { int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0;
if (h->panic) return h->panic;
lock_for_pwrite(); lock_for_pwrite();
{ {
struct wbuf w; struct wbuf w;
unsigned int size = toku_serialize_brt_header_size (h); unsigned int size = toku_serialize_brt_header_size (h);
wbuf_init(&w, toku_malloc(size), size); wbuf_init(&w, toku_malloc(size), size);
int r=toku_serialize_brt_header_to_wbuf(&w, h); {
assert(r==0); int r=toku_serialize_brt_header_to_wbuf(&w, h);
assert(r==0);
}
assert(w.ndone==size); assert(w.ndone==size);
ssize_t nwrote = toku_pwrite(fd, w.buf, w.ndone, 0); ssize_t nwrote;
if (nwrote<0) perror("pwrite"); rr = toku_pwrite_extend(fd, w.buf, w.ndone, 0, &nwrote);
assert((size_t)nwrote==w.ndone);
toku_free(w.buf); toku_free(w.buf);
if (rr) {
if (h->panic==0) {
char s[200];
h->panic=rr;
snprintf(s, sizeof(s), "%s:%d: Error writing header to data file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr));
h->panic_string = toku_strdup(s);
}
goto finish;
}
assert((u_int64_t)nwrote==size);
} }
{ {
struct wbuf w; struct wbuf w;
...@@ -723,12 +767,18 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) { ...@@ -723,12 +767,18 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
} }
u_int32_t checksum = x1764_finish(&w.checksum); u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_int(&w, checksum); wbuf_int(&w, checksum);
ssize_t nwrote = toku_pwrite(fd, w.buf, size, h->block_translation_address_on_disk); ssize_t nwrote;
assert((u_int64_t)nwrote==size); rr = toku_pwrite_extend(fd, w.buf, size, h->block_translation_address_on_disk, &nwrote);
toku_free(w.buf); toku_free(w.buf);
}; if (rr) {
fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr));
goto finish;
}
assert((u_int64_t)nwrote==size);
}
finish:
unlock_for_pwrite(); unlock_for_pwrite();
return 0; return rr;
} }
// We only deserialize brt header once and then share everything with all the brts. // We only deserialize brt header once and then share everything with all the brts.
...@@ -750,6 +800,8 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header ** ...@@ -750,6 +800,8 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
if (r!=(ssize_t)size-12) { ret = EINVAL; goto died1; } if (r!=(ssize_t)size-12) { ret = EINVAL; goto died1; }
} }
h->dirty=0; h->dirty=0;
h->panic = 0;
h->panic_string = 0;
h->layout_version = rbuf_int(&rc); h->layout_version = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc); h->nodesize = rbuf_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_9); assert(h->layout_version==BRT_LAYOUT_VERSION_9);
...@@ -882,8 +934,13 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) { ...@@ -882,8 +934,13 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) {
struct wbuf w; struct wbuf w;
wbuf_init(&w, buf, size); wbuf_init(&w, buf, size);
wbuf_int(&w, toku_fifo_n_entries(fifo)); wbuf_int(&w, toku_fifo_n_entries(fifo));
ssize_t r = toku_pwrite(fd, w.buf, size, freeoff); ssize_t nwrote;
if (r!=size) return errno; int r = toku_pwrite_extend(fd, w.buf, size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite();
return r;
}
assert(nwrote==size);
freeoff+=size; freeoff+=size;
} }
FIFO_ITERATE(fifo, key, keylen, val, vallen, type, xid, FIFO_ITERATE(fifo, key, keylen, val, vallen, type, xid,
...@@ -900,10 +957,11 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) { ...@@ -900,10 +957,11 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) {
//printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val); //printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val);
wbuf_bytes(&w, val, vallen); wbuf_bytes(&w, val, vallen);
assert(w.ndone==size); assert(w.ndone==size);
ssize_t r = toku_pwrite(fd, w.buf, (size_t)size, freeoff); ssize_t nwrote;
if (r<0) { int r = toku_pwrite_extend(fd, w.buf, (size_t)size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite(); unlock_for_pwrite();
return errno; return r;
} }
assert(r==(ssize_t)size); assert(r==(ssize_t)size);
freeoff+=size; freeoff+=size;
......
...@@ -377,7 +377,17 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void * ...@@ -377,7 +377,17 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
assert(brtnode->thisnodename.b==nodename.b); assert(brtnode->thisnodename.b==nodename.b);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) { if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h); if (!h->panic) { // if the brt panicked, stop writing, otherwise try to write it.
int r = toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h);
if (r) {
if (h->panic==0) {
char s[200];
h->panic=r;
snprintf(s, sizeof(s), "While writing data to disk, error %d (%s)", r, strerror(r));
h->panic_string = toku_strdup(s);
}
}
}
} }
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]); //printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (!keep_me) { if (!keep_me) {
...@@ -1196,7 +1206,6 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did ...@@ -1196,7 +1206,6 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did
NULL, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, toku_brtnode_flush_callback, toku_brtnode_fetch_callback,
t->h); t->h);
assert(r==0); // REMOVE LATER
if (r!=0) return r; if (r!=0) return r;
child = childnode_v; child = childnode_v;
assert(child->thisnodename.b!=0); assert(child->thisnodename.b!=0);
...@@ -2525,7 +2534,10 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) ...@@ -2525,7 +2534,10 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
VERIFY_NODE(brt, node); VERIFY_NODE(brt, node);
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) return r; if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) {
toku_unpin_brtnode(brt, node); // ignore any error code on the unpin.
return r;
}
verify_local_fingerprint_nonleaf(node); verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(brt, node); r = toku_unpin_brtnode(brt, node);
assert(r == 0); assert(r == 0);
...@@ -2864,7 +2876,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -2864,7 +2876,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
} }
if (r!=0) { if (r!=0) {
died_after_open: died_after_open:
toku_cachefile_close(&t->cf, toku_txn_logger(txn)); toku_cachefile_close(&t->cf, toku_txn_logger(txn), 0);
t->database_name = 0; t->database_name = 0;
goto died0a; goto died0a;
} }
...@@ -3035,25 +3047,35 @@ toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v) ...@@ -3035,25 +3047,35 @@ toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v)
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__, //printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
// block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize); // block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
if (h->dirty) { if (h->dirty) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h); {
int r = toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
if (r) return r;
}
u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header. u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header.
//printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to); //printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo); {
int r = toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo);
if (r) return r;
}
h->dirty = 0; h->dirty = 0;
} }
return 0; return 0;
} }
int int
toku_brtheader_close (CACHEFILE cachefile, void *header_v) toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error_string)
{ {
struct brt_header *h = header_v; struct brt_header *h = header_v;
toku_brtheader_checkpoint(cachefile, h); int r = toku_brtheader_checkpoint(cachefile, h);
if (malloced_error_string) *malloced_error_string = h->panic_string;
if (r==0) {
r=h->panic;
}
toku_brtheader_free(h); toku_brtheader_free(h);
return 0; return r;
} }
int toku_close_brt (BRT brt, TOKULOGGER logger) { int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) {
int r; int r;
while (!list_empty(&brt->cursors)) { while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link); BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
...@@ -3074,17 +3096,18 @@ int toku_close_brt (BRT brt, TOKULOGGER logger) { ...@@ -3074,17 +3096,18 @@ int toku_close_brt (BRT brt, TOKULOGGER logger) {
r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out. r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out.
if (r!=0) return r; if (r!=0) return r;
} }
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero. if (!brt->h->panic)
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero (but if panic, don't worry)
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__); //printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
// printf("%s:%d brt=%p ,brt->h=%p\n", __FILE__, __LINE__, brt, brt->h); // printf("%s:%d brt=%p ,brt->h=%p\n", __FILE__, __LINE__, brt, brt->h);
if ((r = toku_cachefile_close(&brt->cf, logger))!=0) return r; r = toku_cachefile_close(&brt->cf, logger, error_string);
} }
if (brt->database_name) toku_free(brt->database_name); if (brt->database_name) toku_free(brt->database_name);
if (brt->fname) toku_free(brt->fname); if (brt->fname) toku_free(brt->fname);
if (brt->skey) { toku_free(brt->skey); } if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); } if (brt->sval) { toku_free(brt->sval); }
toku_free(brt); toku_free(brt);
return 0; return r;
} }
int toku_brt_create(BRT *brt_ptr) { int toku_brt_create(BRT *brt_ptr) {
......
...@@ -30,7 +30,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN); ...@@ -30,7 +30,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v); int toku_brt_lookup (BRT brt, DBT *k, DBT *v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN); int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function. int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
int toku_close_brt (BRT, TOKULOGGER); int toku_close_brt (BRT, TOKULOGGER, char **error_string);
int toku_dump_brt (FILE *,BRT brt); int toku_dump_brt (FILE *,BRT brt);
...@@ -118,7 +118,7 @@ void toku_brt_destroy(void); ...@@ -118,7 +118,7 @@ void toku_brt_destroy(void);
void toku_pwrite_lock_init(void); void toku_pwrite_lock_init(void);
void toku_pwrite_lock_destroy(void); void toku_pwrite_lock_destroy(void);
void maybe_preallocate_in_file (int fd, u_int64_t size); int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less. // Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
int toku_brt_note_table_lock (BRT brt, TOKUTXN txn); int toku_brt_note_table_lock (BRT brt, TOKUTXN txn);
......
...@@ -134,7 +134,7 @@ struct cachefile { ...@@ -134,7 +134,7 @@ struct cachefile {
char *fname; char *fname;
void *userdata; void *userdata;
int (*close_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function. int (*close_userdata)(CACHEFILE cf, void *userdata, char **error_string); // when closing the last reference to a cachefile, first call this function.
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function. int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
}; };
...@@ -254,7 +254,7 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) { ...@@ -254,7 +254,7 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
if (r != 0) { if (r != 0) {
r=errno; close(fd); return r; r=errno; close(fd); return r;
} }
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) { if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata, 0))) {
return r; return r;
} }
cf->close_userdata = NULL; cf->close_userdata = NULL;
...@@ -299,29 +299,30 @@ void toku_cachefile_refup (CACHEFILE cf) { ...@@ -299,29 +299,30 @@ void toku_cachefile_refup (CACHEFILE cf) {
cf->refcount++; cf->refcount++;
} }
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) { int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string) {
CACHEFILE cf = *cfp; CACHEFILE cf = *cfp;
CACHETABLE ct = cf->cachetable; CACHETABLE ct = cf->cachetable;
cachetable_lock(ct); cachetable_lock(ct);
assert(cf->refcount>0); assert(cf->refcount>0);
cf->refcount--; cf->refcount--;
if (cf->refcount==0) { if (cf->refcount==0) {
int r; int r = 0;
if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) { if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) {
cachetable_unlock(ct); cachetable_unlock(ct);
return r; return r;
} }
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) { if (cf->close_userdata) {
cachetable_unlock(ct); r = cf->close_userdata(cf, cf->userdata, error_string);
return r;
} }
cf->close_userdata = NULL; cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL; cf->checkpoint_userdata = NULL;
cf->userdata = NULL; cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles); cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct); cachetable_unlock(ct);
r = close(cf->fd); {
assert(r == 0); int r2 = close(cf->fd);
assert(r2 == 0);
}
cf->fd = -1; cf->fd = -1;
if (logger) { if (logger) {
//assert(cf->fname); //assert(cf->fname);
...@@ -1309,7 +1310,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo ...@@ -1309,7 +1310,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
void void
toku_cachefile_set_userdata (CACHEFILE cf, toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata, void *userdata,
int (*close_userdata)(CACHEFILE, void*), int (*close_userdata)(CACHEFILE, void*, char**/*error_string*/),
int (*checkpoint_userdata)(CACHEFILE, void*)) int (*checkpoint_userdata)(CACHEFILE, void*))
{ {
cf->userdata = userdata; cf->userdata = userdata;
......
...@@ -66,7 +66,7 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value, ...@@ -66,7 +66,7 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value,
// associated with the key are returned. // associated with the key are returned.
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn); typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*), int (*checkpoint_userdata)(CACHEFILE, void*)); void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*, char **/*error_string*/), int (*checkpoint_userdata)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata(). // Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata(). // When the cachefile needs to be checkpointed, we call checkpoint_userdata().
// If userdata is already non-NULL, then we simply overwrite it. // If userdata is already non-NULL, then we simply overwrite it.
...@@ -140,7 +140,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke ...@@ -140,7 +140,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
// close function does not return until all of the objects are evicted. The cachefile // close function does not return until all of the objects are evicted. The cachefile
// object is freed. // object is freed.
// Returns: 0 if success, otherwise returns an error number. // Returns: 0 if success, otherwise returns an error number.
int toku_cachefile_close (CACHEFILE*, TOKULOGGER); int toku_cachefile_close (CACHEFILE*, TOKULOGGER, char **error_string);
// Flush the cachefile. // Flush the cachefile.
// Effect: Flush everything owned by the cachefile from the cachetable. All dirty // Effect: Flush everything owned by the cachefile from the cachetable. All dirty
......
...@@ -74,6 +74,7 @@ struct tokulogger { ...@@ -74,6 +74,7 @@ struct tokulogger {
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file; int n_in_file;
u_int32_t write_block_size; // How big should the blocks be written to various logs?
}; };
int toku_logger_find_next_unused_log_file(const char *directory, long long *result); int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......
...@@ -103,6 +103,7 @@ int toku_logger_create (TOKULOGGER *resultp) { ...@@ -103,6 +103,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->n_in_file=0; result->n_in_file=0;
result->directory=0; result->directory=0;
result->checkpoint_lsns[0]=result->checkpoint_lsns[1]=(LSN){0}; result->checkpoint_lsns[0]=result->checkpoint_lsns[1]=(LSN){0};
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result; *resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died0; r = ml_init(&result->input_lock); if (r!=0) goto died0;
r = ml_init(&result->output_lock); if (r!=0) goto died1; r = ml_init(&result->output_lock); if (r!=0) goto died1;
...@@ -223,6 +224,15 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) { ...@@ -223,6 +224,15 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) {
} }
int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) {
if (logger==0) return EINVAL; // no logger
if (logger->is_panicked) return EINVAL;
if (logger->is_open) return EINVAL;
if (bsize<=0 || bsize>(1<<30)) return EINVAL;
logger->write_block_size = bsize;
return 0;
}
// Enter holding both locks // Enter holding both locks
// Exit holding only the output_lock // Exit holding only the output_lock
static int do_write (TOKULOGGER logger, int do_fsync) { static int do_write (TOKULOGGER logger, int do_fsync) {
...@@ -989,7 +999,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) { ...@@ -989,7 +999,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
} }
int toku_maybe_spill_rollbacks (TOKUTXN txn) { int toku_maybe_spill_rollbacks (TOKUTXN txn) {
if (txn->rollentry_resident_bytecount>(1<<20)) { if (txn->rollentry_resident_bytecount>txn->logger->write_block_size) {
struct roll_entry *item; struct roll_entry *item;
ssize_t bufsize = txn->rollentry_resident_bytecount; ssize_t bufsize = txn->rollentry_resident_bytecount;
char *MALLOC_N(bufsize, buf); char *MALLOC_N(bufsize, buf);
......
...@@ -38,6 +38,7 @@ LSN toku_logger_last_lsn(TOKULOGGER); ...@@ -38,6 +38,7 @@ LSN toku_logger_last_lsn(TOKULOGGER);
int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t); int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *); int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_set_lg_bsize(TOKULOGGER, u_int32_t);
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v); int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v);
......
...@@ -36,7 +36,7 @@ void toku_recover_cleanup (void) { ...@@ -36,7 +36,7 @@ void toku_recover_cleanup (void) {
int i; int i;
for (i=0; i<n_cf_pairs; i++) { for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) { if (cf_pairs[i].brt) {
int r = toku_close_brt(cf_pairs[i].brt, 0); int r = toku_close_brt(cf_pairs[i].brt, 0, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf); //r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0); assert(r==0);
} }
...@@ -132,6 +132,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L ...@@ -132,6 +132,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
struct brt_header *MALLOC(h); struct brt_header *MALLOC(h);
assert(h); assert(h);
h->dirty=0; h->dirty=0;
h->panic=0;
h->panic_string=0;
XMALLOC(h->flags_array); XMALLOC(h->flags_array);
h->flags_array[0] = header.flags; h->flags_array[0] = header.flags;
h->nodesize = header.nodesize; h->nodesize = header.nodesize;
...@@ -485,7 +487,7 @@ toku_recover_brtclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) { ...@@ -485,7 +487,7 @@ toku_recover_brtclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
assert(r==0); assert(r==0);
// Bump up the reference count // Bump up the reference count
toku_cachefile_refup(pair->cf); toku_cachefile_refup(pair->cf);
r = toku_close_brt(pair->brt, 0); r = toku_close_brt(pair->brt, 0, 0);
assert(r==0); assert(r==0);
pair->brt=0; pair->brt=0;
toku_free_BYTESTRING(fname); toku_free_BYTESTRING(fname);
...@@ -496,7 +498,7 @@ toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) { ...@@ -496,7 +498,7 @@ toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
int i; int i;
for (i=0; i<n_cf_pairs; i++) { for (i=0; i<n_cf_pairs; i++) {
if (filenum.fileid==cf_pairs[i].filenum.fileid) { if (filenum.fileid==cf_pairs[i].filenum.fileid) {
int r = toku_cachefile_close(&cf_pairs[i].cf, 0); int r = toku_cachefile_close(&cf_pairs[i].cf, 0, 0);
assert(r==0); assert(r==0);
cf_pairs[i] = cf_pairs[n_cf_pairs-1]; cf_pairs[i] = cf_pairs[n_cf_pairs-1];
n_cf_pairs--; n_cf_pairs--;
......
...@@ -84,7 +84,7 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT ...@@ -84,7 +84,7 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT
BRT brt = brtv; BRT brt = brtv;
r = toku_brt_root_put_cmd(brt, &brtcmd, txn->logger); r = toku_brt_root_put_cmd(brt, &brtcmd, txn->logger);
} }
return toku_cachefile_close(&cf, toku_txn_logger(txn)); return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
} }
...@@ -92,7 +92,7 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) { ...@@ -92,7 +92,7 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
CACHEFILE cf; CACHEFILE cf;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf); int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0); assert(r==0);
return toku_cachefile_close(&cf, toku_txn_logger(txn)); return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
} }
...@@ -316,7 +316,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU( ...@@ -316,7 +316,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(
BRT brt = brtv; BRT brt = brtv;
r = toku_brt_truncate(brt); r = toku_brt_truncate(brt);
return toku_cachefile_close(&cf, toku_txn_logger(txn)); return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
} }
int int
......
...@@ -35,7 +35,7 @@ static void setup (void) { ...@@ -35,7 +35,7 @@ static void setup (void) {
static void toku_shutdown (void) { static void toku_shutdown (void) {
int r; int r;
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
static void long_long_to_array (unsigned char *a, unsigned long long l) { static void long_long_to_array (unsigned char *a, unsigned long long l) {
......
...@@ -52,6 +52,7 @@ static void test_serialize(void) { ...@@ -52,6 +52,7 @@ static void test_serialize(void) {
struct block_translation_pair *XMALLOC_N(21, btps); struct block_translation_pair *XMALLOC_N(21, btps);
memset(btps, 0, sizeof(btps)); memset(btps, 0, sizeof(btps));
brt->h = brt_h; brt->h = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->translated_blocknum_limit = 1; brt_h->translated_blocknum_limit = 1;
brt_h->block_translation = btps; brt_h->block_translation = btps;
brt_h->block_translation[20].diskoff = 4096; brt_h->block_translation[20].diskoff = 4096;
......
...@@ -70,7 +70,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) { ...@@ -70,7 +70,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
......
...@@ -131,7 +131,7 @@ static void test_brt_cursor_first(int n, DB *db) { ...@@ -131,7 +131,7 @@ static void test_brt_cursor_first(int n, DB *db) {
else else
assert_cursor_value(brt, DB_FIRST, 0); assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -172,7 +172,7 @@ static void test_brt_cursor_last(int n, DB *db) { ...@@ -172,7 +172,7 @@ static void test_brt_cursor_last(int n, DB *db) {
else else
assert_cursor_value(brt, DB_LAST, n-1); assert_cursor_value(brt, DB_LAST, n-1);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -215,7 +215,7 @@ static void test_brt_cursor_first_last(int n, DB *db) { ...@@ -215,7 +215,7 @@ static void test_brt_cursor_first_last(int n, DB *db) {
} else } else
assert_cursor_first_last(brt, 0, n-1); assert_cursor_first_last(brt, 0, n-1);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -259,7 +259,7 @@ static void test_brt_cursor_rfirst(int n, DB *db) { ...@@ -259,7 +259,7 @@ static void test_brt_cursor_rfirst(int n, DB *db) {
else else
assert_cursor_value(brt, DB_FIRST, 0); assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -330,7 +330,7 @@ static void test_brt_cursor_walk(int n, DB *db) { ...@@ -330,7 +330,7 @@ static void test_brt_cursor_walk(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_walk(brt, n); assert_cursor_walk(brt, n);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -402,7 +402,7 @@ static void test_brt_cursor_rwalk(int n, DB *db) { ...@@ -402,7 +402,7 @@ static void test_brt_cursor_rwalk(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_rwalk(brt, n); assert_cursor_rwalk(brt, n);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -489,7 +489,7 @@ static void test_brt_cursor_rand(int n, DB *db) { ...@@ -489,7 +489,7 @@ static void test_brt_cursor_rand(int n, DB *db) {
/* walk the tree */ /* walk the tree */
assert_cursor_walk_inorder(brt, n); assert_cursor_walk_inorder(brt, n);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -570,7 +570,7 @@ static void test_brt_cursor_split(int n, DB *db) { ...@@ -570,7 +570,7 @@ static void test_brt_cursor_split(int n, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -604,7 +604,7 @@ static void test_multiple_brt_cursors(int n, DB *db) { ...@@ -604,7 +604,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -700,7 +700,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) { ...@@ -700,7 +700,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
assert(r == 0); assert(r == 0);
} }
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -772,7 +772,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) { ...@@ -772,7 +772,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -838,7 +838,7 @@ static void test_brt_cursor_set_range(int n, DB *db) { ...@@ -838,7 +838,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
r = toku_brt_cursor_close(cursor); r = toku_brt_cursor_close(cursor);
assert(r==0); assert(r==0);
r = toku_close_brt(brt, 0); r = toku_close_brt(brt, 0, 0);
assert(r==0); assert(r==0);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
...@@ -899,7 +899,7 @@ static void test_brt_cursor_delete(int n, DB *db) { ...@@ -899,7 +899,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_brt_cursor_close(cursor); error = toku_brt_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_close_brt(brt, 0); error = toku_close_brt(brt, 0, 0);
assert(error == 0); assert(error == 0);
error = toku_cachetable_close(&ct); error = toku_cachetable_close(&ct);
...@@ -1002,7 +1002,7 @@ static void test_brt_cursor_get_both(int n, DB *db) { ...@@ -1002,7 +1002,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_brt_cursor_close(cursor); error = toku_brt_cursor_close(cursor);
assert(error == 0); assert(error == 0);
error = toku_close_brt(brt, 0); error = toku_close_brt(brt, 0, 0);
assert(error == 0); assert(error == 0);
error = toku_cachetable_close(&ct); error = toku_cachetable_close(&ct);
......
...@@ -22,7 +22,7 @@ static void test_named_db (void) { ...@@ -22,7 +22,7 @@ static void test_named_db (void) {
toku_brt_insert(t0, toku_fill_dbt(&k, "good", 5), toku_fill_dbt(&v, "day", 4), null_txn); assert(r==0); toku_brt_insert(t0, toku_fill_dbt(&k, "good", 5), toku_fill_dbt(&v, "day", 4), null_txn); assert(r==0);
r = toku_close_brt(t0, 0); assert(r==0); r = toku_close_brt(t0, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
...@@ -37,7 +37,7 @@ static void test_named_db (void) { ...@@ -37,7 +37,7 @@ static void test_named_db (void) {
assert(strcmp(v.data,"day")==0); assert(strcmp(v.data,"day")==0);
} }
r = toku_close_brt(t0, 0); assert(r==0); r = toku_close_brt(t0, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
} }
......
This diff is collapsed.
...@@ -23,7 +23,7 @@ static void test0 (void) { ...@@ -23,7 +23,7 @@ static void test0 (void) {
assert(r==0); assert(r==0);
//printf("%s:%d test0\n", __FILE__, __LINE__); //printf("%s:%d test0\n", __FILE__, __LINE__);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced); //printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_cachetable_close(&ct); r = toku_cachetable_close(&ct);
assert(r==0); assert(r==0);
......
...@@ -27,7 +27,7 @@ static void test1 (void) { ...@@ -27,7 +27,7 @@ static void test1 (void) {
assert(strcmp(v.data, "there")==0); assert(strcmp(v.data, "there")==0);
assert(v.size==6); assert(v.size==6);
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
if (verbose) printf("test1 ok\n"); if (verbose) printf("test1 ok\n");
......
...@@ -41,7 +41,7 @@ static void test2 (int memcheck, int limit) { ...@@ -41,7 +41,7 @@ static void test2 (int memcheck, int limit) {
} }
if (verbose) printf("%s:%d inserted\n", __FILE__, __LINE__); if (verbose) printf("%s:%d inserted\n", __FILE__, __LINE__);
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
if (verbose) printf("test2 ok\n"); if (verbose) printf("test2 ok\n");
......
...@@ -30,7 +30,7 @@ static void test3 (int nodesize, int count, int memcheck) { ...@@ -30,7 +30,7 @@ static void test3 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
} }
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
gettimeofday(&t1, 0); gettimeofday(&t1, 0);
......
...@@ -30,7 +30,7 @@ static void test4 (int nodesize, int count, int memcheck) { ...@@ -30,7 +30,7 @@ static void test4 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
} }
r = toku_verify_brt(t); assert(r==0); r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
gettimeofday(&t1, 0); gettimeofday(&t1, 0);
......
...@@ -49,7 +49,7 @@ static void test5 (void) { ...@@ -49,7 +49,7 @@ static void test5 (void) {
} }
if (verbose) printf("\n"); if (verbose) printf("\n");
toku_free(values); toku_free(values);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free(); toku_memory_check_all_free();
} }
......
...@@ -67,7 +67,7 @@ cachetable_count_pinned_test (int n) { ...@@ -67,7 +67,7 @@ cachetable_count_pinned_test (int n) {
assert(toku_cachefile_count_pinned(f1, 1) == 0); assert(toku_cachefile_count_pinned(f1, 1) == 0);
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -75,7 +75,7 @@ cachetable_debug_test (int n) { ...@@ -75,7 +75,7 @@ cachetable_debug_test (int n) {
if (verbose) print_hash_histogram(); if (verbose) print_hash_histogram();
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -39,7 +39,7 @@ cachetable_fd_test (void) { ...@@ -39,7 +39,7 @@ cachetable_fd_test (void) {
r = toku_cachefile_of_filenum(ct, fn, &newcf); r = toku_cachefile_of_filenum(ct, fn, &newcf);
assert(r == ENOENT); assert(r == ENOENT);
r = toku_cachefile_close(&cf, NULL_LOGGER); assert(r == 0 && cf == 0); r = toku_cachefile_close(&cf, NULL_LOGGER, 0); assert(r == 0 && cf == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -94,8 +94,8 @@ test_cachetable_flush (int n) { ...@@ -94,8 +94,8 @@ test_cachetable_flush (int n) {
assert(r == 0); assert(r == 0);
} }
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f2, NULL_LOGGER); assert(r == 0 && f2 == 0); r = toku_cachefile_close(&f2, NULL_LOGGER, 0); assert(r == 0 && f2 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -72,7 +72,7 @@ cachetable_getandpin_test (int n) { ...@@ -72,7 +72,7 @@ cachetable_getandpin_test (int n) {
} }
toku_cachetable_verify(ct); toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -79,7 +79,7 @@ cachetable_put_test (int n) { ...@@ -79,7 +79,7 @@ cachetable_put_test (int n) {
r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1);
assert(r != 0); assert(r != 0);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -154,7 +154,7 @@ static void test_rename (void) { ...@@ -154,7 +154,7 @@ static void test_rename (void) {
r = toku_cachetable_rename(f, okey, nkey); r = toku_cachetable_rename(f, okey, nkey);
assert(r != 0); assert(r != 0);
r = toku_cachefile_close(&f, 0); r = toku_cachefile_close(&f, 0, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
......
...@@ -89,7 +89,7 @@ static void readit (void) { ...@@ -89,7 +89,7 @@ static void readit (void) {
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, 0); assert(r==0); r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, 0); assert(r==0);
r=toku_cachetable_unpin(f, key, fullhash, 0, BLOCKSIZE); assert(r==0); r=toku_cachetable_unpin(f, key, fullhash, 0, BLOCKSIZE); assert(r==0);
} }
r = toku_cachefile_close(&f, 0); assert(r == 0); r = toku_cachefile_close(&f, 0, 0); assert(r == 0);
r = toku_cachetable_close(&t); assert(r == 0); r = toku_cachetable_close(&t); assert(r == 0);
gettimeofday(&end, 0); gettimeofday(&end, 0);
toku_os_get_process_times(&end_usertime, &end_systime); toku_os_get_process_times(&end_usertime, &end_systime);
......
...@@ -283,7 +283,7 @@ static void test0 (void) { ...@@ -283,7 +283,7 @@ static void test0 (void) {
expectN(7); expectN(7);
expectN(6); expectN(6);
expectN(1); expectN(1);
r=toku_cachefile_close(&f, 0); r=toku_cachefile_close(&f, 0, 0);
assert(r==0); assert(r==0);
r=toku_cachetable_close(&t); r=toku_cachetable_close(&t);
assert(r==0); assert(r==0);
...@@ -349,7 +349,7 @@ static void test_nested_pin (void) { ...@@ -349,7 +349,7 @@ static void test_nested_pin (void) {
r = toku_cachetable_unpin(f, make_blocknum(2), f2hash, 0, test_object_size); r = toku_cachetable_unpin(f, make_blocknum(2), f2hash, 0, test_object_size);
assert(r==0); assert(r==0);
// toku_os_usleep(1*1000000); // toku_os_usleep(1*1000000);
r = toku_cachefile_close(&f, 0); assert(r==0); r = toku_cachefile_close(&f, 0, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0); r = toku_cachetable_close(&t); assert(r==0);
} }
...@@ -415,14 +415,14 @@ static void test_multi_filehandles (void) { ...@@ -415,14 +415,14 @@ static void test_multi_filehandles (void) {
r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachetable_unpin(f1, make_blocknum(2), toku_cachetable_hash(f1, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f1, make_blocknum(2), toku_cachetable_hash(f1, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f1, 0); assert(r==0); r = toku_cachefile_close(&f1, 0, 0); assert(r==0);
r = toku_cachetable_unpin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachetable_unpin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f2, 0); assert(r==0); r = toku_cachefile_close(&f2, 0, 0); assert(r==0);
r = toku_cachetable_unpin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0); r = toku_cachetable_unpin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f3, 0); assert(r==0); r = toku_cachefile_close(&f3, 0, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0); r = toku_cachetable_close(&t); assert(r==0);
} }
...@@ -545,7 +545,7 @@ static void test_dirty() { ...@@ -545,7 +545,7 @@ static void test_dirty() {
assert(dirty == 1); assert(dirty == 1);
assert(pinned == 0); assert(pinned == 0);
r = toku_cachefile_close(&f, 0); r = toku_cachefile_close(&f, 0, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
...@@ -620,7 +620,7 @@ static void test_size_resize() { ...@@ -620,7 +620,7 @@ static void test_size_resize() {
r = toku_cachetable_unpin(f, key, hkey, CACHETABLE_CLEAN, new_size); r = toku_cachetable_unpin(f, key, hkey, CACHETABLE_CLEAN, new_size);
assert(r == 0); assert(r == 0);
r = toku_cachefile_close(&f, 0); r = toku_cachefile_close(&f, 0, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
...@@ -688,7 +688,7 @@ static void test_size_flush() { ...@@ -688,7 +688,7 @@ static void test_size_flush() {
assert(r == 0); assert(r == 0);
} }
r = toku_cachefile_close(&f, 0); r = toku_cachefile_close(&f, 0, 0);
assert(r == 0); assert(r == 0);
r = toku_cachetable_close(&t); r = toku_cachetable_close(&t);
assert(r == 0); assert(r == 0);
......
...@@ -222,13 +222,13 @@ static void test_chaining (void) { ...@@ -222,13 +222,13 @@ static void test_chaining (void) {
//printf("Close %d (%p), now n_present=%d\n", i, f[i], n_present); //printf("Close %d (%p), now n_present=%d\n", i, f[i], n_present);
//print_ints(); //print_ints();
CACHEFILE oldcf=f[i]; CACHEFILE oldcf=f[i];
r = toku_cachefile_close(&f[i], 0); assert(r==0); r = toku_cachefile_close(&f[i], 0, 0); assert(r==0);
file_is_not_present(oldcf); file_is_not_present(oldcf);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, S_IRWXU|S_IRWXG|S_IRWXO); assert(r==0); r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, S_IRWXU|S_IRWXG|S_IRWXO); assert(r==0);
} }
} }
for (i=0; i<N_FILES; i++) { for (i=0; i<N_FILES; i++) {
r = toku_cachefile_close(&f[i], 0); assert(r==0); r = toku_cachefile_close(&f[i], 0, 0); assert(r==0);
} }
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -69,7 +69,7 @@ cachetable_unpin_test (int n) { ...@@ -69,7 +69,7 @@ cachetable_unpin_test (int n) {
r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1); r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1);
assert(r != 0); assert(r != 0);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0); r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0); r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
} }
......
...@@ -50,7 +50,7 @@ static void test_flat (void) { ...@@ -50,7 +50,7 @@ static void test_flat (void) {
assert(less>=prevless); prevless=less; assert(less>=prevless); prevless=less;
assert(greater<=prevgreater); prevgreater=greater; assert(greater<=prevgreater); prevgreater=greater;
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -69,7 +69,7 @@ static void test_flat (void) { ...@@ -69,7 +69,7 @@ static void test_flat (void) {
assert(equal==0); assert(equal==0);
assert(less+equal+greater == limit*ilimit); assert(less+equal+greater == limit*ilimit);
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -54,7 +54,7 @@ static void test_flat (void) { ...@@ -54,7 +54,7 @@ static void test_flat (void) {
assert(less>=prevless); prevless = less; assert(less>=prevless); prevless = less;
assert(greater<=prevgreater); prevgreater = greater; assert(greater<=prevgreater); prevgreater = greater;
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -58,7 +58,7 @@ static void test_flat (void) { ...@@ -58,7 +58,7 @@ static void test_flat (void) {
assert(equal==0); assert(equal==0);
assert(less+equal+greater == limit); assert(less+equal+greater == limit);
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -48,7 +48,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute ...@@ -48,7 +48,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute
assert(strcmp(val.data, "0002")==0); assert(strcmp(val.data, "0002")==0);
r = toku_brt_cursor_close(cursor); assert(r==0); r = toku_brt_cursor_close(cursor); assert(r==0);
r = toku_close_brt(brt, 0); assert(r==0); r = toku_close_brt(brt, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
return 0; return 0;
} }
...@@ -39,7 +39,7 @@ doit (void) { ...@@ -39,7 +39,7 @@ doit (void) {
r = toku_brt_lookup(t, toku_fill_dbt(&k, "a", 2), toku_init_dbt(&v)); r = toku_brt_lookup(t, toku_fill_dbt(&k, "a", 2), toku_init_dbt(&v));
assert(r==DB_NOTFOUND); assert(r==DB_NOTFOUND);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -30,7 +30,7 @@ test_overflow (void) { ...@@ -30,7 +30,7 @@ test_overflow (void) {
char key[]={(char)('a'+i), 0}; char key[]={(char)('a'+i), 0};
toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn); toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn);
} }
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -56,7 +56,7 @@ doit (void) { ...@@ -56,7 +56,7 @@ doit (void) {
r = toku_brt_lookup(t, &k, &v); r = toku_brt_lookup(t, &k, &v);
assert(r==0); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
} }
......
...@@ -130,7 +130,7 @@ doit (int ksize __attribute__((__unused__))) { ...@@ -130,7 +130,7 @@ doit (int ksize __attribute__((__unused__))) {
r = toku_testsetup_root(t, anode); r = toku_testsetup_root(t, anode);
assert(r==0); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0); r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0); r = toku_cachetable_close(&ct); assert(r==0);
//printf("ksize=%d, unused\n", ksize); //printf("ksize=%d, unused\n", ksize);
......
...@@ -8,23 +8,25 @@ EXPORTS ...@@ -8,23 +8,25 @@ EXPORTS
db_env_set_func_malloc @7 db_env_set_func_malloc @7
db_env_set_func_realloc @8 db_env_set_func_realloc @8
db_env_set_func_free @9 db_env_set_func_free @9
toku_os_get_max_rss @10 db_env_set_func_pwrite @10
toku_ydb_error_all_cases @11 db_env_set_func_write @11
toku_set_trace_file @12 toku_os_get_max_rss @12
toku_close_trace_file @13 toku_ydb_error_all_cases @13
toku_add_trace_mem @14 toku_set_trace_file @14
toku_print_trace_mem @15 toku_close_trace_file @15
toku_free @16 toku_add_trace_mem @16
toku_malloc @17 toku_print_trace_mem @17
toku_os_get_file_size @18 toku_free @18
toku_os_getpid @19 toku_malloc @19
toku_os_gettid @20 toku_os_get_file_size @20
toku_os_initialize_settings @21 toku_os_getpid @21
toku_os_is_absolute_name @22 toku_os_gettid @22
toku_os_mkdir @23 toku_os_initialize_settings @23
toku_realloc @24 toku_os_is_absolute_name @24
toku_strdup @25 toku_os_mkdir @25
dlmalloc @26 toku_realloc @26
dlrealloc @27 toku_strdup @27
dlfree @28 dlmalloc @28
dlrealloc @29
dlfree @30
...@@ -9,6 +9,8 @@ ...@@ -9,6 +9,8 @@
db_env_set_func_malloc; db_env_set_func_malloc;
db_env_set_func_realloc; db_env_set_func_realloc;
db_env_set_func_free; db_env_set_func_free;
db_env_set_func_pwrite;
db_env_set_func_write;
toku_os_get_max_rss; toku_os_get_max_rss;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
/* Simulate disk full by making pwrite return ENOSPC */
/* Strategy, repeatedly run a test, and on the Ith run of the test make the Ith write fail. */
#include <fcntl.h>
#include <errno.h>
#define DOERR(r) do { if (r!=0) { did_fail=1; fprintf(stderr, "%s:%d error %d (%s)\n", __FILE__, __LINE__, r, db_strerror(r)); }} while (0)
static void
do_db_work(void) {
int r;
int did_fail=0;
{
system("rm -rf " ENVDIR);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
DB_ENV *env;
DB_TXN *tid;
DB *db;
DBT key,data;
r=db_env_create(&env, 0); assert(r==0);
env->set_errfile(env, stderr);
// Don't set the lg bsize for the small experiment.
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); DOERR(r);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown1;
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->put(db, tid, dbt_init(&key, "a", 2), dbt_init(&data, "b", 2), 0); DOERR(r);
if (did_fail) {
r = tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
shutdown1:
r=db->close(db, 0); DOERR(r);
r=env->close(env, 0); DOERR(r);
if (did_fail) return;
}
{
system("rm -rf " ENVDIR);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
DB_ENV *env;
DB_TXN *tid;
DB *db;
DBT key,data;
// Repeat with more put operations
r=db_env_create(&env, 0); assert(r==0);
env->set_errfile(env, stderr);
r=env->set_lg_bsize(env, 4096); assert(r==0);
r=env->set_cachesize(env, 0, 1, 1); assert(r==0);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=db->set_pagesize(db, 4096);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); DOERR(r);
if (did_fail) {
r = tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown2;
// Put an extra item in so that the rolltmp file will be created.
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->put(db, tid, dbt_init(&key, "a", 2), dbt_init(&data, "b", 2), 0); DOERR(r);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown2;
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
{
int i;
for (i=0; i<100; i++) {
int kvsize=50;
int kvsize_i = kvsize / sizeof(int);
int keyi[kvsize_i],vali[kvsize_i];
int j;
keyi[0] = vali[0] = toku_htonl(i);
for (j=1; j<kvsize_i; j++) {
keyi[j] = random();
vali[j] = random();
}
r=db->put(db, tid, dbt_init(&key, keyi, sizeof keyi), dbt_init(&data, vali, sizeof vali), 0);
DOERR(r);
if (did_fail) goto break_out_of_loop;
}
}
break_out_of_loop:
system("ls -l " ENVDIR);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
shutdown2:
r=db->close(db, 0); DOERR(r);
r=env->close(env, 0); DOERR(r);
}
}
static int write_count = 0;
#define FAIL_NEVER 0x7FFFFFFF
static int fail_at = FAIL_NEVER;
static ssize_t
pwrite_counting_and_failing (int fd, const void *buf, size_t size, off_t off)
{
write_count++;
if (write_count>fail_at) {
errno = ENOSPC;
return -1;
} else {
return pwrite(fd, buf, size, off);
}
}
static ssize_t
write_counting_and_failing (int fd, const void *buf, size_t size)
{
write_count++;
if (write_count>fail_at) {
errno = ENOSPC;
return -1;
} else {
return write(fd, buf, size);
}
}
static void
do_writes_that_fail (void) {
db_env_set_func_pwrite(pwrite_counting_and_failing);
db_env_set_func_write (write_counting_and_failing);
write_count=0;
do_db_work();
printf("Write_count=%d\n", write_count);
int count = write_count;
// fail_at=83; write_count=0; do_db_work();
for (fail_at = 0; fail_at<count; fail_at++) {
printf("About to fail at %d:\n", fail_at);
write_count=0;
do_db_work();
}
// fail_at = FAIL_NEVER; write_count=0;
// do_db_work();
}
int
test_main (int argc, const char *argv[]) {
parse_args(argc, argv);
do_writes_that_fail();
return 0;
}
...@@ -46,7 +46,7 @@ def main(): ...@@ -46,7 +46,7 @@ def main():
exceptsymbols = {} exceptsymbols = {}
for n in [ "_init", "_fini", "_end", "_edata", "__bss_start" ]: for n in [ "_init", "_fini", "_end", "_edata", "__bss_start" ]:
exceptsymbols[n] = 1 exceptsymbols[n] = 1
for n in [ "db_env_create", "db_create", "db_strerror", "db_version", "log_compare", "db_env_set_func_fsync", "dlfree", "dlmalloc", "dlrealloc", "db_env_set_func_free", "db_env_set_func_malloc", "db_env_set_func_realloc" ]: for n in [ "db_env_create", "db_create", "db_strerror", "db_version", "log_compare", "db_env_set_func_fsync", "dlfree", "dlmalloc", "dlrealloc", "db_env_set_func_free", "db_env_set_func_malloc", "db_env_set_func_realloc", "db_env_set_func_pwrite", "db_env_set_func_write"]:
exceptsymbols[n] = 1 exceptsymbols[n] = 1
return checkglobals(libname, exceptsymbols, verbose) return checkglobals(libname, exceptsymbols, verbose)
......
...@@ -50,7 +50,8 @@ typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *); ...@@ -50,7 +50,8 @@ typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
#endif #endif
struct __toku_db_env_internal { struct __toku_db_env_internal {
int is_panicked; int is_panicked; // if nonzero, then its an error number
char *panic_string;
int ref_count; int ref_count;
u_int32_t open_flags; u_int32_t open_flags;
int open_mode; int open_mode;
......
...@@ -562,13 +562,35 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo ...@@ -562,13 +562,35 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo
} }
static int toku_env_close(DB_ENV * env, u_int32_t flags) { static int toku_env_close(DB_ENV * env, u_int32_t flags) {
// Even if the env is panicedk, try to close as much as we can.
int is_panicked = toku_env_is_panicked(env); int is_panicked = toku_env_is_panicked(env);
char *panic_string = env->i->panic_string;
env->i->panic_string = 0;
// Even if the env is panicked, try to close as much as we can.
int r0=0,r1=0; int r0=0,r1=0;
if (env->i->cachetable) if (env->i->cachetable) {
r0=toku_cachetable_close(&env->i->cachetable); r0=toku_cachetable_close(&env->i->cachetable);
if (env->i->logger) if (r0) {
toku_ydb_do_error(env, r0, "Cannot close environment (cachetable close error)\n");
}
}
if (env->i->logger) {
r1=toku_logger_close(&env->i->logger); r1=toku_logger_close(&env->i->logger);
if (r0==0 && r1) {
toku_ydb_do_error(env, r0, "Cannot close environment (logger close error)\n");
}
}
// Even if nothing else went wrong, but we were panicked, then raise an error.
// But if something else went wrong then raise that error (above)
if (is_panicked) {
if (r0==0 && r1==0) {
toku_ydb_do_error(env, is_panicked, "Cannot close environment due to previous error: %s\n", panic_string);
}
if (panic_string) toku_free(panic_string);
} else {
assert(panic_string==0);
}
if (env->i->data_dirs) { if (env->i->data_dirs) {
u_int32_t i; u_int32_t i;
assert(env->i->n_data_dirs > 0); assert(env->i->n_data_dirs > 0);
...@@ -589,8 +611,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) { ...@@ -589,8 +611,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
if (flags!=0) return EINVAL; if (flags!=0) return EINVAL;
if (r0) return r0; if (r0) return r0;
if (r1) return r1; if (r1) return r1;
if (is_panicked) return EINVAL; return is_panicked;
return 0;
} }
static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) { static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
...@@ -699,8 +720,7 @@ static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) { ...@@ -699,8 +720,7 @@ static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) { static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
bsize=bsize; return toku_logger_set_lg_bsize(env->i->logger, bsize);
return toku_ydb_do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n");
} }
static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) { static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) {
...@@ -929,6 +949,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -929,6 +949,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
if (result->i == 0) { r = ENOMEM; goto cleanup; } if (result->i == 0) { r = ENOMEM; goto cleanup; }
memset(result->i, 0, sizeof *result->i); memset(result->i, 0, sizeof *result->i);
result->i->is_panicked=0; result->i->is_panicked=0;
result->i->panic_string = 0;
result->i->ref_count = 1; result->i->ref_count = 1;
result->i->errcall = 0; result->i->errcall = 0;
result->i->errpfx = 0; result->i->errpfx = 0;
...@@ -1275,13 +1296,27 @@ static int toku_db_close(DB * db, u_int32_t flags) { ...@@ -1275,13 +1296,27 @@ static int toku_db_close(DB * db, u_int32_t flags) {
} }
} }
flags=flags; flags=flags;
int r = toku_close_brt(db->i->brt, db->dbenv->i->logger); char *error_string = 0;
if (r != 0) int r1 = toku_close_brt(db->i->brt, db->dbenv->i->logger, &error_string);
return r; if (r1) {
db->dbenv->i->is_panicked = r1; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
db->dbenv->i->panic_string = error_string;
if (error_string) {
toku_ydb_do_error(db->dbenv, r1, "%s\n", error_string);
} else {
toku_ydb_do_error(db->dbenv, r1, "Closing file\n");
}
error_string=0;
}
assert(error_string==0);
int r2 = 0;
if (db->i->db_id) { toku_db_id_remove_ref(&db->i->db_id); } if (db->i->db_id) { toku_db_id_remove_ref(&db->i->db_id); }
if (db->i->lt) { if (db->i->lt) {
r = toku_lt_remove_ref(db->i->lt); r2 = toku_lt_remove_ref(db->i->lt);
if (r!=0) { return r; } if (r2) {
db->dbenv->i->is_panicked = r2; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
db->dbenv->i->panic_string = 0;
}
} }
// printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db); // printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
// Even if panicked, let's close as much as we can. // Even if panicked, let's close as much as we can.
...@@ -1293,8 +1328,10 @@ static int toku_db_close(DB * db, u_int32_t flags) { ...@@ -1293,8 +1328,10 @@ static int toku_db_close(DB * db, u_int32_t flags) {
toku_free(db->i); toku_free(db->i);
toku_free(db); toku_free(db);
ydb_unref(); ydb_unref();
if (r==0 && is_panicked) return EINVAL; if (r1) return r1;
return r; if (r2) return r2;
if (is_panicked) return EINVAL;
return 0;
} }
/* Verify that an element from the secondary database is still consistent /* Verify that an element from the secondary database is still consistent
...@@ -2851,12 +2888,15 @@ finish: ...@@ -2851,12 +2888,15 @@ finish:
} }
static int toku_db_lt_panic(DB* db, int r) { static int toku_db_lt_panic(DB* db, int r) {
assert(r!=0);
assert(db && db->i && db->dbenv && db->dbenv->i); assert(db && db->i && db->dbenv && db->dbenv->i);
DB_ENV* env = db->dbenv; DB_ENV* env = db->dbenv;
env->i->is_panicked = 1; env->i->is_panicked = r;
if (r < 0) toku_ydb_do_error(env, 0, toku_lt_strerror((TOKU_LT_ERROR)r));
else toku_ydb_do_error(env, r, "Error in locktree.\n"); if (r < 0) env->i->panic_string = toku_strdup(toku_lt_strerror((TOKU_LT_ERROR)r));
return EINVAL; else env->i->panic_string = toku_strdup("Error in locktree.\n");
return toku_ydb_do_error(env, r, env->i->panic_string);
} }
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) { static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) {
...@@ -3685,6 +3725,13 @@ int db_env_set_func_fsync (int (*fsync_function)(int)) { ...@@ -3685,6 +3725,13 @@ int db_env_set_func_fsync (int (*fsync_function)(int)) {
return toku_set_func_fsync(fsync_function); return toku_set_func_fsync(fsync_function);
} }
int db_env_set_func_pwrite (ssize_t (*pwrite_function)(int, const void *, size_t, off_t)) {
return toku_set_func_pwrite(pwrite_function);
}
int db_env_set_func_write (ssize_t (*write_function)(int, const void *, size_t)) {
return toku_set_func_write(write_function);
}
int db_env_set_func_malloc (void *(*f)(size_t)) { int db_env_set_func_malloc (void *(*f)(size_t)) {
return toku_set_func_malloc(f); return toku_set_func_malloc(f);
} }
......
...@@ -99,11 +99,14 @@ void *realloc(void*, size_t) __attribute__((__deprecated__)); ...@@ -99,11 +99,14 @@ void *realloc(void*, size_t) __attribute__((__deprecated__));
void *os_malloc(size_t); void *os_malloc(size_t);
void *os_realloc(void*,size_t); void *os_realloc(void*,size_t);
void os_free(void*); void os_free(void*);
ssize_t toku_os_pwrite (int fd, const void *buf, size_t len, off_t off);
int toku_set_func_fsync (int (*fsync_function)(int)); int toku_set_func_fsync (int (*fsync_function)(int));
int toku_set_func_malloc (void *(*)(size_t)); int toku_set_func_malloc (void *(*)(size_t));
int toku_set_func_realloc (void *(*)(void*,size_t)); int toku_set_func_realloc (void *(*)(void*,size_t));
int toku_set_func_free (void (*)(void*)); int toku_set_func_free (void (*)(void*));
int toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t));
int toku_set_func_write (ssize_t (*pwrite_fun)(int, const void *, size_t));
#if defined __cplusplus #if defined __cplusplus
}; };
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment