Commit 85f355d6 authored by Bradley C. Kuszmaul, committed by Yoni Fogel

Improve error messages and error handling when writes fail.

 * Fixes #78 ({{{lg_bsize}}} works)
 * Fixes #1323 (write errors)
Did
{{{
svn merge -r8967:9031 https://svn.tokutek.com/tokudb/toku/tokudb.1323a
}}}

(And I'll delete the 1323 branches in a moment.)


git-svn-id: file:///svn/toku/tokudb@9032 c7de825b-a66e-492c-adef-691d508d4ae1
parent 8eaa912f
......@@ -288,9 +288,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
......@@ -304,9 +304,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
......@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
......@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
......@@ -315,9 +315,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
......@@ -358,9 +358,11 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf("int db_env_set_func_fsync (int (*)(int)) %s;\n", VISIBLE);
printf("int toku_set_trace_file (char *fname) %s;\n", VISIBLE);
printf("int toku_close_trace_file (void) %s;\n", VISIBLE);
printf("int db_env_set_func_free (void (*)(void*)) %s;\n", VISIBLE);
printf("int db_env_set_func_malloc (void *(*)(size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_realloc (void *(*)(void*, size_t)) %s;\n", VISIBLE);
printf("int db_env_set_func_free (void (*)(void*)) %s;\n", VISIBLE);
printf("#if defined(__cplusplus)\n}\n#endif\n");
printf("#endif\n");
return 0;
......
......@@ -81,7 +81,7 @@ static void benchmark_setup (void) {
int r;
{
char unlink_cmd[strlen(dbdir) + strlen("rf -rf ") + 1];
char unlink_cmd[strlen(dbdir) + strlen("rm -rf ") + 1];
snprintf(unlink_cmd, sizeof(unlink_cmd), "rm -rf %s", dbdir);
//printf("unlink_cmd=%s\n", unlink_cmd);
system(unlink_cmd);
......
......@@ -310,9 +310,11 @@ int log_compare (const DB_LSN*, const DB_LSN *) __attribute__((__visibility__("d
int db_env_set_func_fsync (int (*)(int)) __attribute__((__visibility__("default")));
int toku_set_trace_file (char *fname) __attribute__((__visibility__("default")));
int toku_close_trace_file (void) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
int db_env_set_func_malloc (void *(*)(size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_pwrite (ssize_t (*)(int, const void *, size_t, off_t)) __attribute__((__visibility__("default")));
int db_env_set_func_write (ssize_t (*)(int, const void *, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_realloc (void *(*)(void*, size_t)) __attribute__((__visibility__("default")));
int db_env_set_func_free (void (*)(void*)) __attribute__((__visibility__("default")));
#if defined(__cplusplus)
}
#endif
......
#include <unistd.h>
/* Optional replacements for the low-level write primitives.
 * A null pointer means that no replacement has been installed. */
static ssize_t (*t_pwrite)(int, const void *, size_t, off_t) = 0;
static ssize_t (*t_write)(int, const void *, size_t) = 0;

/* Install pwrite_fun as the function that toku_os_pwrite() calls.
 * Passing a null pointer makes toku_os_pwrite() go back to calling pwrite().
 * Always returns 0. */
int toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t)) {
    t_pwrite = pwrite_fun;
    return 0;
}

/* Remember write_fun as the replacement for write().
 * NOTE(review): nothing in the code visible here reads t_write; presumably a
 * write wrapper elsewhere consults it -- confirm.
 * Always returns 0. */
int toku_set_func_write (ssize_t (*write_fun)(int, const void *, size_t)) {
    t_write = write_fun;
    return 0;
}

/* Write len bytes from buf to fd at offset off.
 * The call is forwarded to the function installed with toku_set_func_pwrite()
 * if there is one, and to pwrite() otherwise.  The result of whichever
 * function was called is returned unchanged. */
ssize_t
toku_os_pwrite (int fd, const void *buf, size_t len, off_t off)
{
    if (!t_pwrite) {
        return pwrite(fd, buf, len, off);
    }
    return t_pwrite(fd, buf, len, off);
}
......@@ -123,6 +123,8 @@ struct block_translation_pair {
struct brt_header {
int refcount;
int dirty;
int panic; // If nonzero there was a write error. Don't write any more, because it probably only gets worse. This is the error code.
char *panic_string; // A malloced string that can indicate what went wrong.
int layout_version;
unsigned int nodesize;
int n_named_roots; /* -1 if the only one is unnamed */
......@@ -181,7 +183,7 @@ struct brt {
};
/* serialization code */
void toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h);
int toku_serialize_brtnode_to(int fd, BLOCKNUM, BRTNODE node, struct brt_header *h);
int toku_deserialize_brtnode_from (int fd, BLOCKNUM off, u_int32_t /*fullhash*/, BRTNODE *brtnode, struct brt_header *h);
unsigned int toku_serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int toku_keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......@@ -286,7 +288,7 @@ enum brt_layout_version_e {
};
void toku_brtheader_free (struct brt_header *h);
int toku_brtheader_close (CACHEFILE cachefile, void *header_v);
int toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **error_string);
int toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v);
#define BLOCK_ALLOCATOR_ALIGNMENT 4096
......
......@@ -23,9 +23,10 @@ static inline u_int64_t alignup (u_int64_t a, u_int64_t b) {
return ((a+b-1)/b)*b;
}
void
int
maybe_preallocate_in_file (int fd, u_int64_t size)
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
// Return 0 on success, otherwise an error number.
{
int64_t file_size;
{
......@@ -39,10 +40,16 @@ maybe_preallocate_in_file (int fd, u_int64_t size)
memset(wbuf, 0, N);
toku_off_t start_write = alignup(file_size, 4096);
assert(start_write >= file_size);
ssize_t r = pwrite(fd, wbuf, N, start_write);
assert(r==N);
ssize_t r = toku_os_pwrite(fd, wbuf, N, start_write);
if (r==-1) {
int e=errno; // must save errno before calling toku_free.
toku_free(wbuf);
return e;
}
toku_free(wbuf);
assert(r==N); // We don't handle short writes properly, which is the case where 0<= r < N.
}
return 0;
}
// This mutex protects pwrite from running in parallel, and also protects modifications to the block allocator.
......@@ -72,13 +79,30 @@ unlock_for_pwrite (void) {
assert(r==0);
}
static ssize_t
toku_pwrite (int fd, const void *buf, size_t count, toku_off_t offset)
static int
toku_pwrite_extend (int fd, const void *buf, size_t count, toku_off_t offset, ssize_t *num_wrote)
// requires that the pwrite has been locked
// Returns 0 on success (and fills in *num_wrote for how many bytes are written)
// Returns a nonzero error number if there is a problem.
{
assert(pwrite_is_locked);
maybe_preallocate_in_file(fd, offset+count);
return pwrite(fd, buf, count, offset);
{
int r = maybe_preallocate_in_file(fd, offset+count);
if (r!=0) {
*num_wrote = 0;
return r;
}
}
{
*num_wrote = toku_os_pwrite(fd, buf, count, offset);
if (*num_wrote < 0) {
int r = errno;
*num_wrote = 0;
return r;
} else {
return 0;
}
}
}
// Don't include the compressed data size or the uncompressed data size.
......@@ -184,7 +208,7 @@ enum { compression_header_len = (4 // compressed_len
+4 // uncompressed_len
) };
void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) {
int toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct brt_header *h) {
struct wbuf w;
int i;
unsigned int calculated_size = toku_serialize_brtnode_size(node) - 8; // don't include the compressed or uncompressed sizes
......@@ -311,6 +335,7 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
((int32_t*)(compressed_buf+uncompressed_magic_len))[1] = toku_htonl(uncompressed_len);
//write_now: printf("%s:%d Writing %d bytes\n", __FILE__, __LINE__, w.ndone);
int r;
{
lock_for_pwrite();
// If the node has never been written, then write the whole buffer, including the zeros
......@@ -331,9 +356,13 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
block_allocator_alloc_block(h->block_allocator, n_to_write, &offset);
h->block_translation[blocknum.b].diskoff = offset;
h->block_translation[blocknum.b].size = n_to_write;
ssize_t r=toku_pwrite(fd, compressed_buf, n_to_write, offset);
if (r<0) printf("r=%ld errno=%d\n", (long)r, errno);
assert(r==(ssize_t)n_to_write);
ssize_t n_wrote;
r=toku_pwrite_extend(fd, compressed_buf, n_to_write, offset, &n_wrote);
if (r) {
fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, r, strerror(r));
} else {
r=0;
}
unlock_for_pwrite();
}
......@@ -341,10 +370,12 @@ void toku_serialize_brtnode_to (int fd, BLOCKNUM blocknum, BRTNODE node, struct
assert(w.ndone==calculated_size);
toku_free(buf);
toku_free(compressed_buf);
return r;
}
int toku_deserialize_brtnode_from (int fd, BLOCKNUM blocknum, u_int32_t fullhash, BRTNODE *brtnode, struct brt_header *h) {
if (0) printf("Deserializing Block %" PRId64 "\n", blocknum.b);
if (h->panic) return h->panic;
assert(0 <= blocknum.b && (u_int64_t)blocknum.b < h->translated_blocknum_limit);
DISKOFF offset = h->block_translation[blocknum.b].diskoff;
TAGMALLOC(BRTNODE, result);
......@@ -697,18 +728,31 @@ int toku_serialize_brt_header_to_wbuf (struct wbuf *wbuf, struct brt_header *h)
}
int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
int rr = 0;
if (h->panic) return h->panic;
lock_for_pwrite();
{
struct wbuf w;
unsigned int size = toku_serialize_brt_header_size (h);
wbuf_init(&w, toku_malloc(size), size);
{
int r=toku_serialize_brt_header_to_wbuf(&w, h);
assert(r==0);
}
assert(w.ndone==size);
ssize_t nwrote = toku_pwrite(fd, w.buf, w.ndone, 0);
if (nwrote<0) perror("pwrite");
assert((size_t)nwrote==w.ndone);
ssize_t nwrote;
rr = toku_pwrite_extend(fd, w.buf, w.ndone, 0, &nwrote);
toku_free(w.buf);
if (rr) {
if (h->panic==0) {
char s[200];
h->panic=rr;
snprintf(s, sizeof(s), "%s:%d: Error writing header to data file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr));
h->panic_string = toku_strdup(s);
}
goto finish;
}
assert((u_int64_t)nwrote==size);
}
{
struct wbuf w;
......@@ -723,12 +767,18 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h) {
}
u_int32_t checksum = x1764_finish(&w.checksum);
wbuf_int(&w, checksum);
ssize_t nwrote = toku_pwrite(fd, w.buf, size, h->block_translation_address_on_disk);
assert((u_int64_t)nwrote==size);
ssize_t nwrote;
rr = toku_pwrite_extend(fd, w.buf, size, h->block_translation_address_on_disk, &nwrote);
toku_free(w.buf);
};
if (rr) {
fprintf(stderr, "%s:%d: Error writing data to file. errno=%d (%s)\n", __FILE__, __LINE__, rr, strerror(rr));
goto finish;
}
assert((u_int64_t)nwrote==size);
}
finish:
unlock_for_pwrite();
return 0;
return rr;
}
// We only deserialize brt header once and then share everything with all the brts.
......@@ -750,6 +800,8 @@ deserialize_brtheader (u_int32_t size, int fd, DISKOFF off, struct brt_header **
if (r!=(ssize_t)size-12) { ret = EINVAL; goto died1; }
}
h->dirty=0;
h->panic = 0;
h->panic_string = 0;
h->layout_version = rbuf_int(&rc);
h->nodesize = rbuf_int(&rc);
assert(h->layout_version==BRT_LAYOUT_VERSION_9);
......@@ -882,8 +934,13 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) {
struct wbuf w;
wbuf_init(&w, buf, size);
wbuf_int(&w, toku_fifo_n_entries(fifo));
ssize_t r = toku_pwrite(fd, w.buf, size, freeoff);
if (r!=size) return errno;
ssize_t nwrote;
int r = toku_pwrite_extend(fd, w.buf, size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite();
return r;
}
assert(nwrote==size);
freeoff+=size;
}
FIFO_ITERATE(fifo, key, keylen, val, vallen, type, xid,
......@@ -900,10 +957,11 @@ int toku_serialize_fifo_at (int fd, toku_off_t freeoff, FIFO fifo) {
//printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val);
wbuf_bytes(&w, val, vallen);
assert(w.ndone==size);
ssize_t r = toku_pwrite(fd, w.buf, (size_t)size, freeoff);
if (r<0) {
ssize_t nwrote;
int r = toku_pwrite_extend(fd, w.buf, (size_t)size, freeoff, &nwrote);
if (r) {
unlock_for_pwrite();
return errno;
return r;
}
assert(r==(ssize_t)size);
freeoff+=size;
......
......@@ -377,7 +377,17 @@ void toku_brtnode_flush_callback (CACHEFILE cachefile, BLOCKNUM nodename, void *
assert(brtnode->thisnodename.b==nodename.b);
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (write_me) {
toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h);
if (!h->panic) { // if the brt panicked, stop writing, otherwise try to write it.
int r = toku_serialize_brtnode_to(toku_cachefile_fd(cachefile), brtnode->thisnodename, brtnode, h);
if (r) {
if (h->panic==0) {
char s[200];
h->panic=r;
snprintf(s, sizeof(s), "While writing data to disk, error %d (%s)", r, strerror(r));
h->panic_string = toku_strdup(s);
}
}
}
}
//printf("%s:%d %p->mdict[0]=%p\n", __FILE__, __LINE__, brtnode, brtnode->mdicts[0]);
if (!keep_me) {
......@@ -1196,7 +1206,6 @@ brt_split_child (BRT t, BRTNODE node, int childnum, TOKULOGGER logger, BOOL *did
NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback,
t->h);
assert(r==0); // REMOVE LATER
if (r!=0) return r;
child = childnode_v;
assert(child->thisnodename.b!=0);
......@@ -2525,7 +2534,10 @@ int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
VERIFY_NODE(brt, node);
verify_local_fingerprint_nonleaf(node);
if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) return r;
if ((r = push_something_at_root(brt, &node, rootp, cmd, logger))) {
toku_unpin_brtnode(brt, node); // ignore any error code on the unpin.
return r;
}
verify_local_fingerprint_nonleaf(node);
r = toku_unpin_brtnode(brt, node);
assert(r == 0);
......@@ -2864,7 +2876,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
}
if (r!=0) {
died_after_open:
toku_cachefile_close(&t->cf, toku_txn_logger(txn));
toku_cachefile_close(&t->cf, toku_txn_logger(txn), 0);
t->database_name = 0;
goto died0a;
}
......@@ -3035,25 +3047,35 @@ toku_brtheader_checkpoint (CACHEFILE cachefile, void *header_v)
//printf("%s:%d allocated_limit=%lu writing queue to %lu\n", __FILE__, __LINE__,
// block_allocator_allocated_limit(h->block_allocator), h->unused_blocks.b*h->nodesize);
if (h->dirty) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
{
int r = toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
if (r) return r;
}
u_int64_t write_to = block_allocator_allocated_limit(h->block_allocator); // Must compute this after writing the header.
//printf("%s:%d fifo written to %lu\n", __FILE__, __LINE__, write_to);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo);
{
int r = toku_serialize_fifo_at(toku_cachefile_fd(cachefile), write_to, h->fifo);
if (r) return r;
}
h->dirty = 0;
}
return 0;
}
int
toku_brtheader_close (CACHEFILE cachefile, void *header_v)
toku_brtheader_close (CACHEFILE cachefile, void *header_v, char **malloced_error_string)
{
struct brt_header *h = header_v;
toku_brtheader_checkpoint(cachefile, h);
int r = toku_brtheader_checkpoint(cachefile, h);
if (malloced_error_string) *malloced_error_string = h->panic_string;
if (r==0) {
r=h->panic;
}
toku_brtheader_free(h);
return 0;
return r;
}
int toku_close_brt (BRT brt, TOKULOGGER logger) {
int toku_close_brt (BRT brt, TOKULOGGER logger, char **error_string) {
int r;
while (!list_empty(&brt->cursors)) {
BRT_CURSOR c = list_struct(list_pop(&brt->cursors), struct brt_cursor, cursors_link);
......@@ -3074,17 +3096,18 @@ int toku_close_brt (BRT brt, TOKULOGGER logger) {
r = toku_log_brtclose(logger, &lsn, 1, bs, toku_cachefile_filenum(brt->cf)); // flush the log on close, otherwise it might not make it out.
if (r!=0) return r;
}
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero.
if (!brt->h->panic)
assert(0==toku_cachefile_count_pinned(brt->cf, 1)); // For the brt, the pinned count should be zero (but if panic, don't worry)
//printf("%s:%d closing cachetable\n", __FILE__, __LINE__);
// printf("%s:%d brt=%p ,brt->h=%p\n", __FILE__, __LINE__, brt, brt->h);
if ((r = toku_cachefile_close(&brt->cf, logger))!=0) return r;
r = toku_cachefile_close(&brt->cf, logger, error_string);
}
if (brt->database_name) toku_free(brt->database_name);
if (brt->fname) toku_free(brt->fname);
if (brt->skey) { toku_free(brt->skey); }
if (brt->sval) { toku_free(brt->sval); }
toku_free(brt);
return 0;
return r;
}
int toku_brt_create(BRT *brt_ptr) {
......
......@@ -30,7 +30,7 @@ int toku_brt_insert (BRT, DBT *, DBT *, TOKUTXN);
int toku_brt_lookup (BRT brt, DBT *k, DBT *v);
int toku_brt_delete (BRT brt, DBT *k, TOKUTXN);
int toku_brt_delete_both (BRT brt, DBT *k, DBT *v, TOKUTXN); // Delete a pair only if both k and v are equal according to the comparison function.
int toku_close_brt (BRT, TOKULOGGER);
int toku_close_brt (BRT, TOKULOGGER, char **error_string);
int toku_dump_brt (FILE *,BRT brt);
......@@ -118,7 +118,7 @@ void toku_brt_destroy(void);
void toku_pwrite_lock_init(void);
void toku_pwrite_lock_destroy(void);
void maybe_preallocate_in_file (int fd, u_int64_t size);
int maybe_preallocate_in_file (int fd, u_int64_t size);
// Effect: If file size is less than SIZE, make it bigger by either doubling it or growing by 16MB whichever is less.
int toku_brt_note_table_lock (BRT brt, TOKUTXN txn);
......
......@@ -134,7 +134,7 @@ struct cachefile {
char *fname;
void *userdata;
int (*close_userdata)(CACHEFILE cf, void *userdata); // when closing the last reference to a cachefile, first call this function.
int (*close_userdata)(CACHEFILE cf, void *userdata, char **error_string); // when closing the last reference to a cachefile, first call this function.
int (*checkpoint_userdata)(CACHEFILE cf, void *userdata); // when checkpointing a cachefile, call this function.
};
......@@ -254,7 +254,7 @@ int toku_cachefile_set_fd (CACHEFILE cf, int fd, const char *fname) {
if (r != 0) {
r=errno; close(fd); return r;
}
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) {
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata, 0))) {
return r;
}
cf->close_userdata = NULL;
......@@ -299,29 +299,30 @@ void toku_cachefile_refup (CACHEFILE cf) {
cf->refcount++;
}
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger) {
int toku_cachefile_close (CACHEFILE *cfp, TOKULOGGER logger, char **error_string) {
CACHEFILE cf = *cfp;
CACHETABLE ct = cf->cachetable;
cachetable_lock(ct);
assert(cf->refcount>0);
cf->refcount--;
if (cf->refcount==0) {
int r;
int r = 0;
if ((r = cachefile_write_maybe_remove(ct, cf, TRUE))) {
cachetable_unlock(ct);
return r;
}
if (cf->close_userdata && (r = cf->close_userdata(cf, cf->userdata))) {
cachetable_unlock(ct);
return r;
if (cf->close_userdata) {
r = cf->close_userdata(cf, cf->userdata, error_string);
}
cf->close_userdata = NULL;
cf->checkpoint_userdata = NULL;
cf->userdata = NULL;
cf->cachetable->cachefiles = remove_cf_from_list(cf, cf->cachetable->cachefiles);
cachetable_unlock(ct);
r = close(cf->fd);
assert(r == 0);
{
int r2 = close(cf->fd);
assert(r2 == 0);
}
cf->fd = -1;
if (logger) {
//assert(cf->fname);
......@@ -1309,7 +1310,7 @@ int toku_cachetable_get_key_state (CACHETABLE ct, CACHEKEY key, CACHEFILE cf, vo
void
toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
int (*close_userdata)(CACHEFILE, void*),
int (*close_userdata)(CACHEFILE, void*, char**/*error_string*/),
int (*checkpoint_userdata)(CACHEFILE, void*))
{
cf->userdata = userdata;
......
......@@ -66,7 +66,7 @@ typedef void (*CACHETABLE_FLUSH_CALLBACK)(CACHEFILE, CACHEKEY key, void *value,
// associated with the key are returned.
typedef int (*CACHETABLE_FETCH_CALLBACK)(CACHEFILE, CACHEKEY key, u_int32_t fullhash, void **value, long *sizep, void *extraargs, LSN *written_lsn);
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*), int (*checkpoint_userdata)(CACHEFILE, void*));
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata, int (*close_userdata)(CACHEFILE, void*, char **/*error_string*/), int (*checkpoint_userdata)(CACHEFILE, void*));
// Effect: Store some cachefile-specific user data. When the last reference to a cachefile is closed, we call close_userdata().
// When the cachefile needs to be checkpointed, we call checkpoint_userdata().
// If userdata is already non-NULL, then we simply overwrite it.
......@@ -140,7 +140,7 @@ int toku_cachetable_rename (CACHEFILE cachefile, CACHEKEY oldkey, CACHEKEY newke
// close function does not return until all of the objects are evicted. The cachefile
// object is freed.
// Returns: 0 if success, otherwise returns an error number.
int toku_cachefile_close (CACHEFILE*, TOKULOGGER);
int toku_cachefile_close (CACHEFILE*, TOKULOGGER, char **error_string);
// Flush the cachefile.
// Effect: Flush everything owned by the cachefile from the cachetable. All dirty
......
......@@ -74,6 +74,7 @@ struct tokulogger {
char buf[LOGGER_BUF_SIZE]; // used to marshall logbytes so we can use only a single write
int n_in_file;
u_int32_t write_block_size; // How big should the blocks be written to various logs?
};
int toku_logger_find_next_unused_log_file(const char *directory, long long *result);
......
......@@ -103,6 +103,7 @@ int toku_logger_create (TOKULOGGER *resultp) {
result->n_in_file=0;
result->directory=0;
result->checkpoint_lsns[0]=result->checkpoint_lsns[1]=(LSN){0};
result->write_block_size = BRT_DEFAULT_NODE_SIZE; // default logging size is the same as the default brt block size
*resultp=result;
r = ml_init(&result->input_lock); if (r!=0) goto died0;
r = ml_init(&result->output_lock); if (r!=0) goto died1;
......@@ -223,6 +224,15 @@ int toku_logger_get_lg_max(TOKULOGGER logger, u_int32_t *lg_maxp) {
}
// Effect: Set the logger's write_block_size to bsize.
// Returns 0 on success.  Returns EINVAL (and changes nothing) if there is no logger,
//  if the logger is panicked, if the logger is open, or if bsize is zero or larger than 1<<30.
int toku_logger_set_lg_bsize(TOKULOGGER logger, u_int32_t bsize) {
    const u_int32_t max_bsize = 1<<30;
    // The null check must come first: the other two tests dereference logger.
    if (logger==0 || logger->is_panicked || logger->is_open) {
        return EINVAL;
    }
    if (bsize==0 || bsize>max_bsize) {
        return EINVAL;
    }
    logger->write_block_size = bsize;
    return 0;
}
// Enter holding both locks
// Exit holding only the output_lock
static int do_write (TOKULOGGER logger, int do_fsync) {
......@@ -989,7 +999,7 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
}
int toku_maybe_spill_rollbacks (TOKUTXN txn) {
if (txn->rollentry_resident_bytecount>(1<<20)) {
if (txn->rollentry_resident_bytecount>txn->logger->write_block_size) {
struct roll_entry *item;
ssize_t bufsize = txn->rollentry_resident_bytecount;
char *MALLOC_N(bufsize, buf);
......
......@@ -38,6 +38,7 @@ LSN toku_logger_last_lsn(TOKULOGGER);
int toku_logger_set_lg_max (TOKULOGGER logger, u_int32_t);
int toku_logger_get_lg_max (TOKULOGGER logger, u_int32_t *);
int toku_logger_set_lg_bsize(TOKULOGGER, u_int32_t);
// Doesn't close the txn, just performs the commit operations.
int toku_logger_commit (TOKUTXN txn, int no_sync, void(*yield)(void*yield_v), void*yield_v);
......
......@@ -36,7 +36,7 @@ void toku_recover_cleanup (void) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (cf_pairs[i].brt) {
int r = toku_close_brt(cf_pairs[i].brt, 0);
int r = toku_close_brt(cf_pairs[i].brt, 0, 0);
//r = toku_cachefile_close(&cf_pairs[i].cf);
assert(r==0);
}
......@@ -132,6 +132,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
struct brt_header *MALLOC(h);
assert(h);
h->dirty=0;
h->panic=0;
h->panic_string=0;
XMALLOC(h->flags_array);
h->flags_array[0] = header.flags;
h->nodesize = header.nodesize;
......@@ -485,7 +487,7 @@ toku_recover_brtclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
assert(r==0);
// Bump up the reference count
toku_cachefile_refup(pair->cf);
r = toku_close_brt(pair->brt, 0);
r = toku_close_brt(pair->brt, 0, 0);
assert(r==0);
pair->brt=0;
toku_free_BYTESTRING(fname);
......@@ -496,7 +498,7 @@ toku_recover_cfclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
int i;
for (i=0; i<n_cf_pairs; i++) {
if (filenum.fileid==cf_pairs[i].filenum.fileid) {
int r = toku_cachefile_close(&cf_pairs[i].cf, 0);
int r = toku_cachefile_close(&cf_pairs[i].cf, 0, 0);
assert(r==0);
cf_pairs[i] = cf_pairs[n_cf_pairs-1];
n_cf_pairs--;
......
......@@ -84,7 +84,7 @@ static int do_insertion (enum brt_cmd_type type, TXNID xid, FILENUM filenum, BYT
BRT brt = brtv;
r = toku_brt_root_put_cmd(brt, &brtcmd, txn->logger);
}
return toku_cachefile_close(&cf, toku_txn_logger(txn));
return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
}
......@@ -92,7 +92,7 @@ static int do_nothing_with_filenum(TOKUTXN txn, FILENUM filenum) {
CACHEFILE cf;
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0);
return toku_cachefile_close(&cf, toku_txn_logger(txn));
return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
}
......@@ -316,7 +316,7 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, TOKUTXN txn, YIELDF UU(
BRT brt = brtv;
r = toku_brt_truncate(brt);
return toku_cachefile_close(&cf, toku_txn_logger(txn));
return toku_cachefile_close(&cf, toku_txn_logger(txn), 0);
}
int
......
......@@ -35,7 +35,7 @@ static void setup (void) {
static void toku_shutdown (void) {
int r;
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
static void long_long_to_array (unsigned char *a, unsigned long long l) {
......
......@@ -52,6 +52,7 @@ static void test_serialize(void) {
struct block_translation_pair *XMALLOC_N(21, btps);
memset(btps, 0, sizeof(btps));
brt->h = brt_h;
brt_h->panic = 0; brt_h->panic_string = 0;
brt_h->translated_blocknum_limit = 1;
brt_h->block_translation = btps;
brt_h->block_translation[20].diskoff = 4096;
......
......@@ -70,7 +70,7 @@ static void test_multiple_brt_cursor_dbts(int n, DB *db) {
assert(r == 0);
}
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......
......@@ -131,7 +131,7 @@ static void test_brt_cursor_first(int n, DB *db) {
else
assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -172,7 +172,7 @@ static void test_brt_cursor_last(int n, DB *db) {
else
assert_cursor_value(brt, DB_LAST, n-1);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -215,7 +215,7 @@ static void test_brt_cursor_first_last(int n, DB *db) {
} else
assert_cursor_first_last(brt, 0, n-1);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -259,7 +259,7 @@ static void test_brt_cursor_rfirst(int n, DB *db) {
else
assert_cursor_value(brt, DB_FIRST, 0);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -330,7 +330,7 @@ static void test_brt_cursor_walk(int n, DB *db) {
/* walk the tree */
assert_cursor_walk(brt, n);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -402,7 +402,7 @@ static void test_brt_cursor_rwalk(int n, DB *db) {
/* walk the tree */
assert_cursor_rwalk(brt, n);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -489,7 +489,7 @@ static void test_brt_cursor_rand(int n, DB *db) {
/* walk the tree */
assert_cursor_walk_inorder(brt, n);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -570,7 +570,7 @@ static void test_brt_cursor_split(int n, DB *db) {
r = toku_brt_cursor_close(cursor);
assert(r==0);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -604,7 +604,7 @@ static void test_multiple_brt_cursors(int n, DB *db) {
assert(r == 0);
}
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -700,7 +700,7 @@ static void test_multiple_brt_cursor_walk(int n, DB *db) {
assert(r == 0);
}
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -772,7 +772,7 @@ static void test_brt_cursor_set(int n, int cursor_op, DB *db) {
r = toku_brt_cursor_close(cursor);
assert(r==0);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -838,7 +838,7 @@ static void test_brt_cursor_set_range(int n, DB *db) {
r = toku_brt_cursor_close(cursor);
assert(r==0);
r = toku_close_brt(brt, 0);
r = toku_close_brt(brt, 0, 0);
assert(r==0);
r = toku_cachetable_close(&ct);
......@@ -899,7 +899,7 @@ static void test_brt_cursor_delete(int n, DB *db) {
error = toku_brt_cursor_close(cursor);
assert(error == 0);
error = toku_close_brt(brt, 0);
error = toku_close_brt(brt, 0, 0);
assert(error == 0);
error = toku_cachetable_close(&ct);
......@@ -1002,7 +1002,7 @@ static void test_brt_cursor_get_both(int n, DB *db) {
error = toku_brt_cursor_close(cursor);
assert(error == 0);
error = toku_close_brt(brt, 0);
error = toku_close_brt(brt, 0, 0);
assert(error == 0);
error = toku_cachetable_close(&ct);
......
......@@ -22,7 +22,7 @@ static void test_named_db (void) {
toku_brt_insert(t0, toku_fill_dbt(&k, "good", 5), toku_fill_dbt(&v, "day", 4), null_txn); assert(r==0);
r = toku_close_brt(t0, 0); assert(r==0);
r = toku_close_brt(t0, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
......@@ -37,7 +37,7 @@ static void test_named_db (void) {
assert(strcmp(v.data,"day")==0);
}
r = toku_close_brt(t0, 0); assert(r==0);
r = toku_close_brt(t0, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
}
......
This diff is collapsed.
......@@ -23,7 +23,7 @@ static void test0 (void) {
assert(r==0);
//printf("%s:%d test0\n", __FILE__, __LINE__);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
//printf("%s:%d n_items_malloced=%lld\n", __FILE__, __LINE__, n_items_malloced);
r = toku_cachetable_close(&ct);
assert(r==0);
......
......@@ -27,7 +27,7 @@ static void test1 (void) {
assert(strcmp(v.data, "there")==0);
assert(v.size==6);
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
if (verbose) printf("test1 ok\n");
......
......@@ -41,7 +41,7 @@ static void test2 (int memcheck, int limit) {
}
if (verbose) printf("%s:%d inserted\n", __FILE__, __LINE__);
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
if (verbose) printf("test2 ok\n");
......
......@@ -30,7 +30,7 @@ static void test3 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
}
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
gettimeofday(&t1, 0);
......
......@@ -30,7 +30,7 @@ static void test4 (int nodesize, int count, int memcheck) {
toku_brt_insert(t, toku_fill_dbt(&k, key, 1+strlen(key)), toku_fill_dbt(&v, val, 1+strlen(val)), null_txn);
}
r = toku_verify_brt(t); assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
gettimeofday(&t1, 0);
......
......@@ -49,7 +49,7 @@ static void test5 (void) {
}
if (verbose) printf("\n");
toku_free(values);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
toku_memory_check_all_free();
}
......
......@@ -67,7 +67,7 @@ cachetable_count_pinned_test (int n) {
assert(toku_cachefile_count_pinned(f1, 1) == 0);
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -75,7 +75,7 @@ cachetable_debug_test (int n) {
if (verbose) print_hash_histogram();
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -39,7 +39,7 @@ cachetable_fd_test (void) {
r = toku_cachefile_of_filenum(ct, fn, &newcf);
assert(r == ENOENT);
r = toku_cachefile_close(&cf, NULL_LOGGER); assert(r == 0 && cf == 0);
r = toku_cachefile_close(&cf, NULL_LOGGER, 0); assert(r == 0 && cf == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -94,8 +94,8 @@ test_cachetable_flush (int n) {
assert(r == 0);
}
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f2, NULL_LOGGER); assert(r == 0 && f2 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f2, NULL_LOGGER, 0); assert(r == 0 && f2 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -72,7 +72,7 @@ cachetable_getandpin_test (int n) {
}
toku_cachetable_verify(ct);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -79,7 +79,7 @@ cachetable_put_test (int n) {
r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1);
assert(r != 0);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -154,7 +154,7 @@ static void test_rename (void) {
r = toku_cachetable_rename(f, okey, nkey);
assert(r != 0);
r = toku_cachefile_close(&f, 0);
r = toku_cachefile_close(&f, 0, 0);
assert(r == 0);
r = toku_cachetable_close(&t);
assert(r == 0);
......
......@@ -89,7 +89,7 @@ static void readit (void) {
r=toku_cachetable_get_and_pin(f, key, fullhash, &block, &current_size, f_flush, f_fetch, 0); assert(r==0);
r=toku_cachetable_unpin(f, key, fullhash, 0, BLOCKSIZE); assert(r==0);
}
r = toku_cachefile_close(&f, 0); assert(r == 0);
r = toku_cachefile_close(&f, 0, 0); assert(r == 0);
r = toku_cachetable_close(&t); assert(r == 0);
gettimeofday(&end, 0);
toku_os_get_process_times(&end_usertime, &end_systime);
......
......@@ -283,7 +283,7 @@ static void test0 (void) {
expectN(7);
expectN(6);
expectN(1);
r=toku_cachefile_close(&f, 0);
r=toku_cachefile_close(&f, 0, 0);
assert(r==0);
r=toku_cachetable_close(&t);
assert(r==0);
......@@ -349,7 +349,7 @@ static void test_nested_pin (void) {
r = toku_cachetable_unpin(f, make_blocknum(2), f2hash, 0, test_object_size);
assert(r==0);
// toku_os_usleep(1*1000000);
r = toku_cachefile_close(&f, 0); assert(r==0);
r = toku_cachefile_close(&f, 0, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0);
}
......@@ -415,14 +415,14 @@ static void test_multi_filehandles (void) {
r = toku_cachetable_unpin(f1, make_blocknum(1), toku_cachetable_hash(f1, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachetable_unpin(f1, make_blocknum(2), toku_cachetable_hash(f1, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f1, 0); assert(r==0);
r = toku_cachefile_close(&f1, 0, 0); assert(r==0);
r = toku_cachetable_unpin(f2, make_blocknum(1), toku_cachetable_hash(f2, make_blocknum(1)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachetable_unpin(f2, make_blocknum(2), toku_cachetable_hash(f2, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f2, 0); assert(r==0);
r = toku_cachefile_close(&f2, 0, 0); assert(r==0);
r = toku_cachetable_unpin(f3, make_blocknum(2), toku_cachetable_hash(f3, make_blocknum(2)), CACHETABLE_CLEAN, 0); assert(r==0);
r = toku_cachefile_close(&f3, 0); assert(r==0);
r = toku_cachefile_close(&f3, 0, 0); assert(r==0);
r = toku_cachetable_close(&t); assert(r==0);
}
......@@ -545,7 +545,7 @@ static void test_dirty() {
assert(dirty == 1);
assert(pinned == 0);
r = toku_cachefile_close(&f, 0);
r = toku_cachefile_close(&f, 0, 0);
assert(r == 0);
r = toku_cachetable_close(&t);
assert(r == 0);
......@@ -620,7 +620,7 @@ static void test_size_resize() {
r = toku_cachetable_unpin(f, key, hkey, CACHETABLE_CLEAN, new_size);
assert(r == 0);
r = toku_cachefile_close(&f, 0);
r = toku_cachefile_close(&f, 0, 0);
assert(r == 0);
r = toku_cachetable_close(&t);
assert(r == 0);
......@@ -688,7 +688,7 @@ static void test_size_flush() {
assert(r == 0);
}
r = toku_cachefile_close(&f, 0);
r = toku_cachefile_close(&f, 0, 0);
assert(r == 0);
r = toku_cachetable_close(&t);
assert(r == 0);
......
......@@ -222,13 +222,13 @@ static void test_chaining (void) {
//printf("Close %d (%p), now n_present=%d\n", i, f[i], n_present);
//print_ints();
CACHEFILE oldcf=f[i];
r = toku_cachefile_close(&f[i], 0); assert(r==0);
r = toku_cachefile_close(&f[i], 0, 0); assert(r==0);
file_is_not_present(oldcf);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, S_IRWXU|S_IRWXG|S_IRWXO); assert(r==0);
}
}
for (i=0; i<N_FILES; i++) {
r = toku_cachefile_close(&f[i], 0); assert(r==0);
r = toku_cachefile_close(&f[i], 0, 0); assert(r==0);
}
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -69,7 +69,7 @@ cachetable_unpin_test (int n) {
r = toku_cachetable_unpin(f1, k, toku_cachetable_hash(f1, k), CACHETABLE_CLEAN, 1);
assert(r != 0);
r = toku_cachefile_close(&f1, NULL_LOGGER); assert(r == 0 && f1 == 0);
r = toku_cachefile_close(&f1, NULL_LOGGER, 0); assert(r == 0 && f1 == 0);
r = toku_cachetable_close(&ct); assert(r == 0 && ct == 0);
}
......
......@@ -50,7 +50,7 @@ static void test_flat (void) {
assert(less>=prevless); prevless=less;
assert(greater<=prevgreater); prevgreater=greater;
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -69,7 +69,7 @@ static void test_flat (void) {
assert(equal==0);
assert(less+equal+greater == limit*ilimit);
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -54,7 +54,7 @@ static void test_flat (void) {
assert(less>=prevless); prevless = less;
assert(greater<=prevgreater); prevgreater = greater;
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -58,7 +58,7 @@ static void test_flat (void) {
assert(equal==0);
assert(less+equal+greater == limit);
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -48,7 +48,7 @@ test_main (int argc __attribute__((__unused__)), const char *argv[] __attribute
assert(strcmp(val.data, "0002")==0);
r = toku_brt_cursor_close(cursor); assert(r==0);
r = toku_close_brt(brt, 0); assert(r==0);
r = toku_close_brt(brt, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
return 0;
}
......@@ -39,7 +39,7 @@ doit (void) {
r = toku_brt_lookup(t, toku_fill_dbt(&k, "a", 2), toku_init_dbt(&v));
assert(r==DB_NOTFOUND);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -30,7 +30,7 @@ test_overflow (void) {
char key[]={(char)('a'+i), 0};
toku_brt_insert(t, toku_fill_dbt(&k, key, 2), toku_fill_dbt(&v,buf,sizeof(buf)), null_txn);
}
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -56,7 +56,7 @@ doit (void) {
r = toku_brt_lookup(t, &k, &v);
assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
}
......
......@@ -130,7 +130,7 @@ doit (int ksize __attribute__((__unused__))) {
r = toku_testsetup_root(t, anode);
assert(r==0);
r = toku_close_brt(t, 0); assert(r==0);
r = toku_close_brt(t, 0, 0); assert(r==0);
r = toku_cachetable_close(&ct); assert(r==0);
//printf("ksize=%d, unused\n", ksize);
......
......@@ -8,23 +8,25 @@ EXPORTS
db_env_set_func_malloc @7
db_env_set_func_realloc @8
db_env_set_func_free @9
toku_os_get_max_rss @10
toku_ydb_error_all_cases @11
toku_set_trace_file @12
toku_close_trace_file @13
toku_add_trace_mem @14
toku_print_trace_mem @15
toku_free @16
toku_malloc @17
toku_os_get_file_size @18
toku_os_getpid @19
toku_os_gettid @20
toku_os_initialize_settings @21
toku_os_is_absolute_name @22
toku_os_mkdir @23
toku_realloc @24
toku_strdup @25
dlmalloc @26
dlrealloc @27
dlfree @28
db_env_set_func_pwrite @10
db_env_set_func_write @11
toku_os_get_max_rss @12
toku_ydb_error_all_cases @13
toku_set_trace_file @14
toku_close_trace_file @15
toku_add_trace_mem @16
toku_print_trace_mem @17
toku_free @18
toku_malloc @19
toku_os_get_file_size @20
toku_os_getpid @21
toku_os_gettid @22
toku_os_initialize_settings @23
toku_os_is_absolute_name @24
toku_os_mkdir @25
toku_realloc @26
toku_strdup @27
dlmalloc @28
dlrealloc @29
dlfree @30
......@@ -9,6 +9,8 @@
db_env_set_func_malloc;
db_env_set_func_realloc;
db_env_set_func_free;
db_env_set_func_pwrite;
db_env_set_func_write;
toku_os_get_max_rss;
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2007 Tokutek Inc. All rights reserved."
#include "test.h"
/* Simulate disk full by making pwrite and write return ENOSPC */
/* Strategy: repeatedly run a test; on the run with fail_at==I, let the first I writes succeed and make every later write fail. */
#include <fcntl.h>
#include <errno.h>
/* If r is nonzero, record the failure by setting did_fail to 1 and print the
 * source location, the code, and db_strerror(r) to stderr.  The macro neither
 * aborts nor returns, so execution continues after it.  It assigns to a
 * variable named did_fail, which must be declared in the scope where the
 * macro is used. */
#define DOERR(r) do { if (r!=0) { did_fail=1; fprintf(stderr, "%s:%d error %d (%s)\n", __FILE__, __LINE__, r, db_strerror(r)); }} while (0)
static void
do_db_work(void) {
int r;
int did_fail=0;
{
system("rm -rf " ENVDIR);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
DB_ENV *env;
DB_TXN *tid;
DB *db;
DBT key,data;
r=db_env_create(&env, 0); assert(r==0);
env->set_errfile(env, stderr);
// Don't set the lg bsize for the small experiment.
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); DOERR(r);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown1;
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->put(db, tid, dbt_init(&key, "a", 2), dbt_init(&data, "b", 2), 0); DOERR(r);
if (did_fail) {
r = tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
shutdown1:
r=db->close(db, 0); DOERR(r);
r=env->close(env, 0); DOERR(r);
if (did_fail) return;
}
{
system("rm -rf " ENVDIR);
r=toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO); assert(r==0);
DB_ENV *env;
DB_TXN *tid;
DB *db;
DBT key,data;
// Repeat with more put operations
r=db_env_create(&env, 0); assert(r==0);
env->set_errfile(env, stderr);
r=env->set_lg_bsize(env, 4096); assert(r==0);
r=env->set_cachesize(env, 0, 1, 1); assert(r==0);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE|DB_THREAD, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=db->set_pagesize(db, 4096);
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO); DOERR(r);
if (did_fail) {
r = tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown2;
// Put an extra item in so that the rolltmp file will be created.
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
r=db->put(db, tid, dbt_init(&key, "a", 2), dbt_init(&data, "b", 2), 0); DOERR(r);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
if (did_fail) goto shutdown2;
r=env->txn_begin(env, 0, &tid, 0); assert(r==0);
{
int i;
for (i=0; i<100; i++) {
int kvsize=50;
int kvsize_i = kvsize / sizeof(int);
int keyi[kvsize_i],vali[kvsize_i];
int j;
keyi[0] = vali[0] = toku_htonl(i);
for (j=1; j<kvsize_i; j++) {
keyi[j] = random();
vali[j] = random();
}
r=db->put(db, tid, dbt_init(&key, keyi, sizeof keyi), dbt_init(&data, vali, sizeof vali), 0);
DOERR(r);
if (did_fail) goto break_out_of_loop;
}
}
break_out_of_loop:
system("ls -l " ENVDIR);
if (did_fail) {
r=tid->abort(tid); CKERR(r);
} else {
r=tid->commit(tid, 0); DOERR(r);
}
shutdown2:
r=db->close(db, 0); DOERR(r);
r=env->close(env, 0); DOERR(r);
}
}
// Number of calls made to the counting write/pwrite hooks since the counter was last set to 0.
static int write_count = 0;
// Value for fail_at that is larger than any count the hooks are expected to reach.
#define FAIL_NEVER 0x7FFFFFFF
// The hooks fail with ENOSPC once write_count exceeds this value.
static int fail_at = FAIL_NEVER;
/* pwrite hook: counts the call in write_count, then either forwards to the
 * real pwrite (while the count has not passed fail_at) or reports a full disk
 * by setting errno to ENOSPC and returning -1. */
static ssize_t
pwrite_counting_and_failing (int fd, const void *buf, size_t size, off_t off)
{
    write_count++;
    if (write_count <= fail_at)
        return pwrite(fd, buf, size, off);

    // Past the failure point: every further call fails.
    errno = ENOSPC;
    return -1;
}
/* write hook: counts the call in write_count, then either forwards to the
 * real write (while the count has not passed fail_at) or reports a full disk
 * by setting errno to ENOSPC and returning -1. */
static ssize_t
write_counting_and_failing (int fd, const void *buf, size_t size)
{
    write_count++;
    if (write_count <= fail_at)
        return write(fd, buf, size);

    // Past the failure point: every further call fails.
    errno = ENOSPC;
    return -1;
}
/* Drive the failure-injection experiment.
 * Installs the counting hooks, runs the workload once to learn how many
 * writes it performs, then reruns it once for every fail_at value from 0 up
 * to (but not including) that count. */
static void
do_writes_that_fail (void) {
    db_env_set_func_pwrite(pwrite_counting_and_failing);
    db_env_set_func_write (write_counting_and_failing);

    // Baseline run: fail_at still holds its initial value here.
    write_count = 0;
    do_db_work();
    const int total_writes = write_count;
    printf("Write_count=%d\n", total_writes);

    // To debug one failure point, set fail_at to it (e.g. 83), zero write_count, and call do_db_work() once.
    fail_at = 0;
    while (fail_at < total_writes) {
        printf("About to fail at %d:\n", fail_at);
        write_count = 0;
        do_db_work();
        fail_at++;
    }
    // fail_at is left equal to total_writes; it is not restored to FAIL_NEVER.
}
/* Test entry point: hands the command line to parse_args, runs the
 * write-failure experiment, and returns 0. */
int
test_main (int argc, const char *argv[]) {
parse_args(argc, argv);
do_writes_that_fail();
return 0;
}
......@@ -46,7 +46,7 @@ def main():
exceptsymbols = {}
for n in [ "_init", "_fini", "_end", "_edata", "__bss_start" ]:
exceptsymbols[n] = 1
for n in [ "db_env_create", "db_create", "db_strerror", "db_version", "log_compare", "db_env_set_func_fsync", "dlfree", "dlmalloc", "dlrealloc", "db_env_set_func_free", "db_env_set_func_malloc", "db_env_set_func_realloc" ]:
for n in [ "db_env_create", "db_create", "db_strerror", "db_version", "log_compare", "db_env_set_func_fsync", "dlfree", "dlmalloc", "dlrealloc", "db_env_set_func_free", "db_env_set_func_malloc", "db_env_set_func_realloc", "db_env_set_func_pwrite", "db_env_set_func_write"]:
exceptsymbols[n] = 1
return checkglobals(libname, exceptsymbols, verbose)
......
......@@ -50,7 +50,8 @@ typedef void (*toku_env_errcall_t)(const DB_ENV *, const char *, const char *);
#endif
struct __toku_db_env_internal {
int is_panicked;
int is_panicked; // if nonzero, then its an error number
char *panic_string;
int ref_count;
u_int32_t open_flags;
int open_mode;
......
......@@ -562,13 +562,35 @@ static int toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mo
}
static int toku_env_close(DB_ENV * env, u_int32_t flags) {
// Even if the env is panicedk, try to close as much as we can.
int is_panicked = toku_env_is_panicked(env);
char *panic_string = env->i->panic_string;
env->i->panic_string = 0;
// Even if the env is panicked, try to close as much as we can.
int r0=0,r1=0;
if (env->i->cachetable)
if (env->i->cachetable) {
r0=toku_cachetable_close(&env->i->cachetable);
if (env->i->logger)
if (r0) {
toku_ydb_do_error(env, r0, "Cannot close environment (cachetable close error)\n");
}
}
if (env->i->logger) {
r1=toku_logger_close(&env->i->logger);
if (r0==0 && r1) {
toku_ydb_do_error(env, r0, "Cannot close environment (logger close error)\n");
}
}
// Even if nothing else went wrong, but we were panicked, then raise an error.
// But if something else went wrong then raise that error (above)
if (is_panicked) {
if (r0==0 && r1==0) {
toku_ydb_do_error(env, is_panicked, "Cannot close environment due to previous error: %s\n", panic_string);
}
if (panic_string) toku_free(panic_string);
} else {
assert(panic_string==0);
}
if (env->i->data_dirs) {
u_int32_t i;
assert(env->i->n_data_dirs > 0);
......@@ -589,8 +611,7 @@ static int toku_env_close(DB_ENV * env, u_int32_t flags) {
if (flags!=0) return EINVAL;
if (r0) return r0;
if (r1) return r1;
if (is_panicked) return EINVAL;
return 0;
return is_panicked;
}
static int toku_env_log_archive(DB_ENV * env, char **list[], u_int32_t flags) {
......@@ -699,8 +720,7 @@ static int toku_env_set_flags(DB_ENV * env, u_int32_t flags, int onoff) {
static int toku_env_set_lg_bsize(DB_ENV * env, u_int32_t bsize) {
HANDLE_PANICKED_ENV(env);
bsize=bsize;
return toku_ydb_do_error(env, EINVAL, "TokuDB does not (yet) support ENV->set_lg_bsize\n");
return toku_logger_set_lg_bsize(env->i->logger, bsize);
}
static int toku_env_set_lg_dir(DB_ENV * env, const char *dir) {
......@@ -929,6 +949,7 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
if (result->i == 0) { r = ENOMEM; goto cleanup; }
memset(result->i, 0, sizeof *result->i);
result->i->is_panicked=0;
result->i->panic_string = 0;
result->i->ref_count = 1;
result->i->errcall = 0;
result->i->errpfx = 0;
......@@ -1275,13 +1296,27 @@ static int toku_db_close(DB * db, u_int32_t flags) {
}
}
flags=flags;
int r = toku_close_brt(db->i->brt, db->dbenv->i->logger);
if (r != 0)
return r;
char *error_string = 0;
int r1 = toku_close_brt(db->i->brt, db->dbenv->i->logger, &error_string);
if (r1) {
db->dbenv->i->is_panicked = r1; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
db->dbenv->i->panic_string = error_string;
if (error_string) {
toku_ydb_do_error(db->dbenv, r1, "%s\n", error_string);
} else {
toku_ydb_do_error(db->dbenv, r1, "Closing file\n");
}
error_string=0;
}
assert(error_string==0);
int r2 = 0;
if (db->i->db_id) { toku_db_id_remove_ref(&db->i->db_id); }
if (db->i->lt) {
r = toku_lt_remove_ref(db->i->lt);
if (r!=0) { return r; }
r2 = toku_lt_remove_ref(db->i->lt);
if (r2) {
db->dbenv->i->is_panicked = r2; // Panicking the whole environment may be overkill, but I'm not sure what else to do.
db->dbenv->i->panic_string = 0;
}
}
// printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
// Even if panicked, let's close as much as we can.
......@@ -1293,8 +1328,10 @@ static int toku_db_close(DB * db, u_int32_t flags) {
toku_free(db->i);
toku_free(db);
ydb_unref();
if (r==0 && is_panicked) return EINVAL;
return r;
if (r1) return r1;
if (r2) return r2;
if (is_panicked) return EINVAL;
return 0;
}
/* Verify that an element from the secondary database is still consistent
......@@ -2851,12 +2888,15 @@ static int find_db_file(DB_ENV* dbenv, const char *fname, char** full_name_out)
}
static int toku_db_lt_panic(DB* db, int r) {
assert(r!=0);
assert(db && db->i && db->dbenv && db->dbenv->i);
DB_ENV* env = db->dbenv;
env->i->is_panicked = 1;
if (r < 0) toku_ydb_do_error(env, 0, toku_lt_strerror((TOKU_LT_ERROR)r));
else toku_ydb_do_error(env, r, "Error in locktree.\n");
return EINVAL;
env->i->is_panicked = r;
if (r < 0) env->i->panic_string = toku_strdup(toku_lt_strerror((TOKU_LT_ERROR)r));
else env->i->panic_string = toku_strdup("Error in locktree.\n");
return toku_ydb_do_error(env, r, env->i->panic_string);
}
static int toku_txn_add_lt(DB_TXN* txn, toku_lock_tree* lt) {
......@@ -3685,6 +3725,13 @@ int db_env_set_func_fsync (int (*fsync_function)(int)) {
return toku_set_func_fsync(fsync_function);
}
/* Exported wrapper: passes the caller-supplied pwrite replacement to
 * toku_set_func_pwrite and returns that call's result. */
int db_env_set_func_pwrite (ssize_t (*pwrite_function)(int, const void *, size_t, off_t)) {
return toku_set_func_pwrite(pwrite_function);
}
/* Exported wrapper: passes the caller-supplied write replacement to
 * toku_set_func_write and returns that call's result. */
int db_env_set_func_write (ssize_t (*write_function)(int, const void *, size_t)) {
return toku_set_func_write(write_function);
}
/* Exported wrapper: passes the caller-supplied malloc replacement to
 * toku_set_func_malloc and returns that call's result. */
int db_env_set_func_malloc (void *(*f)(size_t)) {
return toku_set_func_malloc(f);
}
......
......@@ -99,11 +99,14 @@ void *realloc(void*, size_t) __attribute__((__deprecated__));
void *os_malloc(size_t);
void *os_realloc(void*,size_t);
void os_free(void*);
ssize_t toku_os_pwrite (int fd, const void *buf, size_t len, off_t off);
int toku_set_func_fsync (int (*fsync_function)(int));
int toku_set_func_malloc (void *(*)(size_t));
int toku_set_func_realloc (void *(*)(void*,size_t));
int toku_set_func_free (void (*)(void*));
int toku_set_func_pwrite (ssize_t (*pwrite_fun)(int, const void *, size_t, off_t));
int toku_set_func_write (ssize_t (*pwrite_fun)(int, const void *, size_t));
#if defined __cplusplus
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment