Commit fd6a6d14 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Merge in the tokudb.1017 patches to the main branch. Fixes #1017.

{{{
$ (cd tokudb;svn merge -r5048:5080 https://svn.tokutek.com/tokudb/tokudb.1017 )
$ svn delete tokudb.1017
}}}


git-svn-id: file:///svn/tokudb@5081 c7de825b-a66e-492c-adef-691d508d4ae1
parent bf807a8a
......@@ -65,6 +65,7 @@ BRT_SOURCES = \
log \
log_code \
memory \
memarena \
mempool \
omt \
recover \
......
......@@ -2588,8 +2588,8 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
toku_cachefile_refup(brt->cf);
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)};
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
......@@ -2619,7 +2619,7 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
if (txn && (brt->txn_that_created != toku_txn_get_txnid(txn))) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r;
......
......@@ -4,6 +4,7 @@
#include "log.h"
#include "toku_assert.h"
#include "list.h"
#include "memarena.h"
#include <stdio.h>
#include <pthread.h>
#include <sys/types.h>
......@@ -88,6 +89,9 @@ struct tokutxn {
LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
MEMARENA rollentry_arena;
size_t rollentry_resident_bytecount; // How many bytes for the rollentries that are stored in main memory.
char *rollentry_filename;
int rollentry_fd; // If we spill the roll_entries, we write them into this fd.
......@@ -148,4 +152,4 @@ static inline char *fixup_fname(BYTESTRING *f) {
return fname;
}
int toku_read_rollback_backwards(BREAD, struct roll_entry **item);
int toku_read_rollback_backwards(BREAD, struct roll_entry **item, MEMARENA);
......@@ -25,6 +25,20 @@
static char dev_null[] = "/dev/null";
void* toku_malloc_in_rollback(TOKUTXN txn, size_t size) {
return malloc_in_memarena(txn->rollentry_arena, size);
}
void *toku_memdup_in_rollback(TOKUTXN txn, const void *v, size_t len) {
void *r=toku_malloc_in_rollback(txn, len);
memcpy(r,v,len);
return r;
}
char *toku_strdup_in_rollback(TOKUTXN txn, const char *s) {
return toku_memdup_in_rollback(txn, s, strlen(s)+1);
}
int toku_logger_fsync_null(int fd __attribute__((__unused__))) {
return 0;
}
......@@ -366,15 +380,7 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
static void note_txn_closing (TOKUTXN txn);
static void cleanup_txn (TOKUTXN txn) {
struct roll_entry *item;
while ((item=txn->newest_logentry)) {
txn->newest_logentry = item->prev;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
toku_logger_rollback_malloc_size -= sizeof(*item);
toku_logger_rollback_malloc_count --;
}
memarena_close(&txn->rollentry_arena);
if (txn->rollentry_filename!=0) {
int r = close(txn->rollentry_fd);
assert(r==0);
......@@ -392,24 +398,13 @@ static void cleanup_txn (TOKUTXN txn) {
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
return r;
}
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item) {
int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn);
if (r!=0) return r;
u_int32_t rollback_fsize = toku_logger_rollback_fsize(item);
toku_logger_rollback_malloc_size -= rollback_fsize;
toku_logger_rollback_malloc_count --;
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
return 0;
}
......@@ -421,10 +416,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
// printf("%s:%d committing\n", __FILE__, __LINE__);
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) {
cleanup_txn(txn);
return r;
} else if (txn->parent!=0) {
if (r==0) {
if (txn->parent!=0) {
// First we must put a rollinclude entry into the parent if we have a rollentry file.
if (txn->rollentry_filename) {
int len = strlen(txn->rollentry_filename);
......@@ -456,6 +449,8 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
txn->parent->oldest_logentry = txn->oldest_logentry;
}
txn->newest_logentry = txn->oldest_logentry = 0;
// Put all the memarena data into the parent.
memarena_move_buffers(txn->parent->rollentry_arena, txn->rollentry_arena);
// Note the open brts, the omts must be merged
r = toku_omt_iterate(txn->open_brts, note_brt_used_in_parent_txn, txn->parent);
......@@ -479,6 +474,7 @@ int toku_logger_commit (TOKUTXN txn, int nosync) {
r = toku_commit_fileentries(txn->rollentry_fd, txn->rollentry_filesize, txn);
}
}
}
cleanup_txn(txn);
return r;
}
......@@ -507,6 +503,9 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create();
list_push(&logger->live_txns, &result->live_txns_link);
result->rollentry_resident_bytecount=0;
result->rollentry_filename = 0;
......@@ -519,7 +518,7 @@ int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER
int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
BYTESTRING bs = { .len=strlen(fname), .data = toku_strdup_in_rollback(txn, fname) };
int r = toku_log_fcreate (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, toku_txn_get_txnid(txn), bs);
......@@ -960,8 +959,6 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn) {
txn->oldest_logentry = item->next;
if (item->next) { item->next->prev=0; }
toku_logger_rollback_wbufwrite(&w, item);
rolltype_dispatch(item, toku_free_rolltype_);
toku_free(item);
}
assert(txn->rollentry_resident_bytecount==0);
assert((ssize_t)w.ndone==bufsize);
......@@ -981,20 +978,22 @@ int toku_maybe_spill_rollbacks (TOKUTXN txn) {
assert(r==(ssize_t)w.ndone);
txn->rollentry_filesize+=w.ndone;
toku_free(buf);
// Cleanup the rollback memory
memarena_clear(txn->rollentry_arena);
}
return 0;
}
int toku_read_rollback_backwards(BREAD br, struct roll_entry **item) {
int toku_read_rollback_backwards(BREAD br, struct roll_entry **item, MEMARENA ma) {
u_int32_t nbytes_n; ssize_t sr;
if ((sr=bread_backwards(br, &nbytes_n, 4))!=4) { assert(sr<0); return errno; }
u_int32_t n_bytes=ntohl(nbytes_n);
unsigned char *buf = toku_malloc(n_bytes);
unsigned char *buf = malloc_in_memarena(ma, n_bytes);
if (buf==0) return errno;
if ((sr=bread_backwards(br, buf, n_bytes-4))!=(ssize_t)n_bytes-4) { assert(sr<0); return errno; }
int r = toku_parse_rollback(buf, n_bytes, item);
int r = toku_parse_rollback(buf, n_bytes, item, ma);
if (r!=0) return r;
toku_free(buf);
return 0;
}
......
......@@ -174,4 +174,10 @@ int toku_txn_note_close_brt (BRT brt);
// if found then return 0 and set txnptr to the address of the TOKUTXN object
int toku_txn_find_by_xid (BRT brt, TXNID xid, TOKUTXN *txnptr);
// Allocate memory as part of a rollback structure. It will be freed when the transaction completes.
void* toku_malloc_in_rollback(TOKUTXN txn, size_t size);
void *toku_memdup_in_rollback(TOKUTXN txn, const void *v, size_t len);
char *toku_strdup_in_rollback(TOKUTXN txn, const char *s);
#endif
......@@ -338,16 +338,6 @@ void generate_dispatch (void) {
fprintf(hf, " }})\n");
}
void generate_log_free(void) {
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "void toku_free_rolltype_%s(struct rolltype_%s *e)", lt->name, lt->name);
fprintf(hf, ";\n");
fprintf(cf, " {\n");
DO_FIELDS(ft, lt, fprintf(cf, " toku_free_%s(e->%s);\n", ft->type, ft->name));
fprintf(cf, "}\n");
}));
}
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
......@@ -468,8 +458,6 @@ void generate_logprint (void) {
}
void generate_rollbacks (void) {
fprintf(cf, "u_int64_t toku_logger_rollback_malloc_size=0, toku_logger_rollback_malloc_count=0;\n");
fprintf(hf, "extern u_int64_t toku_logger_rollback_malloc_size, toku_logger_rollback_malloc_count;\n");
DO_ROLLBACKS(lt, ({
fprintf2(cf, hf, "int toku_logger_save_rollback_%s (TOKUTXN txn", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
......@@ -481,8 +469,7 @@ void generate_rollbacks (void) {
DO_FIELDS(ft, lt, fprintf(cf, "%s%s", (count++>0)?", ":"", ft->name));
fprintf(cf, ");\n");
}
fprintf(cf, " struct roll_entry *v = toku_malloc(sizeof(*v));\n");
fprintf(cf, " toku_logger_rollback_malloc_count++; toku_logger_rollback_malloc_size+=rollback_fsize;\n");
fprintf(cf, " struct roll_entry *v = toku_malloc_in_rollback(txn, sizeof(*v));\n");
fprintf(cf, " if (v==0) return errno;\n");
fprintf(cf, " v->cmd = %d;\n", lt->command_and_flags&0xff);
DO_FIELDS(ft, lt, fprintf(cf, " v->u.%s.%s = %s;\n", lt->name, ft->name, ft->name));
......@@ -541,14 +528,14 @@ void generate_rollbacks (void) {
fprintf(cf, " }\n assert(0);\n return 0;\n");
fprintf(cf, "}\n");
fprintf2(cf, hf, "int toku_parse_rollback(unsigned char *buf, u_int32_t n_bytes, struct roll_entry **itemp)");
fprintf2(cf, hf, "int toku_parse_rollback(unsigned char *buf, u_int32_t n_bytes, struct roll_entry **itemp, MEMARENA ma)");
fprintf(hf, ";\n");
fprintf(cf, " {\n assert(n_bytes>0);\n struct roll_entry *item = toku_malloc(sizeof(*item));\n item->cmd=buf[0];\n");
fprintf(cf, " {\n assert(n_bytes>0);\n struct roll_entry *item = malloc_in_memarena(ma, sizeof(*item));\n item->cmd=buf[0];\n");
fprintf(cf, " struct rbuf rc = {buf, n_bytes, 1};\n");
fprintf(cf, " switch(item->cmd) {\n");
DO_ROLLBACKS(lt, ({
fprintf(cf, " case RT_%s:\n", lt->name);
DO_FIELDS(ft, lt, fprintf(cf, " rbuf_%s(&rc, &item->u.%s.%s);\n", ft->type, lt->name, ft->name));
DO_FIELDS(ft, lt, fprintf(cf, " rbuf_ma_%s(&rc, ma, &item->u.%s.%s);\n", ft->type, lt->name, ft->name));
fprintf(cf, " *itemp = item;\n");
fprintf(cf, " return 0;\n");
}));
......@@ -567,6 +554,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u
fprintf2(cf, hf, "#ident \"Copyright (c) 2007, 2008 Tokutek Inc. All rights reserved.\"\n");
fprintf(cf, "#include <stdio.h>\n");
fprintf(hf, "#include \"brt-internal.h\"\n");
fprintf(hf, "#include \"memarena.h\"\n");
fprintf(cf, "#include \"log_header.h\"\n");
fprintf(cf, "#include \"wbuf.h\"\n");
fprintf(cf, "#include \"log-internal.h\"\n");
......@@ -574,7 +562,6 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__u
generate_log_struct();
generate_dispatch();
generate_log_writer();
generate_log_free();
generate_log_reader();
generate_logprint();
generate_rollbacks();
......
#include <string.h>
#include <stdio.h>
#include "memarena.h"
#include "memory.h"
#include "toku_assert.h"
struct memarena {
char *buf;
size_t buf_used, buf_size;
char **other_bufs;
int n_other_bufs;
};
MEMARENA memarena_create (void) {
MEMARENA MALLOC(result); assert(result);
result->buf_size = 1024;
result->buf_used = 0;
result->other_bufs = NULL;
result->n_other_bufs = 0;
result->buf = toku_malloc(result->buf_size); assert(result->buf);
return result;
}
void memarena_clear (MEMARENA ma) {
// Free the other bufs.
int i;
for (i=0; i<ma->n_other_bufs; i++) {
toku_free(ma->other_bufs[i]);
ma->other_bufs[i]=0;
}
ma->n_other_bufs=0;
// But reuse the main buffer
ma->buf_used = 0;
}
size_t round_to_page (size_t size) {
const size_t PAGE_SIZE = 4096;
const size_t result = PAGE_SIZE+((size-1)&~(PAGE_SIZE-1));
assert(0==(result&(PAGE_SIZE-1))); // make sure it's aligned
assert(result>=size); // make sure it's not too small
assert(size<result+PAGE_SIZE); // make sure we didn't grow by more than a page.
return result;
}
void* malloc_in_memarena (MEMARENA ma, size_t size) {
if (ma->buf_size < ma->buf_used + size) {
// The existing block isn't big enough.
// Add the block to the vector of blocks.
if (ma->buf) {
int old_n = ma->n_other_bufs;
REALLOC_N(old_n+1, ma->other_bufs);
assert(ma->other_bufs);
ma->other_bufs[old_n]=ma->buf;
ma->n_other_bufs = old_n+1;
}
// Make a new one
{
size_t new_size = 2*ma->buf_size;
if (new_size<size) new_size=size;
new_size=round_to_page(new_size); // at least size, but round to the next page size
ma->buf = toku_malloc(new_size);
assert(ma->buf);
ma->buf_used = 0;
ma->buf_size = new_size;
}
}
// allocate in the existing block.
char *result=ma->buf+ma->buf_used;
ma->buf_used+=size;
return result;
}
void *memarena_memdup (MEMARENA ma, const void *v, size_t len) {
void *r=malloc_in_memarena(ma, len);
memcpy(r,v,len);
return r;
}
void memarena_close(MEMARENA *map) {
MEMARENA ma=*map;
if (ma->buf) {
toku_free(ma->buf);
ma->buf=0;
}
int i;
for (i=0; i<ma->n_other_bufs; i++) {
toku_free(ma->other_bufs[i]);
}
if (ma->other_bufs) toku_free(ma->other_bufs);
ma->other_bufs=0;
ma->n_other_bufs=0;
toku_free(ma);
*map = 0;
}
void memarena_move_buffers(MEMARENA dest, MEMARENA source) {
REALLOC_N(dest->n_other_bufs + source->n_other_bufs + 1, dest->other_bufs);
int i;
for (i=0; i<source->n_other_bufs; i++) {
dest->other_bufs[dest->n_other_bufs++] = source->other_bufs[i];
}
dest->other_bufs[dest->n_other_bufs++] = source->buf;
source->n_other_bufs = 0;
toku_free(source->other_bufs);
source->other_bufs = 0;
source->buf = 0;
source->buf_size = 0;
source->buf_used = 0;
}
#ifndef MEMARENA_H
#define MEMARENA_H
/* We have too many memory management tricks:
* mempool for a collection of objects that are all allocated together.
* It's pretty rigid about what happens when you run out of memory.
* There's a callback to compress data.
* memarena (this code) is for a collection of objects that cannot be moved.
* The pattern is allocate more and more stuff.
* Don't free items as you go.
* Free all the items at once.
* Then reuse the same buffer again.
* Allocated objects never move.
* A memarena (as currently implemented) is not suitable for interprocess memory sharing. No reason it couldn't be made to work though.
*/
#include <sys/types.h>
typedef struct memarena *MEMARENA;
MEMARENA memarena_create (void);
// Effect: Create a memarena. In case of ENOMEM, aborts.
void memarena_clear (MEMARENA ma);
// Effect: Reset the internal state so that the allocated memory can be used again.
void* malloc_in_memarena (MEMARENA ma, size_t size);
// Effect: Allocate some memory. The returned value remains valid until the memarena is cleared or closed.
// In case of ENOMEM, aborts.
void *memarena_memdup (MEMARENA ma, const void *v, size_t len);
void memarena_close(MEMARENA *ma);
void memarena_move_buffers(MEMARENA dest, MEMARENA source);
// Effect: Move all the memory from SOURCE into DEST. When SOURCE is closed the memory won't be freed. When DEST is closed, the memory will be freed. (Unless DEST moves its memory to another memarena...)
#endif
......@@ -211,6 +211,7 @@ void *toku_memdup (const void *v, size_t len) {
memcpy(r,v,len);
return r;
}
char *toku_strdup (const char *s) {
return toku_memdup(s, strlen(s)+1);
}
......
......@@ -5,6 +5,7 @@
#include "toku_assert.h"
#include "memory.h"
#include "memarena.h"
#include <arpa/inet.h>
struct rbuf {
......@@ -64,10 +65,16 @@ static inline DISKOFF rbuf_diskoff (struct rbuf *r) {
static inline void rbuf_TXNID (struct rbuf *r, TXNID *txnid) {
*txnid = rbuf_ulonglong(r);
}
static inline void rbuf_ma_TXNID (struct rbuf *r, MEMARENA ma __attribute__((__unused__)), TXNID *txnid) {
return rbuf_TXNID(r, txnid);
}
static inline void rbuf_FILENUM (struct rbuf *r, FILENUM *filenum) {
filenum->fileid = rbuf_int(r);
}
static inline void rbuf_ma_FILENUM (struct rbuf *r, MEMARENA ma __attribute__((__unused__)), FILENUM *filenum) {
return rbuf_FILENUM(r, filenum);
}
// Don't try to use the same space, malloc it
static inline void rbuf_BYTESTRING (struct rbuf *r, BYTESTRING *bs) {
......@@ -79,4 +86,13 @@ static inline void rbuf_BYTESTRING (struct rbuf *r, BYTESTRING *bs) {
r->ndone = newndone;
}
static inline void rbuf_ma_BYTESTRING (struct rbuf *r, MEMARENA ma, BYTESTRING *bs) {
bs->len = rbuf_int(r);
u_int32_t newndone = r->ndone + bs->len;
assert(newndone < r->size);
bs->data = memarena_memdup(ma, &r->buf[r->ndone], (size_t)bs->len);
assert(bs->data);
r->ndone = newndone;
}
#endif
......@@ -127,15 +127,18 @@ int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN
int toku_commit_fileentries (int fd, off_t filesize, TOKUTXN txn) {
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
int r=0;
MEMARENA ma = memarena_create();
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item);
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_commit_rollback_item(txn, item);
if (r!=0) goto finish;
memarena_clear(ma);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
memarena_close(&ma);
return r;
}
......@@ -143,15 +146,18 @@ int toku_rollback_fileentries (int fd, off_t filesize, TOKUTXN txn) {
BREAD f = create_bread_from_fd_initialize_at(fd, filesize, 1<<20);
assert(f);
int r=0;
MEMARENA ma = memarena_create();
while (bread_has_more(f)) {
struct roll_entry *item;
r = toku_read_rollback_backwards(f, &item);
r = toku_read_rollback_backwards(f, &item, ma);
if (r!=0) goto finish;
r = toku_abort_rollback_item(txn, item);
if (r!=0) goto finish;
memarena_clear(ma);
}
finish:
{ int r2 = close_bread_without_closing_fd(f); assert(r2==0); }
memarena_close(&ma);
return r;
}
......
......@@ -83,7 +83,7 @@ void test_nested (void) {
lookup(3, txn2, DB_NOTFOUND);
insert(3, txn2);
lookup(3, txn2, 0);
r=txn->commit(txn2, 0); CKERR(r);
r=txn2->commit(txn2, 0); CKERR(r);
lookup(0, txn, DB_NOTFOUND);
lookup(1, txn, DB_NOTFOUND);
lookup(2, txn, 0);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment