Commit 24c21821 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Merge logging

git-svn-id: file:///svn/tokudb@371 c7de825b-a66e-492c-adef-691d508d4ae1
parent 55ced3bf
......@@ -227,7 +227,9 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf("#ifdef _TOKUDB_WRAP_H\n#define txn_begin txn_begin_tokudb\n#endif\n");
printf("int db_env_create(DB_ENV **, u_int32_t);\n");
printf("int db_create(DB **, DB_ENV *, u_int32_t);\n");
printf("char *db_strerror(int);\n");
printf("#if defined(__cplusplus)\n}\n#endif\n");
printf("#endif\n");
return 0;
}
......@@ -204,6 +204,7 @@ struct __toku_dbt {
#endif
int db_env_create(DB_ENV **, u_int32_t);
int db_create(DB **, DB_ENV *, u_int32_t);
char *db_strerror(int);
#if defined(__cplusplus)
}
#endif
......
......@@ -23,7 +23,7 @@ endif
# When debugging, try: valgrind --show-reachable=yes --leak-check=full ./brt-test
default: bins
default: bins libs
# Put these one-per-line so that if we insert a new one the svn diff can understand it better.
# Also keep them sorted.
REGRESSION_TESTS = \
......@@ -42,6 +42,7 @@ BINS = $(REGRESSION_TESTS) \
randdb4 \
# This line intentionally kept commented so I can have a \ on the end of the previous line
libs: log.o
bins: $(BINS)
check: bins
$(DTOOL) ./ybt-test
......@@ -62,14 +63,14 @@ check-fanout:
# pma: PROF_FLAGS=-fprofile-arcs -ftest-coverage
key.o: brttypes.h key.h
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h
pma-test: pma.o memory.o key.o ybt.o
pma-test.o: pma-internal.h pma.h yerror.h memory.h ../include/db.h list.h kv-pair.h brttypes.h ybt.h yerror.h
pma-test: pma.o memory.o key.o ybt.o log.o
pma.o: pma.h yerror.h pma-internal.h memory.h key.h ybt.h brttypes.h ../include/db.h
ybt.o: ybt.h brttypes.h ../include/db.h
ybt-test: ybt-test.o ybt.o memory.o
ybt-test.o: ybt.h ../include/db.h
cachetable.o: cachetable.h hashfun.h
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o
brt-test: ybt.o brt.o hashtable.o pma.o memory.o brt-serialize.o cachetable.o header-io.o ybt.o key.o primes.o log.o
brt-test.o brt.o: brt.h ../include/db.h hashtable.h pma.h brttypes.h cachetable.h
brt-serialize-test.o: pma.h yerror.h brt.h ../include/db.h memory.h hashtable.h brttypes.h brt-internal.h
brt.o: brt.h ../include/db.h mdict.h pma.h brttypes.h memory.h brt-internal.h cachetable.h hashtable.h
......@@ -85,7 +86,7 @@ brt-bigtest: memory.o ybt.o brt.o pma.o cachetable.o key.o hashtable.o brt-seria
brt-bigtest.o: brt.h ../include/db.h
log-test: log.o memory.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o
brt-serialize-test: brt-serialize-test.o brt-serialize.o memory.o hashtable.o pma.o key.o ybt.o brt.o cachetable.o primes.o log.o
cachetable-test.o: cachetable.h memory.h
cachetable-test: cachetable.o memory.o cachetable-test.o
......@@ -93,7 +94,7 @@ cachetable-test: cachetable.o memory.o cachetable-test.o
cachetable-test2.o: cachetable.h memory.h
cachetable-test2: cachetable.o memory.o cachetable-test2.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o
benchmark-test: benchmark-test.o ybt.o memory.o brt.o pma.o cachetable.o key.o hashtable.o brt-serialize.o primes.o log.o
benchmark-test.o: brt.h ../include/db.h
clean:
......
......@@ -44,7 +44,7 @@ void insert (long long v) {
DBT kt, vt;
long_long_to_array(kc, v);
long_long_to_array(vc, v);
brt_insert(t, fill_dbt(&kt, kc, 8), fill_dbt(&vt, vc, 8), 0);
brt_insert(t, fill_dbt(&kt, kc, 8), fill_dbt(&vt, vc, 8), 0, 0);
}
void serial_insert_from (long long from) {
......
......@@ -4,8 +4,6 @@
#include "brt.h"
//#include "pma.h"
typedef long long diskoff; /* Offset in a disk. -1 is the NULL pointer. */
#ifndef BRT_FANOUT
#define BRT_FANOUT 16
#endif
......@@ -76,7 +74,7 @@ struct brt {
};
/* serialization code */
int serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node);
void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node);
int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize);
unsigned int serialize_brtnode_size(BRTNODE node); /* How much space will it take? */
int keycompare (bytevec key1, ITEMLEN key1len, bytevec key2, ITEMLEN key2len);
......
......@@ -32,7 +32,7 @@ void test_serialize(void) {
r = toku_hash_insert(sn.u.n.htables[1], "x", 2, "xval", 5, BRT_NONE); assert(r==0);
sn.u.n.n_bytes_in_hashtables = 3*(BRT_CMD_OVERHEAD+KEY_VALUE_OVERHEAD+2+5);
r = serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
serialize_brtnode_to(fd, sn.nodesize*20, sn.nodesize, &sn); assert(r==0);
r = deserialize_brtnode_from(fd, nodesize*20, &dn, nodesize);
......
......@@ -73,13 +73,13 @@ unsigned int serialize_brtnode_size (BRTNODE node) {
return result;
}
int serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node) {
void serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node) {
struct wbuf w;
int i;
unsigned int calculated_size = serialize_brtnode_size(node);
int r;
char buf[size];
assert(size>0);
if ((r=wbuf_init(&w, size))) return r;
wbuf_init(&w, buf, size);
//printf("%s:%d serializing %lld w height=%d p0=%p\n", __FILE__, __LINE__, off, node->height, node->mdicts[0]);
wbuf_int(&w, calculated_size);
wbuf_int(&w, node->height);
......@@ -124,8 +124,6 @@ int serialize_brtnode_to(int fd, diskoff off, diskoff size, BRTNODE node) {
//printf("%s:%d wrote %d bytes for %lld size=%lld\n", __FILE__, __LINE__, w.ndone, off, size);
assert(w.ndone<=size);
toku_free(w.buf);
return 0;
}
int deserialize_brtnode_from (int fd, diskoff off, BRTNODE *brtnode, int nodesize) {
......
This diff is collapsed.
This diff is collapsed.
......@@ -8,11 +8,12 @@
#include "ybt.h"
#include "../include/db.h"
#include "cachetable.h"
#include "log.h"
typedef struct brt *BRT;
int open_brt (const char *fname, const char *dbname, int is_create, BRT *, int nodesize, CACHETABLE, int(*)(DB*,const DBT*,const DBT*));
//int brt_create (BRT **, int nodesize, int n_nodes_in_cache); /* the nodesize and n_nodes in cache really should be separately configured. */
//int brt_open (BRT *, char *fname, char *dbname);
int brt_insert (BRT brt, DBT *k, DBT *v, DB*db);
int brt_insert (BRT, DBT *, DBT *, DB*, TOKUTXN);
int brt_lookup (BRT brt, DBT *k, DBT *v, DB*db);
int brt_delete (BRT brt, DBT *k, DB *db);
int close_brt (BRT);
......@@ -35,7 +36,7 @@ int show_brt_blocknumbers(BRT);
typedef struct brt_cursor *BRT_CURSOR;
int brt_cursor (BRT, BRT_CURSOR*);
int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, DB *db);
int brt_cursor_get (BRT_CURSOR cursor, DBT *kbt, DBT *vbt, int brtc_flags, DB *db, TOKUTXN);
int brt_cursor_delete(BRT_CURSOR cursor, int flags);
int brt_cursor_close (BRT_CURSOR curs);
......
......@@ -5,4 +5,8 @@
typedef unsigned int ITEMLEN;
typedef const void *bytevec;
//typedef const void *bytevec;
typedef long long diskoff; /* Offset in a disk. -1 is the NULL pointer. */
typedef long long TXNID;
#endif
#ifndef KV_PAIR_H
#define KV_PAIR_H
#include "memory.h"
#include <string.h>
/*
* the key value pair contains a key and a value in a contiguous space. the
* key is right after the length fields and the value is right after the key.
......@@ -40,6 +46,10 @@ static inline void kv_pair_free(struct kv_pair *pair) {
static inline void *kv_pair_key(struct kv_pair *pair) {
return pair->key;
}
/* Read-only counterpart of kv_pair_key(): returns pair->key through a
 * pointer-to-const, so it can be called on a const struct kv_pair. */
static inline const void *kv_pair_key_const(const struct kv_pair *pair) {
return pair->key;
}
static inline int kv_pair_keylen(struct kv_pair *pair) {
return pair->keylen;
......@@ -48,6 +58,9 @@ static inline int kv_pair_keylen(struct kv_pair *pair) {
static inline void *kv_pair_val(struct kv_pair *pair) {
return pair->key + pair->keylen;
}
/* Read-only counterpart of kv_pair_val(): returns the address that lies
 * pair->keylen bytes past pair->key (the value is stored right after the
 * key), through a pointer-to-const. */
static inline const void *kv_pair_val_const(const struct kv_pair *pair) {
return pair->key + pair->keylen;
}
static inline int kv_pair_vallen(struct kv_pair *pair) {
return pair->vallen;
......@@ -79,3 +92,4 @@ struct kv_pair_tag {
struct kv_pair *pair;
int oldtag, newtag;
};
#endif
#include "yerror.h"
#include <stdio.h>
#include "log.h"
#include <sys/types.h>
#define LOGGER_BUF_SIZE (1<<20)
typedef struct tokulogger *TOKULOGGER;
struct tokulogger {
enum typ_tag tag;
char *directory;
......@@ -14,3 +16,10 @@ struct tokulogger {
};
int tokulogger_find_next_unused_log_file(const char *directory, long long *result);
/* Record-type tags. The logging code writes one of these as the first item
 * of a log record, and the log dumper switches on it to decide how to decode
 * the rest of the record. */
enum { LT_INSERT_WITH_NO_OVERWRITE = 'I', LT_DELETE = 'D', LT_COMMIT = 'C' };
/* State carried by a TOKUTXN handle; filled in by tokutxn_begin(). */
struct tokutxn {
/* Transaction id; written into every log record produced for this transaction. */
u_int64_t txnid64;
/* Logger that receives this transaction's log records. */
TOKULOGGER logger;
};
#include "brttypes.h"
#include "log-internal.h"
#include "wbuf.h"
#include "memory.h"
#include <dirent.h>
#include <errno.h>
......@@ -22,7 +24,8 @@ int tokulogger_find_next_unused_log_file(const char *directory, long long *resul
if (r==1 && thisl>max) max=thisl;
}
*result=max+1;
return 0;
int r = closedir(d);
return r;
}
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp) {
......@@ -37,20 +40,22 @@ int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *result
return nexti;
}
result->directory = toku_strdup(directory);
if (result->directory!=0) goto died0;
if (result->directory==0) goto died0;
result->fd = -1;
result->next_log_file_number = nexti;
result->n_in_buf = 0;
*resultp=result;
return 0;
return tokulogger_log_bytes(result, 0, "");
}
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, char *bytes) {
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes) {
int r;
printf("%s:%d logging %d bytes\n", __FILE__, __LINE__, nbytes);
if (logger->fd==-1) {
int fnamelen = strlen(logger->directory)+50;
char fname[fnamelen];
snprintf(fname, fnamelen, "%s/log%012llu.tokulog", logger->directory, logger->next_log_file_number);
printf("%s:%d creat(%s, ...)\n", __FILE__, __LINE__, fname);
logger->fd = creat(fname, O_EXCL | 0700);
if (logger->fd==-1) return errno;
}
......@@ -78,8 +83,6 @@ int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, char *bytes) {
return 0;
}
enum { LT_INSERT_WITH_NO_OVERWITE = 'I' };
// Log an insertion of a key-value pair into a particular node of the tree.
int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
TXNID txnid,
......@@ -89,19 +92,34 @@ int tokulogger_log_brt_insert_with_no_overwrite (TOKULOGGER logger,
unsigned char *val,
int vallen) {
int buflen=30+keylen+vallen;
char buf[buflen];
WBUF wbuf;
int r;
r = wbuf_create(&wbuf, buf, buflen) ;
if (r!=0) return r;
wbuf_byte(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_txnid(&wbuf, txnind);
unsigned char buf[buflen];
struct wbuf wbuf;
wbuf_init(&wbuf, buf, buflen) ;
wbuf_char(&wbuf, LT_INSERT_WITH_NO_OVERWRITE);
wbuf_txnid(&wbuf, txnid);
wbuf_diskoff(&wbuf, diskoff);
wbuf_bytes(&wbuf, key, keylen);
wbuf_bytes(&wbuf, val, vallen);
return tokulogger_log_wbuf(logger, &wbuf);
return tokulogger_log_bytes(logger, wbuf.ndone, wbuf.buf);
}
/* Flush any buffered log bytes, close the log file, and destroy the logger.
 *
 * loggerp: address of the logger handle; *loggerp is set to 0 on return.
 *
 * Returns 0 on success, otherwise an errno value describing the first failure
 * (from write() or close()).  The logger is always torn down -- the file
 * descriptor is closed and the memory released -- even when flushing fails,
 * so an error no longer leaks the descriptor, the directory string, or the
 * logger itself.
 */
int tokulogger_log_close(TOKULOGGER *loggerp) {
    TOKULOGGER logger = *loggerp;
    int result = 0;
    if (logger->fd!=-1) {
        printf("%s:%d n_in_buf=%d\n", __FILE__, __LINE__, logger->n_in_buf);
        /* write() may accept fewer bytes than requested, so keep going until
         * the whole buffer has been handed to the kernel. */
        const char *next = (const char *)logger->buf;
        int remaining = logger->n_in_buf;
        while (remaining>0) {
            ssize_t n = write(logger->fd, next, remaining);
            if (n<=0) {
                /* n==0 with a nonzero count makes no progress; report it
                 * rather than spinning forever. */
                result = (n<0) ? errno : EIO;
                break;
            }
            next += n;
            remaining -= n;
        }
        /* Close even if the flush failed; report the first error seen. */
        if (close(logger->fd)!=0 && result==0) result = errno;
    }
    toku_free(logger->directory);
    toku_free(logger);
    *loggerp=0;
    return result;
}
#if 0
int tokulogger_log_brt_remove (TOKULOGGER logger,
TXNID txnid,
diskoff diskoff,
......@@ -109,5 +127,58 @@ int tokulogger_log_brt_remove (TOKULOGGER logger,
int keylen,
unsigned char *val,
int vallen) {
n
}
#endif
/* Log the physical insertion or deletion of one key/value pair in a leaf.
 *
 * txn:     transaction doing the work; when it is 0 nothing is logged and 0
 *          is returned.
 * diskoff: disk offset stored in the record.
 * is_add:  nonzero selects the LT_INSERT_WITH_NO_OVERWRITE tag, zero selects
 *          LT_DELETE.
 * pair:    the pair whose key and value bytes are copied into the record.
 *
 * The record is laid out as: tag, transaction id, disk offset, key, value.
 * Returns whatever tokulogger_log_bytes() returns.
 */
int tokulogger_log_phys_add_or_delete_in_leaf (TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair) {
    if (txn==0) return 0;

    const int klen = pair->keylen;
    const int vlen = pair->vallen;
    /* 30 bytes of headroom for the fixed-size fields, plus the payload. */
    const int capacity = 30+klen+vlen;
    unsigned char scratch[capacity];

    int record_type;
    if (is_add) {
        record_type = LT_INSERT_WITH_NO_OVERWRITE;
    } else {
        record_type = LT_DELETE;
    }

    struct wbuf w;
    wbuf_init(&w, scratch, capacity);
    wbuf_char(&w, record_type);
    wbuf_txnid(&w, txn->txnid64);
    wbuf_diskoff(&w, diskoff);
    wbuf_bytes(&w, kv_pair_key_const(pair), klen);
    wbuf_bytes(&w, kv_pair_val_const(pair), vlen);

    return tokulogger_log_bytes(txn->logger, w.ndone, w.buf);
}
/* Write out everything in the logger's in-memory buffer and fsync the log
 * file.
 *
 * Returns 0 on success, otherwise an errno value from write() or fsync().
 *
 * write() is allowed to accept fewer bytes than requested, so the buffer is
 * drained in a loop.  If a write fails part-way, the bytes that were not
 * written are moved to the front of the buffer and n_in_buf is updated, so a
 * later retry neither loses nor duplicates log data.
 */
int tokulogger_fsync (TOKULOGGER logger) {
    const char *next = (const char *)logger->buf;
    int remaining = logger->n_in_buf;
    while (remaining>0) {
        ssize_t n = write(logger->fd, next, remaining);
        if (n<=0) {
            /* Capture errno before anything else can disturb it.  n==0 with a
             * nonzero count makes no progress, so report it as an I/O error. */
            int e = (n<0) ? errno : EIO;
            memmove(logger->buf, next, remaining);
            logger->n_in_buf = remaining;
            return e;
        }
        next += n;
        remaining -= n;
    }
    logger->n_in_buf = 0;
    if (fsync(logger->fd)!=0) return errno;
    return 0;
}
/* Append a commit record (LT_COMMIT tag followed by the transaction id) for
 * txn to its logger, then force the log to disk with tokulogger_fsync().
 * Returns the first nonzero result from logging or syncing, or 0. */
int tokulogger_log_commit (TOKUTXN txn) {
    unsigned char record[30];
    struct wbuf w;

    wbuf_init(&w, record, sizeof record);
    wbuf_char(&w, LT_COMMIT);
    wbuf_txnid(&w, txn->txnid64);

    int status = tokulogger_log_bytes(txn->logger, w.ndone, w.buf);
    return (status!=0) ? status : tokulogger_fsync(txn->logger);
}
/* Create a transaction handle bound to `logger` with id `txnid64` and store
 * it in *tokutxn.  Returns 0 on success, or errno when the allocation made by
 * the project's TAGMALLOC macro yields a null handle. */
int tokutxn_begin (TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
    TAGMALLOC(TOKUTXN, txn);
    if (txn==0) return errno;

    txn->logger = logger;
    txn->txnid64 = txnid64;

    *tokutxn = txn;
    return 0;
}
#ifndef TOKULOGGGER_H
#define TOKULOGGGER_H
/* Interface to the logger (TOKULOGGER) and to the transaction handle
 * (TOKUTXN) that log records are attributed to. */
#include "kv-pair.h"
/* Handles are pointers to structs whose layout is not defined in this header. */
typedef struct tokulogger *TOKULOGGER;
typedef struct tokutxn *TOKUTXN;
/* Create a logger for `directory` and store it in *resultp.
 * Returns 0 on success, nonzero on failure. */
int tokulogger_create_and_open_logger (const char *directory, TOKULOGGER *resultp);
/* Log `nbytes` bytes starting at `bytes`.  Returns 0 on success, or an errno
 * value (for example when the log file cannot be created). */
int tokulogger_log_bytes(TOKULOGGER logger, int nbytes, void *bytes);
/* Write out buffered bytes, close the log file, free the logger, and set
 * *logger to 0.  Returns nonzero on failure. */
int tokulogger_log_close(TOKULOGGER *logger);
/* Log the addition (is_add nonzero) or deletion (is_add zero) of `pair` at
 * `diskoff` on behalf of `txn`.  Does nothing and returns 0 when txn is 0. */
int tokulogger_log_phys_add_or_delete_in_leaf (TOKUTXN txn, diskoff diskoff, int is_add, const struct kv_pair *pair);
/* Log a commit record for `txn` and fsync the log. */
int tokulogger_log_commit (TOKUTXN txn);
/* Allocate a transaction handle with the given id, attached to `logger`, and
 * store it through the first argument.  Returns 0 on success. */
int tokutxn_begin (TOKUTXN *, TXNID txnid64, TOKULOGGER logger);
#endif
/* Dump the log from stdin to stdout. */
#include <stdio.h>
#include "brttypes.h"
#include "log-internal.h"
#include <sys/types.h>
#include <ctype.h>
#include <stdlib.h>
/* Read four bytes from stdin and combine them into a 32-bit value, first
 * byte most significant.  End-of-file is not detected here: whatever
 * getchar() returns is folded into the result. */
u_int32_t get_uint32 (void) {
    u_int32_t value = 0;
    int i;
    for (i=0; i<4; i++) {
        u_int32_t byte = getchar();
        value = (value<<8) | byte;
    }
    return value;
}
/* Read eight bytes from stdin as two 32-bit halves (high half first) and
 * combine them into a 64-bit value.
 *
 * The high half is widened to an unsigned 64-bit type before shifting:
 * shifting a signed long long so that a bit lands in or past the sign bit is
 * undefined behavior, which the old (long long) cast triggered whenever the
 * high half was 0x80000000 or larger. */
u_int64_t get_uint64 (void) {
    u_int32_t hi = get_uint32();
    u_int32_t lo = get_uint32();
    return (((u_int64_t)hi) << 32) | lo;
}
/* Consume a 64-bit transaction id from stdin and print it as " txnid=N". */
void transcribe_txnid (void) {
    printf(" txnid=%lld", (long long)get_uint64());
}
/* Consume a 64-bit disk offset from stdin and print it as " diskoff=N". */
void transcribe_diskoff (void) {
    printf(" diskoff=%lld", (long long)get_uint64());
}
/* Consume a length-prefixed byte string from stdin and print it as
 * ` what(len):"..."`.
 *
 * what: label printed in front of the string (e.g. "key" or "data").
 *
 * Escaping: backslash -> \\, newline -> \n, space -> "\ ", double quote ->
 * two double quotes, other printable bytes verbatim, everything else as a
 * backslash followed by two hex digits.
 *
 * The length is unsigned, so it is printed with %u.  If stdin ends before
 * `len` bytes have been read the loop stops early instead of printing a
 * bogus escape for every missing byte.
 */
void transcribe_key_or_data (char *what) {
    u_int32_t l = get_uint32();
    u_int32_t i;
    printf(" %s(%u):\"", what, l);
    for (i=0; i<l; i++) {
        int c = getchar();
        if (c==EOF) break; /* truncated input: nothing more to transcribe */
        if (c=='\\') printf("\\\\");
        else if (c=='\n') printf("\\n");
        else if (c==' ') printf("\\ ");
        else if (c=='"') printf("\"\"");
        else if (isprint(c)) printf("%c", c);
        else printf("\\%02x", (unsigned int)c);
    }
    printf("\"");
}
/* Read log records from stdin until end-of-file and print one line per
 * record.  Each record starts with a one-byte tag:
 *   LT_INSERT_WITH_NO_OVERWRITE / LT_DELETE: txnid, diskoff, key, data
 *   LT_COMMIT:                               txnid
 * Any other tag prints "Huh?" and aborts. */
int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__unused__))) {
    for (;;) {
        int cmd = getchar();
        if (cmd==EOF) break;

        if (cmd==LT_INSERT_WITH_NO_OVERWRITE || cmd==LT_DELETE) {
            /* Both record kinds share the same payload layout. */
            if (cmd==LT_DELETE) {
                printf("DELETE:");
            } else {
                printf("INSERT_WITH_NO_OVERWRITE:");
            }
            transcribe_txnid();
            transcribe_diskoff();
            transcribe_key_or_data("key");
            transcribe_key_or_data("data");
            printf("\n");
        } else if (cmd==LT_COMMIT) {
            printf("COMMIT:");
            transcribe_txnid();
            printf("\n");
        } else {
            printf("Huh?");
            abort();
        }
    }
    return 0;
}
#ifndef MEMORY_H
#define MEMORY_H
//#include <stdlib.h>
/* Tokutek memory allocation functions and macros.
......@@ -66,3 +69,4 @@ int get_n_items_malloced(void); /* How many items are malloc'd but not free'd.
void print_malloced_items(void); /* Try to print some malloced-but-not-freed items. May be a noop. */
void malloc_report (void); /* report on statistics about number of mallocs. Maybe a no-op. */
#endif
This diff is collapsed.
......@@ -14,6 +14,7 @@
#include "list.h"
#include "kv-pair.h"
#include "pma-internal.h"
#include "log.h"
/* get KEY_VALUE_OVERHEAD */
#include "brt-internal.h"
......@@ -648,7 +649,7 @@ int pma_free (PMA *pmap) {
}
/* Copies keylen and datalen */
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
int pma_insert (PMA pma, DBT *k, DBT *v, DB* db, TOKUTXN txn, diskoff diskoff) {
int idx = pmainternal_find(pma, k, db);
if (idx < pma_index_limit(pma) && pma->pairs[idx]) {
DBT k2;
......@@ -656,10 +657,11 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (kv_pair_deleted(pma->pairs[idx])) {
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
return BRT_OK;
int r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
return r;
} else
return BRT_ALREADY_THERE; /* It is already here. Return an error. */
}
}
}
if (kv_pair_inuse(pma->pairs[idx])) {
idx = pmainternal_make_space_at (pma, idx); /* returns the new idx. */
......@@ -668,7 +670,7 @@ int pma_insert (PMA pma, DBT *k, DBT *v, DB* db) {
pma->pairs[idx] = kv_pair_malloc(k->data, k->size, v->data, v->size);
assert(pma->pairs[idx]);
pma->n_pairs_present++;
return BRT_OK;
return tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 1, pma->pairs[idx]);
}
int pma_delete (PMA pma, DBT *k, DB *db) {
......@@ -770,20 +772,26 @@ void __pma_delete_at(PMA pma, int here) {
toku_free(newpairs);
}
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
int *replaced_v_size /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
) {
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
DB *db, TOKUTXN txn, diskoff diskoff) {
//printf("%s:%d v->size=%d\n", __FILE__, __LINE__, v->size);
int idx = pmainternal_find(pma, k, db);
struct kv_pair *kv;
int r;
if (idx < pma_index_limit(pma) && (kv = pma->pairs[idx])) {
DBT k2;
// printf("%s:%d\n", __FILE__, __LINE__);
kv = kv_pair_ptr(kv);
if (0==pma->compare_fun(db, k, fill_dbt(&k2, kv->key, kv->keylen))) {
if (!kv_pair_deleted(pma->pairs[idx]))
if (!kv_pair_deleted(pma->pairs[idx])) {
*replaced_v_size = kv->vallen;
r=tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, kv);
if (r!=0) return r;
}
pma->pairs[idx] = kv_pair_realloc_same_key(kv, v->data, v->size);
return BRT_OK;
r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 0, pma->pairs[idx]);
return r;
}
}
if (kv_pair_inuse(pma->pairs[idx])) {
......@@ -795,7 +803,9 @@ int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
assert(pma->pairs[idx]);
pma->n_pairs_present++;
*replaced_v_size = -1;
return BRT_OK;
//printf("%s:%d txn=%p\n", __FILE__, __LINE__, txn);
r = tokulogger_log_phys_add_or_delete_in_leaf(txn, diskoff, 1, pma->pairs[idx]);
return r;
}
void pma_iterate (PMA pma, void(*f)(bytevec,ITEMLEN,bytevec,ITEMLEN, void*), void*v) {
......
......@@ -4,6 +4,8 @@
#include "brttypes.h"
#include "ybt.h"
#include "yerror.h"
#include "../include/db.h"
#include "log.h"
/* An in-memory Packed Memory Array dictionary. */
/* There is a built-in-cursor. */
......@@ -26,15 +28,15 @@ int pma_n_entries (PMA);
/* Duplicates the key and keylen. */
//enum pma_errors pma_insert (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
// The DB pointer is there so that the comparison function can be called.
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*);
enum pma_errors pma_insert (PMA, DBT*, DBT*, DB*, TOKUTXN txn, diskoff);
/* This returns an error if the key is NOT present. */
int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen);
/* This returns an error if the key is NOT present. */
int pma_delete (PMA, DBT *, DB*);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v, DB *db,
int *replaced_v_size /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
);
int pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
int *replaced_v_size, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
DB *db, TOKUTXN txn, diskoff);
/* Exposes internals of the PMA by returning a pointer to the guts.
......
......@@ -4,6 +4,7 @@
#include <assert.h>
#include <string.h>
#include <errno.h>
#include "memory.h"
/* When serializing a value, write it into a buffer. */
/* This code requires that the buffer be big enough to hold whatever you put into it. */
......@@ -15,11 +16,10 @@ struct wbuf {
unsigned int ndone;
};
static int wbuf_init (struct wbuf *w, diskoff size) {
w->buf=toku_malloc(size);
static void wbuf_init (struct wbuf *w, void *buf, diskoff size) {
w->buf=buf;
w->size=size;
w->ndone=0;
return errno;
}
/* Write a character. */
......@@ -56,8 +56,17 @@ static void wbuf_bytes (struct wbuf *w, bytevec bytes_bv, int nbytes) {
#endif
}
/* Append a 64-bit value to the buffer as two wbuf_int() items: the upper
 * 32 bits first, then the lower 32 bits. */
static void wbuf_ulonglong (struct wbuf *w, unsigned long long ull) {
    const unsigned long long upper = ull>>32;
    const unsigned long long lower = ull&0xFFFFFFFF;
    wbuf_int(w, upper);
    wbuf_int(w, lower);
}
static void wbuf_diskoff (struct wbuf *w, diskoff off) {
wbuf_int(w, off>>32);
wbuf_int(w, off&0xFFFFFFFF);
wbuf_ulonglong(w, off);
}
/* Append a transaction id to the buffer; it is written the same way as any
 * other 64-bit value, via wbuf_ulonglong(). */
static inline void wbuf_txnid (struct wbuf *w, TXNID tid) {
wbuf_ulonglong(w, tid);
}
#endif
......@@ -3,4 +3,6 @@ enum pma_errors { BRT_OK=0, BRT_ALREADY_THERE = -2, BRT_KEYEMPTY=-3 };
enum typ_tag { TYP_BRTNODE = 0xdead0001,
TYP_CACHETABLE, TYP_PAIR, /* for cachetables */
TYP_PMA,
TYP_TOKULOGGER };
TYP_TOKULOGGER,
TYP_TOKUTXN
};
......@@ -6,9 +6,8 @@ install: libdb.so
clean:
rm -rf *.so *.o
ydb.o: ../include/db.h ../newbrt/cachetable.h
DBBINS = ydb.o ../newbrt/brt.o ../newbrt/brt-serialize.o ../newbrt/cachetable.o ../newbrt/hashtable.o ../newbrt/header-io.o ../newbrt/key.o ../newbrt/memory.o ../newbrt/pma.o ../newbrt/ybt.o ../newbrt/primes.o
ydb.o: ../include/db.h ../newbrt/cachetable.h ../newbrt/brt.h
DBBINS = ydb.o ../newbrt/brt.o ../newbrt/brt-serialize.o ../newbrt/cachetable.o ../newbrt/hashtable.o ../newbrt/header-io.o ../newbrt/key.o ../newbrt/memory.o ../newbrt/pma.o ../newbrt/ybt.o ../newbrt/primes.o ../newbrt/log.o
libdb.so: $(DBBINS)
cc $(CPPFLAGS) $(DBBINS) -shared -o libdb.so $(CFLAGS)
libdb.a(ydb.o): ydb.o
# Build and run the logging tests against ../libdb.so.
CFLAGS = -Wall -Werror -O2 -g
CPPFLAGS = -I../../include
LOADLIBES = -L../ -ldb -Wl,-rpath,..

test_log1.bdb: test_log1
test_log1 test_log0: ../libdb.so

# None of these targets name a file, so declare them all phony; otherwise a
# stray file called "check" (or similar) would silently disable the tests.
.PHONY: check check_log0 check_log1 check_db_close_no_open make_libs

check_log0: ./test_log0
	valgrind --quiet ./test_log0
	test -f dir.test_log0/log000000000000.tokulog

check_log1: ./test_log1
	valgrind --quiet ./test_log1

check_db_close_no_open: ./test_db_close_no_open
	valgrind --quiet ./test_db_close_no_open

# Use $(MAKE) -C so that flags such as -j and -n reach the sub-make and a
# failure to enter the directory is reported instead of building in place.
make_libs:
	$(MAKE) -C ..

check: make_libs check_log0 check_log1 check_db_close_no_open

# Variants of test_log1 built against other libraries / include paths.
test_log1.bdb: test_log1.c
	cc -Wall -Werror -O2 -g test_log1.c -o $@ -ldb -DDBVERSION=\"bdb\"

test_log1.bdb_link: test_log1.c
	cc -Wall -Werror -O2 -g test_log1.c -o $@ -ldb $(CPPFLAGS)

test_log1.tokudb_link: test_log1.c
	cc -Wall -Werror -O2 -g test_log1.c -o $@ $(LOADLIBES)
/* Test that a DB handle which was created but never opened can be closed cleanly, inside an environment opened with logging enabled. */
#include <assert.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <db.h>
#define DIR "dir.test_db_close_no_open"
DB_ENV *env;
DB *db;
/* Create a fresh environment in DIR, create a DB handle, and close the
 * handle and the environment without ever opening the database.  Every step
 * must return 0. */
int main (int argc, char *argv[]) {
    const u_int32_t env_flags = DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_PRIVATE|DB_CREATE;
    int r;

    /* Start from an empty directory. */
    system("rm -rf " DIR);
    r = mkdir(DIR, 0777);
    assert(r==0);

    r = db_env_create(&env, 0);
    assert(r==0);
    r = env->open(env, DIR, env_flags, 0777);
    assert(r==0);

    r = db_create(&db, env, 0);
    assert(r==0);
    r = db->close(db, 0);
    assert(r==0);

    r = env->close(env, 0);
    assert(r==0);
    return 0;
}
/* Simple test of logging. Can I start a TokuDB with logging enabled? */
#include <assert.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <db.h>
#define DIR "dir.test_log0"
DB_ENV *env;
/* Create a fresh environment in DIR with logging and transactions enabled,
 * then close it again.  Every step must return 0. */
int main (int argc, char *argv[]) {
    const u_int32_t env_flags = DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_PRIVATE|DB_CREATE;
    int r;

    /* Start from an empty directory. */
    system("rm -rf " DIR);
    r = mkdir(DIR, 0777);
    assert(r==0);

    r = db_env_create(&env, 0);
    assert(r==0);
    r = env->open(env, DIR, env_flags, 0777);
    assert(r==0);

    r = env->close(env, 0);
    assert(r==0);
    return 0;
}
/* Simple test of logging: with logging enabled, open a database inside a transaction, insert one key/value pair, and commit. */
#include <assert.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <db.h>
#include <string.h>
#include <stdio.h>
#ifndef DBVERSION
#define DBVERSION "toku"
#endif
#define DIR "dir.test_log1." DBVERSION
DB_ENV *env;
DB *db;
DB_TXN *tid;
/* Check a return code: when it is nonzero, print the location, the code and
 * its db_strerror() text to stderr, then fail the assertion.
 * Wrapped in do { } while (0) so the whole check behaves as one statement
 * (safe after an unbraced if/else), and the argument is evaluated once. */
#define CKERR(r) do { \
    int ckerr_r_ = (r); \
    if (ckerr_r_!=0) fprintf(stderr, "%s:%d error %d %s\n", __FILE__, __LINE__, ckerr_r_, db_strerror(ckerr_r_)); \
    assert(ckerr_r_==0); \
} while (0)
/* Create a fresh environment in DIR, begin a transaction, create "foo.db"
 * inside it, store the pair "hello" -> "there" (both sizes include the
 * terminating NUL), commit, and close everything.  Every step must return 0. */
int main (int argc, char *argv[]) {
    int r;

    /* Start from an empty directory. */
    system("rm -rf " DIR);
    r = mkdir(DIR, 0777);
    assert(r==0);

    r = db_env_create(&env, 0);
    assert(r==0);
    r = env->open(env, DIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_PRIVATE|DB_CREATE, 0777);
    CKERR(r);

    r = db_create(&db, env, 0);
    CKERR(r);
    r = env->txn_begin(env, 0, &tid, 0);
    assert(r==0);
    r = db->open(db, tid, "foo.db", 0, DB_BTREE, DB_CREATE, 0777);
    CKERR(r);

    {
        DBT key;
        DBT data;
        memset(&key, 0, sizeof(key));
        key.data = "hello";
        key.size = 6;
        memset(&data, 0, sizeof(data));
        data.data = "there";
        data.size = 6;
        r = db->put(db, tid, &key, &data, 0);
        CKERR(r);
    }

    r = tid->commit(tid, 0);
    assert(r==0);
    r = db->close(db, 0);
    assert(r==0);
    r = env->close(env, 0);
    assert(r==0);
    return 0;
}
......@@ -15,6 +15,8 @@
#include <errno.h>
#include <unistd.h>
#include "cachetable.h"
#include "log.h"
#include "memory.h"
struct db_header {
int n_databases; // Or there can be >=1 named databases. This is the count.
......@@ -38,12 +40,17 @@ struct __toku_db_internal {
};
static inline void *malloc_zero(size_t size) {
void *vp = malloc(size);
void *vp = toku_malloc(size);
if (vp)
memset(vp, 0, size);
return vp;
}
/* Private state of a DB_TXN, reached through the transaction's `i` member
 * (see the txn->i->tokutxn uses in this file). */
struct __toku_db_txn_internal {
//TXNID txnid64; /* A sixty-four bit txn id. */
/* Logging-layer transaction handle; filled in by tokutxn_begin() in txn_begin(). */
TOKUTXN tokutxn;
};
void __toku_db_env_err (const DB_ENV *env __attribute__((__unused__)), int error, const char *fmt, ...) {
va_list ap;
va_start(ap, fmt);
......@@ -86,6 +93,7 @@ struct __toku_db_env_internal {
int files_array_limit; // How big is *files ?
struct ydb_file **files;
CACHETABLE cachetable;
TOKULOGGER logger;
};
int __toku_db_env_open (DB_ENV *env, const char *home, u_int32_t flags, int mode) {
......@@ -97,17 +105,28 @@ int __toku_db_env_open (DB_ENV *env, const char *home, u_int32_t flags, int mod
print_flags(flags);
assert(DB_PRIVATE & flags); // This means that we don't have to do anything with shared memory. And that's good enough for mysql.
r = brt_create_cachetable(&env->i->cachetable, 32);
assert(r==0);
if (flags & (DB_INIT_TXN | DB_INIT_LOG)) {
r = tokulogger_create_and_open_logger(env->i->dir, &env->i->logger);
} else {
env->i->dir = 0;
}
return 0;
}
int __toku_db_env_close (DB_ENV * env, u_int32_t flags) {
cachetable_close(&env->i->cachetable);
free(env->i->dir);
free(env->i->files);
free(env->i);
free(env);
if (env->i->logger) {
tokulogger_log_close(&env->i->logger);
}
toku_free(env->i->dir);
toku_free(env->i->files);
toku_free(env->i);
toku_free(env);
return 0;
}
int __toku_db_env_log_archive (DB_ENV *env, char **list[], u_int32_t flags) {
......@@ -225,6 +244,11 @@ int db_env_create (DB_ENV **envp, u_int32_t flags) {
/* Commit a transaction: log a commit record for it, then release the DB_TXN.
 *
 * txn:   transaction to commit; a null pointer yields -1.
 * flags: only reported through notef().
 *
 * Returns 0 on success.  If logging the commit fails, that error is returned
 * and the transaction is left allocated.
 *
 * txn->i is checked before it is dereferenced; previously the code read
 * txn->i->tokutxn first and tested txn->i for null only afterwards.
 *
 * NOTE(review): the TOKUTXN created by tokutxn_begin() is not released here
 * -- confirm where it is meant to be freed.
 */
int __toku_db_txn_commit (DB_TXN *txn, u_int32_t flags) {
    notef("flags=%d\n", flags);
    if (!txn) return -1;
    if (txn->i) {
        int r = tokulogger_log_commit(txn->i->tokutxn);
        if (r!=0) return r;
        toku_free(txn->i);
    }
    toku_free(txn);
    return 0;
}
......@@ -233,11 +257,16 @@ u_int32_t __toku_db_txn_id (DB_TXN *txn) {
abort();
}
static TXNID next_txn=0;
/* Begin a transaction in `env` and store the new handle in *txn.
 *
 * env:   environment whose logger the transaction will write to.
 * stxn:  parent transaction; only reported through notef().
 * txn:   receives the new DB_TXN on success; untouched on failure.
 * flags: only reported through notef().
 *
 * Returns 0 on success, ENOMEM if an allocation fails, or the error from
 * tokutxn_begin().  Nothing is leaked on the failure paths.
 *
 * The internal struct is allocated with toku_malloc() because
 * __toku_db_txn_commit() releases it with toku_free().
 */
int txn_begin (DB_ENV *env, DB_TXN *stxn, DB_TXN **txn, u_int32_t flags) {
    DB_TXN *result = malloc_zero(sizeof(*result));
    if (result==0) return ENOMEM;
    notef("parent=%p flags=0x%x\n", stxn, flags);
    result->commit = __toku_db_txn_commit;
    result->id = __toku_db_txn_id;
    result->i = toku_malloc(sizeof(*result->i));
    if (result->i==0) {
        toku_free(result);
        return ENOMEM;
    }
    int r = tokutxn_begin(&result->i->tokutxn, next_txn++, env->i->logger);
    if (r!=0) {
        toku_free(result->i);
        toku_free(result);
        return r;
    }
    *txn = result;
    return 0;
}
......@@ -247,9 +276,12 @@ int txn_abort (DB_TXN *txn) {
abort();
}
#if 0
int txn_commit (DB_TXN *txn, u_int32_t flags) {
return 0;
printf("%s:%d\n", __FILE__, __LINE__);
return tokulogger_log_commit(txn->i->tokutxn);
}
#endif
int log_compare (const DB_LSN *a, const DB_LSN *b) {
fprintf(stderr, "%s:%d log_compare(%p,%p)\n", __FILE__, __LINE__, a, b);
......@@ -257,31 +289,35 @@ int log_compare (const DB_LSN *a, const DB_LSN *b) {
}
int __toku_db_close (DB *db, u_int32_t flags) {
int r = close_brt(db->i->brt);
int r = 0;
if (db->i->brt) {
r = close_brt(db->i->brt);
}
printf("%s:%d %d=__toku_db_close(%p)\n", __FILE__, __LINE__, r, db);
db->i->freed = 1;
free(db->i->database_name);
free(db->i->full_fname);
free(db->i);
free(db);
toku_free(db->i->database_name);
toku_free(db->i->full_fname);
toku_free(db->i);
toku_free(db);
return r;
}
/* Private state of a DBC cursor, reached through the cursor's `i` member. */
struct __toku_dbc_internal {
/* Underlying BRT cursor, created by brt_cursor() in __toku_db_cursor(). */
BRT_CURSOR c;
/* Database the cursor was opened on. */
DB *db;
/* Transaction passed to __toku_db_cursor(); stored as given, without a null check. */
DB_TXN *txn;
};
int __toku_c_get (DBC *c, DBT *key, DBT *data, u_int32_t flag) {
int r = brt_cursor_get(c->i->c, key, data, flag, c->i->db);
int r = brt_cursor_get(c->i->c, key, data, flag, c->i->db, c->i->txn->i->tokutxn);
return r;
}
int __toku_c_close (DBC *c) {
int r = brt_cursor_close(c->i->c);
printf("%s:%d %d=__toku_c_close(%p)\n", __FILE__, __LINE__, r, c);
free(c->i);
free(c);
toku_free(c->i);
toku_free(c);
return r;
}
......@@ -299,6 +335,7 @@ int __toku_db_cursor (DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
result->c_del = __toku_c_del;
result->i = malloc_zero(sizeof(*result->i));
result->i->db = db;
result->i->txn = txn;
r = brt_cursor(db->i->brt, &result->i->c);
assert(r==0);
*c = result;
......@@ -328,7 +365,7 @@ char *construct_full_name (const char *dir, const char *fname) {
int dirlen = strlen(dir);
int fnamelen = strlen(fname);
int len = dirlen+fnamelen+2; // One for the / between (which may not be there). One for the trailing null.
char *result = malloc(len);
char *result = toku_malloc(len);
int l;
printf("%s:%d len(%d)=%d+%d+2\n", __FILE__, __LINE__, len, dirlen, fnamelen);
assert(result);
......@@ -388,8 +425,9 @@ int __toku_db_open (DB *db, DB_TXN *txn, const char *fname, const char *dbname,
assert(r==0);
return 0;
}
int __toku_db_put (DB *db, DB_TXN *txn, DBT *key, DBT *data, u_int32_t flags) {
int r = brt_insert(db->i->brt, key, data, db);
int r = brt_insert(db->i->brt, key, data, db, txn->i->tokutxn);
//printf("%s:%d %d=__toku_db_put(...)\n", __FILE__, __LINE__, r);
return r;
}
......@@ -465,3 +503,22 @@ int db_create (DB **db, DB_ENV *env, u_int32_t flags) {
*db = result;
return 0;
}
/* Map an error code to a human-readable message.
 *
 *   0         -> "Success: 0"
 *   positive  -> the strerror() text for that code
 *   otherwise -> "Unknown error code: N"
 *
 * The "unknown" text lives in a static buffer that is shared by all callers,
 * so two threads calling this at once can race on it; even then the buffer
 * should still hold some nul-terminated string.
 */
char *db_strerror (int error) {
    static char unknown_result[100];

    if (error == 0) return "Success: 0";

    if (error > 0) {
        char *msg = strerror(error);
        if (msg != NULL) return msg;
    }

    /* Negative codes, and positive ones strerror() had nothing for. */
    snprintf(unknown_result, sizeof unknown_result, "Unknown error code: %d", error);
    return unknown_result;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment