Commit d3dfa68e authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Do subtransactions properly (without an fsync)

git-svn-id: file:///svn/tokudb@499 c7de825b-a66e-492c-adef-691d508d4ae1
parent 7bb64e10
......@@ -13,7 +13,7 @@ extern "C" {
#ifndef _TOKUDB_WRAP_H
#define DB_VERSION_STRING "Tokutek: TokuDB 4.1.24"
#else
#define DB_VERSION_STRING "Tokutek: TokuDB (wrapped bdb)"
#define DB_VERSION_STRING_ydb "Tokutek: TokuDB (wrapped bdb)"
#endif
typedef struct __toku_db_btree_stat DB_BTREE_STAT;
typedef struct __toku_db_env DB_ENV;
......@@ -28,6 +28,7 @@ typedef struct __toku_dbt DBT;
typedef enum {
DB_BTREE=1
} DBTYPE;
#ifndef _TOKUDB_WRAP_H
#define DB_VERB_DEADLOCK 2
#define DB_VERB_RECOVERY 4
#define DB_VERB_REPLICATION 8
......@@ -67,6 +68,7 @@ typedef enum {
#define DB_SET 30
#define DB_SET_RANGE 32
#define DB_RMW 1073741824
#endif
/* in wrap mode, top-level function txn_begin is renamed, but the field isn't renamed, so we have to hack it here.*/
#ifdef _TOKUDB_WRAP_H
#undef txn_begin
......
......@@ -24,6 +24,7 @@ void print_db_notices (void) {
#define dodefine(name) printf("#define %s %d\n", #name, name)
void print_defines (void) {
printf("#ifndef _TOKUDB_WRAP_H\n");
dodefine(DB_VERB_DEADLOCK);
dodefine(DB_VERB_RECOVERY);
dodefine(DB_VERB_REPLICATION);
......@@ -73,6 +74,7 @@ void print_defines (void) {
dodefine(DB_SET);
dodefine(DB_SET_RANGE);
dodefine(DB_RMW);
printf("#endif\n");
}
//#define DECL_LIMIT 100
......@@ -183,7 +185,7 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf("#ifndef _TOKUDB_WRAP_H\n");
printf("#define DB_VERSION_STRING \"Tokutek: TokuDB %d.%d.%d\"\n", DB_VERSION_MAJOR, DB_VERSION_MINOR, DB_VERSION_PATCH);
printf("#else\n");
printf("#define DB_VERSION_STRING \"Tokutek: TokuDB (wrapped bdb)\"\n");
printf("#define DB_VERSION_STRING_ydb \"Tokutek: TokuDB (wrapped bdb)\"\n");
printf("#endif\n");
if (0) {
......
......@@ -12,9 +12,8 @@ extern "C" {
#define DB_VERSION_PATCH 24
#ifndef _TOKUDB_WRAP_H
#define DB_VERSION_STRING "Tokutek: TokuDB 4.1.24"
#else
#define DB_VERSION_STRING "Tokutek: TokuDB (wrapped bdb)"
#endif
#ifndef _TOKUDB_WRAP_H
typedef struct __toku_db_btree_stat DB_BTREE_STAT;
typedef struct __toku_db_env DB_ENV;
typedef struct __toku_db_key_range DB_KEY_RANGE;
......@@ -67,6 +66,7 @@ typedef enum {
#define DB_SET 30
#define DB_SET_RANGE 32
#define DB_RMW 1073741824
#endif
/* in wrap mode, top-level function txn_begin is renamed, but the field isn't renamed, so we have to hack it here.*/
#ifdef _TOKUDB_WRAP_H
#undef txn_begin
......
......@@ -14,6 +14,7 @@ endif
CFLAGS = -Wall -W $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS) -Werror $(FPICFLAGS)
LDFLAGS = $(OPTFLAGS) -g $(GCOV_FLAGS) $(PROF_FLAGS)
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
ifdef BRT_FANOUT
CPPFLAGS += -DBRT_FANOUT=$(BRT_FANOUT)
......@@ -66,12 +67,13 @@ check-fanout:
key.o: brttypes.h key.h
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h list.h kv-pair.h brttypes.h ybt.h yerror.h
pma-test: pma.o memory.o key.o ybt.o log.o mempool.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h ../include/db.h
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h log.h ../include/db.h
ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o mempool.o
log.o: log-internal.h log.h
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: pma.h yerror.h brt.h ../include/db.h memory.h hashtable.h brttypes.h brt-internal.h
brt.o: brt.h ../include/db.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h hashtable.h
......
......@@ -4,7 +4,7 @@
#include <sys/types.h>
#define LOGGER_BUF_SIZE (1<<20)
#define LOGGER_BUF_SIZE (1<<24)
struct tokulogger {
enum typ_tag tag;
char *directory;
......@@ -22,4 +22,5 @@ enum { LT_INSERT_WITH_NO_OVERWRITE = 'I', LT_DELETE = 'D', LT_COMMIT = 'C' };
struct tokutxn {
u_int64_t txnid64;
TOKULOGGER logger;
TOKUTXN parent;
};
......@@ -50,12 +50,12 @@ int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *result
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
int r;
//printf("%s:%d logging %d bytes\n", __FILE__, __LINE__, nbytes);
//fprintf(stderr, "%s:%d logging %d bytes\n", __FILE__, __LINE__, nbytes);
if (logger->fd==-1) {
int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012llu.tokulog", logger->directory, logger->next_log_file_number);
printf("%s:%d creat(%s, ...)\n", __FILE__, __LINE__, fname);
fprintf(stderr, "%s:%d creat(%s, ...)\n", __FILE__, __LINE__, fname);
logger->fd = creat(fname, O_EXCL | 0700);
if (logger->fd==-1) return errno;
logger->next_log_file_number++;
......@@ -66,11 +66,13 @@ int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
v[0].iov_len = logger->n_in_buf;
v[1].iov_base = bytes;
v[1].iov_len = nbytes;
//fprintf(stderr, "%s:%d flushing log due to buffer overflow\n", __FILE__, __LINE__);
r=writev(logger->fd, v, 2);
if (r!=logger->n_in_buf + nbytes) return errno;
logger->n_in_file += logger->n_in_buf+nbytes;
logger->n_in_buf=0;
if (logger->n_in_file > 100<<20) {
fprintf(stderr, "%s:%d closing logfile\n", __FILE__, __LINE__);
r = close(logger->fd);
if (r!=0) return errno;
logger->fd=-1;
......@@ -107,7 +109,7 @@ int tokulogger_log_close(TOKULOGGER *loggerp) {
TOKULOGGER logger = *loggerp;
int r = 0;
if (logger->fd!=-1) {
printf("%s:%d n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
printf("%s:%d closing log: n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
if (logger->n_in_buf>0) {
r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
......@@ -156,6 +158,7 @@ int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff disk
int tokulogger_fsync (TOKULOGGER logger) {
//return 0;/// NO TXN
//fprintf(stderr, "%s:%d syncing log\n", __FILE__, __LINE__);
if (logger->n_in_buf>0) {
int r = write(logger->fd, logger->buf, logger->n_in_buf);
if (r==-1) return errno;
......@@ -177,14 +180,16 @@ int tokulogger_log_commit (TOKUTXN txn) {
wbuf_txnid(&wbuf, txn->txnid64);
int r = tokulogger_log_bytes(txn->logger, wbuf.ndone, wbuf.buf);
if (r!=0) return r;
return tokulogger_fsync(txn->logger);
if (txn->parent) return 0;
else return tokulogger_fsync(txn->logger);
}
int tokutxn_begin (TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
int tokutxn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
result->txnid64 = txnid64;
result->logger = logger;
result->parent = parent_tokutxn;
*tokutxn = result;
return 0;
}
......
......@@ -13,6 +13,6 @@ int tokulogger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, diskoff d
int tokulogger_log_commit (TOKUTXN txn);
int tokutxn_begin (TOKUTXN *, TXNID txnid64, TOKULOGGER logger);
int tokutxn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID txnid64, TOKULOGGER logger);
#endif
......@@ -45,3 +45,6 @@ ydb.lo: bdbw.h
bdbw.lo: CPPFLAGS=-I/home/bradley/mysql/build-bdb-with-uniquename/bdb/build_unix
%.lo: %.c
cc $(CPPFLAGS) $< -c -fPIC -o $@ $(CFLAGS)
bdbw.lo: bdbw.h ydb-uniq.h
/* Wrapper for bdb.c. */
#include <sys/types.h>
/* This includes the ydb db.h, but with unique names for everything. */
#include "ydb-uniq.h"
/* This include is to the berkeley-db compiled with --with-uniquename */
#include <db.h>
/* This include is to the interface between ydb and bdb. */
#include "bdbw.h"
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
......@@ -16,6 +13,8 @@
#include <sys/time.h>
#include <time.h>
#undef db_env_create
#define barf() ({ fprintf(stderr, "YDB: BARF %s:%d in %s\n", __FILE__, __LINE__, __func__); })
#define barff(fmt,...) ({ fprintf(stderr, "YDB: BARF %s:%d in %s, ", __FILE__, __LINE__, __func__); fprintf(stderr, fmt, __VA_ARGS__); })
#define note() ({ fprintf(stderr, "YDB: Note %s:%d in %s\n", __FILE__, __LINE__, __func__); })
......@@ -45,19 +44,70 @@ void tracef (const char *fmt, ...) {
va_end(ap);
}
struct db_env_ydb_internal {
struct wrap_db_env_internal {
unsigned long long objnum;
DB_ENV *env;
void (*noticecall)(DB_ENV_ydb*, db_notices_ydb);
//void (*noticecall)(DB_ENV_ydb*, db_notices_ydb);
char *home;
};
struct yobi_db_txn_internal {
static void wrap_env_err (const DB_ENV *, int error, const char *fmt, ...);
static int wrap_env_open (DB_ENV *, const char *home, u_int32_t flags, int mode);
static int wrap_env_close (DB_ENV *, u_int32_t flags);
static int wrap_env_txn_checkpoint (DB_ENV *, u_int32_t kbyte, u_int32_t min, u_int32_t flags);
static int wrap_db_env_log_flush (DB_ENV*, const DB_LSN*lsn);
int db_env_create (DB_ENV **envp, u_int32_t flags) {
DB_ENV *result = malloc(sizeof(*result));
struct wrap_db_env_internal *i = malloc(sizeof(*i));
int r;
//note();
// use a private field for this
result->reginfo = i;
i->objnum = objnum++;
//i->noticecall = 0;
i->home = 0;
result->err = wrap_env_err;
result->open = wrap_env_open;
result->close = wrap_env_close;
result->txn_checkpoint = wrap_env_txn_checkpoint;
result->log_flush = wrap_env_log_flush;
result->set_errcall = ydb_env_set_errcall;
result->set_errpfx = ydb_env_set_errpfx;
result->set_noticecall = ydb_env_set_noticecall;
result->set_flags = ydb_env_set_flags;
result->set_data_dir = ydb_env_set_data_dir;
result->set_tmp_dir = ydb_env_set_tmp_dir;
result->set_verbose = ydb_env_set_verbose;
result->set_lg_bsize = ydb_env_set_lg_bsize;
result->set_lg_dir = ydb_env_set_lg_dir;
result->set_lg_max = ydb_env_set_lg_max;
result->set_cachesize = ydb_env_set_cachesize;
result->set_lk_detect = ydb_env_set_lk_detect;
result->set_lk_max = ydb_env_set_lk_max;
result->log_archive = ydb_env_log_archive;
result->txn_stat = ydb_env_txn_stat;
result->txn_begin = txn_begin_bdbw;
r = db_env_create_4001(&result->i->env, flags);
result->i->env->app_private = result;
*envp = result;
tracef("r=db_env_create(new_envobj(%lld), %u); assert(r==%d);\n",
result->i->objnum, flags, r);
return r;
}
struct __toku_db_txn_internal {
long long objnum;
DB_TXN *txn;
};
static void ydb_env_err (const DB_ENV_ydb *env, int error, const char *fmt, ...) {
static void wrap_env_err (const DB_ENV *env, int error, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
fprintf(stderr, "YDB Error %d:", error);
......@@ -79,7 +129,9 @@ void doits_internal (u_int32_t flag_ydb, u_int32_t flag_bdb, char *flagname, u_i
}
}
#define doits(flag) doits_internal(flag ## _ydb, flag, #flag, &flags, &gotit, &flagstring, &flagstringlen)
//#define doits(flag) doits_internal(flag ## _ydb, flag, #flag, &flags, &gotit, &flagstring, &flagstringlen)
#define doits(flag) doits_internal(flag, flag, #flag, &flags, &gotit, &flagstring, &flagstringlen)
static u_int32_t convert_envopen_flags(u_int32_t flags, char *flagstring, int flagstringlen) {
u_int32_t gotit=0;
......@@ -121,7 +173,8 @@ u_int32_t convert_db_set_flags (u_int32_t flags, char *flagstring, int flagstrin
return gotit;
}
#define retit(flag) ({ if (flag ## _ydb == flags) { strncpy(flagstring, #flag ,flagstringlen); return flag; } })
//#define retit(flag) ({ if (flag ## _ydb == flags) { strncpy(flagstring, #flag ,flagstringlen); return flag; } })
#define retit(flag) ({ if (flag == flags) { strncpy(flagstring, #flag ,flagstringlen); return flag; } })
u_int32_t convert_c_get_flags(u_int32_t flags, char *flagstring, int flagstringlen) {
retit(DB_FIRST);
......@@ -130,7 +183,8 @@ u_int32_t convert_c_get_flags(u_int32_t flags, char *flagstring, int flagstringl
abort();
}
int ydb_env_open (DB_ENV_ydb *env, const char *home, u_int32_t flags, int mode) {
int wrap_env_open (DB_ENV *env_w, const char *home, u_int32_t flags, int mode) {
struct __toku_db_env *env = bdb2toku_env(env_w);
int r;
char flagstring[1000];
u_int32_t bdb_flags = convert_envopen_flags(flags, flagstring, sizeof(flagstring));
......@@ -142,7 +196,8 @@ int ydb_env_open (DB_ENV_ydb *env, const char *home, u_int32_t flags, int mode)
return r;
}
int bdbw_env_close (DB_ENV_ydb * env, u_int32_t flags) {
int bdbw_env_close (DB_ENV * env_w, u_int32_t flags) {
struct __toku_db_env *env = bdb2toku_env(env_w);
int r;
notef("flags=%d\n", flags);
assert(flags==0);
......@@ -158,7 +213,8 @@ u_int32_t convert_log_archive_flags (u_int32_t flags, char *flagstring, int flag
abort();
}
int ydb_env_log_archive (DB_ENV_ydb *env, char **list[], u_int32_t flags) {
int ydb_env_log_archive (DB_ENV *env_w, char **list[], u_int32_t flags) {
struct __toku_db_env *env = bdb2toku_env(env_w);
int r;
char flagstring[1000];
int bdbflags = convert_log_archive_flags(flags, flagstring, sizeof(flagstring));
......@@ -168,42 +224,53 @@ int ydb_env_log_archive (DB_ENV_ydb *env, char **list[], u_int32_t flags) {
env->i->objnum, env->i->objnum, flagstring, r);
return r;
}
int ydb_env_log_flush (DB_ENV_ydb * env, const DB_LSN_ydb * lsn) {
int ydb_env_log_flush (DB_ENV *env, const DB_LSN *lsn) {
barf();
return 1;
}
int ydb_env_set_cachesize (DB_ENV_ydb * env, u_int32_t gbytes, u_int32_t bytes, int ncache) {
int ydb_env_set_cachesize (DB_ENV * env_w, u_int32_t gbytes, u_int32_t bytes, int ncache) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_cachesize(env->i->env, gbytes, bytes, ncache);
}
int ydb_env_set_data_dir (DB_ENV_ydb * env, const char *dir) {
int ydb_env_set_data_dir (DB_ENV* env_w, const char *dir) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_data_dir(env->i->env, dir);
}
void ydb_env_set_errcall (DB_ENV_ydb *env, void (*errcall)(const char *, char *)) {
void ydb_env_set_errcall (DB_ENV *env_w, void (*errcall)(const char *, char *)) {
struct __toku_db_env *env = bdb2toku_env(env_w);
env->i->env->set_errcall(env->i->env, errcall);
}
void ydb_env_set_errpfx (DB_ENV_ydb * env, const char *errpfx) {
void ydb_env_set_errpfx (DB_ENV * env_w, const char *errpfx) {
struct __toku_db_env *env = bdb2toku_env(env_w);
env->i->env->set_errpfx(env->i->env, errpfx);
}
int ydb_env_set_flags (DB_ENV_ydb *env, u_int32_t flags, int onoff) {
int ydb_env_set_flags (DB_ENV *env_w, u_int32_t flags, int onoff) {
struct __toku_db_env *env = bdb2toku_env(env_w);
assert(flags==0);
return env->i->env->set_flags(env->i->env, flags, onoff);
}
int ydb_env_set_lg_bsize (DB_ENV_ydb * env, u_int32_t bsize) {
int ydb_env_set_lg_bsize (DB_ENV * env_w, u_int32_t bsize) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_lg_bsize(env->i->env, bsize);
}
int ydb_env_set_lg_dir (DB_ENV_ydb *env, const char * dir) {
barf();
return 1;
int ydb_env_set_lg_dir (DB_ENV *env_w, const char * dir) {
struct __toku_db_env *env = bdb2toku_env(env_w);
barf();
return 1;
}
int ydb_env_set_lg_max (DB_ENV_ydb *env, u_int32_t lg_max) {
int ydb_env_set_lg_max (DB_ENV *env_w, u_int32_t lg_max) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_lg_max(env->i->env, lg_max);
}
int ydb_env_set_lk_detect (DB_ENV_ydb *env, u_int32_t detect) {
int ydb_env_set_lk_detect (DB_ENV *env_w, u_int32_t detect) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_lk_detect(env->i->env, detect);
}
int ydb_env_set_lk_max (DB_ENV_ydb *env, u_int32_t lk_max) {
int ydb_env_set_lk_max (DB_ENV *env_w, u_int32_t lk_max) {
struct __toku_db_env *env = bdb2toku_env(env_w);
return env->i->env->set_lk_max(env->i->env, lk_max);
}
#if 0
void ydbenv_bdb_noticecall (DB_ENV *bdb_env, db_notices notices) {
DB_ENV_ydb *ydb_env = bdb_env->app_private;
tracef("/* Doing noticecall */\n");
......@@ -227,17 +294,21 @@ void ydb_env_set_noticecall (DB_ENV_ydb *env, void (*noticecall)(DB_ENV_ydb *, d
env->i->objnum, env->i->objnum, fun_name);
}
}
int ydb_env_set_tmp_dir (DB_ENV_ydb * env, const char *tmp_dir) {
#endif
int ydb_env_set_tmp_dir (DB_ENV * env_w, const char *tmp_dir) {
struct __toku_db_env *env = bdb2toku_env(env_w);
int r = env->i->env->set_tmp_dir(env->i->env, tmp_dir);
tracef("r = envobj(%lld)->set_tmp_dir(envobj(%lld), \"%s\"); assert(r==%d);\n",
env->i->objnum, env->i->objnum, tmp_dir, r);
return r;
}
int ydb_env_set_verbose (DB_ENV_ydb *env, u_int32_t which, int onoff) {
barf();
return 1;
int ydb_env_set_verbose (DB_ENV *env_w, u_int32_t which, int onoff) {
struct __toku_db_env *env = bdb2toku_env(env_w);
barf();
return 1;
}
int ydb_env_txn_checkpoint (DB_ENV_ydb *env, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
int ydb_env_txn_checkpoint (DB_ENV *env_w, u_int32_t kbyte, u_int32_t min, u_int32_t flags) {
struct __toku_db_env *env = bdb2toku_env(env_w);
int r;
assert(flags==0);
r=env->i->env->txn_checkpoint(env->i->env, kbyte, min, 0);
......@@ -247,51 +318,12 @@ int ydb_env_txn_checkpoint (DB_ENV_ydb *env, u_int32_t kbyte, u_int32_t min, u_
return r;
}
int ydb_env_txn_stat (DB_ENV_ydb *env, DB_TXN_STAT_ydb **statp, u_int32_t flags) {
barf();
return 1;
int ydb_env_txn_stat (DB_ENV *env_w, DB_TXN_STAT **statp, u_int32_t flags) {
struct __toku_db_env *env = bdb2toku_env(env_w);
barf();
return 1;
}
int db_env_create_bdbw (struct yobi_db_env **envp, u_int32_t flags) {
struct yobi_db_env *result = malloc(sizeof(*result));
int r;
//note();
result->i = malloc(sizeof(*result->i));
result->i->objnum = objnum++;
result->i->noticecall = 0;
result->i->home = 0;
result->err = ydb_env_err;
result->open = ydb_env_open;
result->close = bdbw_env_close;
result->txn_checkpoint = ydb_env_txn_checkpoint;
result->log_flush = ydb_env_log_flush;
result->set_errcall = ydb_env_set_errcall;
result->set_errpfx = ydb_env_set_errpfx;
result->set_noticecall = ydb_env_set_noticecall;
result->set_flags = ydb_env_set_flags;
result->set_data_dir = ydb_env_set_data_dir;
result->set_tmp_dir = ydb_env_set_tmp_dir;
result->set_verbose = ydb_env_set_verbose;
result->set_lg_bsize = ydb_env_set_lg_bsize;
result->set_lg_dir = ydb_env_set_lg_dir;
result->set_lg_max = ydb_env_set_lg_max;
result->set_cachesize = ydb_env_set_cachesize;
result->set_lk_detect = ydb_env_set_lk_detect;
result->set_lk_max = ydb_env_set_lk_max;
result->log_archive = ydb_env_log_archive;
result->txn_stat = ydb_env_txn_stat;
result->txn_begin = txn_begin_bdbw;
r = db_env_create_4001(&result->i->env, flags);
result->i->env->app_private = result;
*envp = result;
tracef("r=db_env_create(new_envobj(%lld), %u); assert(r==%d);\n",
result->i->objnum, flags, r);
return r;
}
int yobi_db_txn_commit (DB_TXN_ydb *txn, u_int32_t flags) {
int r;
......
......@@ -8,11 +8,11 @@ extern "C" {
#endif
#endif
int db_env_create_bdbw (struct yobi_db_env **, u_int32_t);
int txn_abort_bdbw (struct yobi_db_txn *);
int txn_begin_bdbw (struct yobi_db_env *env, struct yobi_db_txn *stxn, struct yobi_db_txn **txn, u_int32_t flags);
int txn_commit_bdbw (struct yobi_db_txn *, u_int32_t);
int db_create_bdbw (struct yobi_db **, struct yobi_db_env *, u_int32_t);
int db_env_create_bdbw (struct __toku_db_env **, u_int32_t);
int txn_abort_bdbw (struct __toku_db_txn *);
int txn_begin_bdbw (struct __toku_db_env *env, struct __toku_db_txn *stxn, struct __toku_db_txn **txn, u_int32_t flags);
int txn_commit_bdbw (struct __toku_db_txn *, u_int32_t);
int db_create_bdbw (struct __toku_db **, struct __toku_db_env *, u_int32_t);
#if 0
......@@ -35,18 +35,18 @@ enum {
};
typedef struct yobi_db DB;
typedef struct __toku_db DB;
typedef struct yobi_db_btree_stat DB_BTREE_STAT;
typedef struct yobi_db_env DB_ENV;
typedef struct __toku_db_env DB_ENV;
typedef struct yobi_db_key_range DB_KEY_RANGE;
typedef struct yobi_db_lsn DB_LSN;
typedef struct yobi_db_txn DB_TXN;
typedef struct yobi_db_txn_active DB_TXN_ACTIVE;
typedef struct yobi_db_txn_stat DB_TXN_STAT;
typedef struct __toku_db_txn DB_TXN;
typedef struct __toku_db_txn_active DB_TXN_ACTIVE;
typedef struct __toku_db_txn_stat DB_TXN_STAT;
typedef struct yobi_dbc DBC;
typedef struct yobi_dbt DBT;
struct yobi_db {
struct __toku_db {
void *app_private;
int (*close) (DB *, u_int32_t);
int (*cursor) (DB *, DB_TXN *, DBC **, u_int32_t);
......@@ -63,7 +63,7 @@ struct yobi_db {
int (*set_flags) (DB *, u_int32_t);
int (*stat) (DB *, void *, u_int32_t);
struct ydb_db_internal *i;
struct __toku_db_internal *i;
};
enum {
DB_DBT_MALLOC = 0x002,
......@@ -78,7 +78,7 @@ struct yobi_dbt {
u_int32_t size;
u_int32_t ulen;
};
struct yobi_db_txn {
struct __toku_db_txn {
int (*commit) (DB_TXN*, u_int32_t);
u_int32_t (*id) (DB_TXN *);
};
......@@ -87,7 +87,7 @@ struct yobi_dbc {
int (*c_close) (DBC *);
int (*c_del) (DBC *, u_int32_t);
};
struct yobi_db_env {
struct __toku_db_env {
// Methods used by MYSQL
void (*err) (const DB_ENV *, int, const char *, ...);
int (*open) (DB_ENV *, const char *, u_int32_t, int);
......@@ -123,14 +123,14 @@ struct yobi_db_btree_stat {
u_int32_t bt_ndata;
u_int32_t bt_nkeys;
};
struct yobi_db_txn_stat {
struct __toku_db_txn_stat {
u_int32_t st_nactive;
DB_TXN_ACTIVE *st_txnarray;
};
struct yobi_db_lsn {
int hello;
};
struct yobi_db_txn_active {
struct __toku_db_txn_active {
DB_LSN lsn;
u_int32_t txnid;
};
......
#ifndef _YDB_WRAP_H
#define _YDB_WRAP_H
#ifndef _TOKUDB_WRAP_H
#define _TOKUDB_WRAP_H
#define DB_BTREE DB_BTREE_ydb
#define DB_NOTICE_LOGFILE_CHANGED DB_NOTICE_LOGFILE_CHANGED_ydb
#define DBTYPE DBTYPE_ydb
......@@ -9,8 +8,8 @@
#define txn_begin txn_begin_ydb
#define txn_commit txn_commit_ydb
#define DB_VERB_CHKPOINT DB_VERB_CHKPOINT_ydb
#define DB_VERB_DEADLOCK DB_VERB_DEADLOCK_ydb
#define DB_VERB_RECOVERY DB_VERB_RECOVERY_ydb
//#define DB_VERB_DEADLOCK DB_VERB_DEADLOCK_ydb
//#define DB_VERB_RECOVERY DB_VERB_RECOVERY_ydb
#define DB DB_ydb
#define DB_BTREE_STAT DB_BTREE_STAT_ydb
#define DB_ENV DB_ENV_ydb
......@@ -25,7 +24,6 @@
#define DB_DBT_REALLOC DB_DBT_REALLOC_ydb
#define DB_DBT_USERMEM DB_DBT_USERMEM_ydb
#define DB_DBT_DUPOK DB_DBT_DUPOK_ydb
#define DB_VERSION_STRING DB_VERSION_STRING_ydb
#define DB_ARCH_ABS DB_ARCH_ABS_ydb
#define DB_ARCH_LOG DB_ARCH_LOG_ydb
#define DB_FIRST DB_FIRST_ydb
......@@ -88,7 +86,6 @@
#undef DB_DBT_REALLOC
#undef DB_DBT_USERMEM
#undef DB_DBT_DUPOK
#undef DB_VERSION_STRING
#undef DB_ARCH_ABS
#undef DB_ARCH_LOG
#undef DB_FIRST
......
......@@ -3,6 +3,7 @@
CFLAGS = -W -Wall -Wno-unused -g -fPIC -O2
CPPFLAGS = -I../include -I../newbrt
CPPFLAGS += -D_FILE_OFFSET_BITS=64 -D_LARGEFILE64_SOURCE
ifeq ($(OSX),OSX)
......@@ -21,7 +22,7 @@ install: $(LIBNAME)
clean:
rm -rf *.$(LIBEXT) *.o
ydb.o: ../include/db.h ../newbrt/cachetable.h ../newbrt/brt.h
ydb.o: ../include/db.h ../newbrt/cachetable.h ../newbrt/brt.h ../newbrt/log.c
DBBINS = ydb.o ../newbrt/brt.o ../newbrt/brt-serialize.o ../newbrt/cachetable.o ../newbrt/hashtable.o ../newbrt/header-io.o ../newbrt/key.o ../newbrt/memory.o ../newbrt/pma.o ../newbrt/ybt.o ../newbrt/primes.o ../newbrt/log.o ../newbrt/mempool.o
$(LIBNAME): $(DBBINS)
cc $(CPPFLAGS) $(DBBINS) $(SHARED) -o $@ $(CFLAGS)
......
......@@ -28,6 +28,7 @@ static inline void *malloc_zero(size_t size) {
struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
TOKUTXN tokutxn;
DB_TXN *parent;
};
void __toku_db_env_err (const DB_ENV *env __attribute__((__unused__)), int error, const char *fmt, ...) {
......@@ -243,7 +244,8 @@ int txn_begin (DB_ENV *env, DB_TXN *stxn, DB_TXN **txn, u_int32_t flags) {
result->commit = __toku_db_txn_commit;
result->id = __toku_db_txn_id;
result->i = malloc(sizeof(*result->i));
int r = tokutxn_begin(&result->i->tokutxn, next_txn++, env->i->logger);
result->i->parent = stxn;
int r = tokutxn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
if (r!=0) return r;
*txn = result;
return 0;
......@@ -400,12 +402,9 @@ int __toku_db_open (DB *db, DB_TXN *txn, const char *fname, const char *dbname,
db->i->open_flags = flags;
db->i->open_mode = mode;
// Warning: new_brt has deficienceis:
// Each tree has its own cache, instead of a big shared cache.
// It doesn't do error checking on insert.
// It's tough to do cursors.
r=open_brt(db->i->full_fname, dbname, (flags&DB_CREATE), &db->i->brt, 1<<20, db->i->env->i->cachetable,
db->i->bt_compare);
printf("r=%d\n", r);
assert(r==0);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment