Commit b58a413b authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

{{{

svn merge -r3352:3375 https://svn.tokutek.com/tokudb/tokudb.558a
}}}
with no conflicts.

Fixes #672, #558, #680.


git-svn-id: file:///svn/tokudb@3376 c7de825b-a66e-492c-adef-691d508d4ae1
parent 33ea1592
...@@ -126,7 +126,7 @@ tdb_logprint.o: log-internal.h brttypes.h yerror.h log.h kv-pair.h log_header.h ...@@ -126,7 +126,7 @@ tdb_logprint.o: log-internal.h brttypes.h yerror.h log.h kv-pair.h log_header.h
tdb_logprint: $(OFILES) tdb_logprint: $(OFILES)
tdb-recover: LDFLAGS+=-lz tdb-recover: LDFLAGS+=-lz
recover.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h recover.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h
tdb-recover: $(OFILES) tdb-recover: $(OFILES)
roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h gpma.h roll.o: log_header.h log-internal.h log.h yerror.h brttypes.h kv-pair.h memory.h key.h cachetable.h gpma.h
......
...@@ -105,6 +105,8 @@ struct brt_header { ...@@ -105,6 +105,8 @@ struct brt_header {
char **names; char **names;
DISKOFF *roots; DISKOFF *roots;
unsigned int flags; unsigned int flags;
FIFO fifo; // all the abort and commit commands. If the header gets flushed to disk, we write the fifo contents beyond the unused_memory.
}; };
struct brt { struct brt {
...@@ -122,6 +124,7 @@ struct brt { ...@@ -122,6 +124,7 @@ struct brt {
DB *db; // To pass to the compare fun DB *db; // To pass to the compare fun
void *skey,*sval; /* Used for DBT return values. */ void *skey,*sval; /* Used for DBT return values. */
}; };
/* serialization code */ /* serialization code */
...@@ -137,6 +140,9 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h); ...@@ -137,6 +140,9 @@ int toku_serialize_brt_header_to (int fd, struct brt_header *h);
int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h); int toku_serialize_brt_header_to_wbuf (struct wbuf *, struct brt_header *h);
int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth); int toku_deserialize_brtheader_from (int fd, DISKOFF off, struct brt_header **brth);
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo); // Write a fifo into a disk, without worrying about fitting it into a block. This write is done at the end of the file.
int toku_deserialize_fifo_at (int fd, off_t at, FIFO *fifo);
void toku_brtnode_free (BRTNODE *node); void toku_brtnode_free (BRTNODE *node);
#if 1 #if 1
...@@ -202,6 +208,7 @@ struct cmd_leafval_bessel_extra { ...@@ -202,6 +208,7 @@ struct cmd_leafval_bessel_extra {
int toku_cmd_leafval_bessel (u_int32_t dlen, void *leafentry, void *extra); int toku_cmd_leafval_bessel (u_int32_t dlen, void *leafentry, void *extra);
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger); int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger);
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger);
int toku_gpma_compress_kvspace (GPMA pma, struct mempool *memp); int toku_gpma_compress_kvspace (GPMA pma, struct mempool *memp);
void *mempool_malloc_from_gpma(GPMA pma, struct mempool *mp, size_t size); void *mempool_malloc_from_gpma(GPMA pma, struct mempool *mp, size_t size);
......
...@@ -613,3 +613,107 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) { ...@@ -613,3 +613,107 @@ unsigned int toku_brtnode_pivot_key_len (BRTNODE node, struct kv_pair *pk) {
return kv_pair_keylen(pk); return kv_pair_keylen(pk);
} }
} }
// To serialize the fifo, we just write it all at the end of the file.
// For now, just do all the writes as separate system calls. This function is hardly ever called, and
// we might not be able to allocate a large enough buffer to hold everything,
// and it would be more complex to batch up several writes.
int toku_serialize_fifo_at (int fd, off_t freeoff, FIFO fifo) {
{
int size=4;
char buf[size];
struct wbuf w;
wbuf_init(&w, buf, size);
wbuf_int(&w, toku_fifo_n_entries(fifo));
ssize_t r = pwrite(fd, w.buf, size, freeoff);
if (r!=size) return errno;
freeoff+=size;
}
FIFO_ITERATE(fifo, key, keylen, val, vallen, type, xid,
({
size_t size=keylen+vallen+1+8+4+4;
char *MALLOC_N(size, buf);
assert(buf!=0);
struct wbuf w;
wbuf_init(&w, buf, size);
wbuf_char(&w, type);
wbuf_TXNID(&w, xid);
wbuf_bytes(&w, key, keylen);
//printf("%s:%d Writing %d bytes: %s\n", __FILE__, __LINE__, vallen, (char*)val);
wbuf_bytes(&w, val, vallen);
assert(w.ndone==size);
ssize_t r = pwrite(fd, w.buf, (size_t)size, freeoff);
if (r<0) return errno;
assert(r==(ssize_t)size);
freeoff+=size;
toku_free(buf);
}));
return 0;
}
int read_int (int fd, off_t *at, u_int32_t *result) {
int v;
ssize_t r = pread(fd, &v, 4, *at);
if (r<0) return errno;
assert(r==4);
*result = ntohl(v);
(*at) += 4;
return 0;
}
int read_char (int fd, off_t *at, char *result) {
ssize_t r = pread(fd, result, 1, *at);
if (r<0) return errno;
assert(r==1);
(*at)++;
return 0;
}
int read_uint64_t (int fd, off_t *at, u_int64_t *result) {
u_int32_t v1,v2;
int r;
if ((r = read_int(fd, at, &v1))) return r;
if ((r = read_int(fd, at, &v2))) return r;
*result = (((u_int64_t)v1)<<32) + v2;
return 0;
}
int read_nbytes (int fd, off_t *at, char **data, u_int32_t len) {
char *result = toku_malloc(len);
if (result==0) return errno;
ssize_t r = pread(fd, result, len, *at);
//printf("%s:%d read %d bytes, which are %s\n", __FILE__, __LINE__, len, result);
if (r<0) return errno;
assert(r==(ssize_t)len);
(*at)+=len;
*data=result;
return 0;
}
int toku_deserialize_fifo_at (int fd, off_t at, FIFO *fifo) {
FIFO result;
int r = toku_fifo_create(&result);
if (r) return r;
u_int32_t count;
if ((r=read_int(fd, &at, &count))) return r;
u_int32_t i;
for (i=0; i<count; i++) {
char type;
TXNID xid;
u_int32_t keylen, vallen;
char *key, *val;
if ((r=read_char(fd, &at, &type))) return r;
if ((r=read_uint64_t(fd, &at, &xid))) return r;
if ((r=read_int(fd, &at, &keylen))) return r;
if ((r=read_nbytes(fd, &at, &key, keylen))) return r;
if ((r=read_int(fd, &at, &vallen))) return r;
if ((r=read_nbytes(fd, &at, &val, vallen))) return r;
//printf("%s:%d read %d byte key, key=%s\n dlen=%d data=%s\n", __FILE__, __LINE__, keylen, key, vallen, val);
if ((r=toku_fifo_enq(result, key, keylen, val, vallen, type, xid))) return r;
toku_free(key);
toku_free(val);
}
*fifo = result;
//printf("%s:%d *fifo=%p\n", __FILE__, __LINE__, result);
return 0;
}
...@@ -55,6 +55,7 @@ int main (int argc , const char *argv[]) { ...@@ -55,6 +55,7 @@ int main (int argc , const char *argv[]) {
// test2(1); // test2(1);
if (verbose) printf("test2 faster\n"); if (verbose) printf("test2 faster\n");
test2(0, 2); test2(0, 2);
test2(0, 27);
test2(0, 212); test2(0, 212);
test2(0, 4096); test2(0, 4096);
toku_malloc_cleanup(); toku_malloc_cleanup();
......
...@@ -167,6 +167,7 @@ void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void ...@@ -167,6 +167,7 @@ void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void
assert(!h->dirty); // shouldn't be dirty once it is unpinned. assert(!h->dirty); // shouldn't be dirty once it is unpinned.
if (write_me) { if (write_me) {
toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h); toku_serialize_brt_header_to(toku_cachefile_fd(cachefile), h);
toku_serialize_fifo_at(toku_cachefile_fd(cachefile), h->unused_memory, h->fifo);
} }
if (!keep_me) { if (!keep_me) {
if (h->n_named_roots>0) { if (h->n_named_roots>0) {
...@@ -177,31 +178,36 @@ void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void ...@@ -177,31 +178,36 @@ void toku_brtheader_flush_callback (CACHEFILE cachefile, DISKOFF nodename, void
toku_free(h->names); toku_free(h->names);
toku_free(h->roots); toku_free(h->roots);
} }
toku_fifo_free(&h->fifo);
toku_free(h); toku_free(h);
} }
} }
int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **headerp_v, long *sizep __attribute__((unused)), void*extraargs __attribute__((__unused__)), LSN *written_lsn) { int toku_brtheader_fetch_callback (CACHEFILE cachefile, DISKOFF nodename, void **headerp_v, long *sizep __attribute__((unused)), void*extraargs __attribute__((__unused__)), LSN *written_lsn) {
int r;
struct brt_header **h = (struct brt_header **)headerp_v; struct brt_header **h = (struct brt_header **)headerp_v;
assert(nodename==0); assert(nodename==0);
int r = toku_deserialize_brtheader_from(toku_cachefile_fd(cachefile), nodename, h); if ((r = toku_deserialize_brtheader_from(toku_cachefile_fd(cachefile), nodename, h))) return r;
if ((r = toku_deserialize_fifo_at(toku_cachefile_fd(cachefile), (*h)->unused_memory, &(*h)->fifo))) return r;
//printf("%s:%d fifo=%p\nn", __FILE__, __LINE__, (*h)->fifo);
written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something. written_lsn->lsn = 0; // !!! WRONG. This should be stored or kept redundantly or something.
return r; return 0;
} }
int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) { int toku_read_and_pin_brt_header (CACHEFILE cf, struct brt_header **header) {
void *header_p; void *header_p;
//fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__); //fprintf(stderr, "%s:%d read_and_pin_brt_header(...)\n", __FILE__, __LINE__);
int r = toku_cachetable_get_and_pin(cf, 0, &header_p, NULL, int r = toku_cachetable_get_and_pin(cf, 0, &header_p, NULL,
toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0); toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
if (r!=0) return r; if (r!=0) return r;
*header = header_p; *header = header_p;
return 0; return 0;
} }
int toku_unpin_brt_header (BRT brt) { int toku_unpin_brt_header (BRT brt) {
int r = toku_cachetable_unpin(brt->cf, 0, brt->h->dirty, 0); int dirty = brt->h->dirty;
brt->h->dirty=0; brt->h->dirty=0; // Unpinning it may make it go way.
int r = toku_cachetable_unpin(brt->cf, 0, dirty, 0);
brt->h=0; brt->h=0;
return r; return r;
} }
...@@ -1178,14 +1184,13 @@ static int apply_cmd_to_le_committed (u_int32_t klen, void *kval, ...@@ -1178,14 +1184,13 @@ static int apply_cmd_to_le_committed (u_int32_t klen, void *kval,
return 0; return 0;
} }
static int apply_cmd_to_le_both (TXNID xid, static int apply_cmd_to_le_both (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval, u_int32_t klen, void *kval,
u_int32_t clen, void *cval, u_int32_t clen, void *cval,
u_int32_t plen, void *pval, u_int32_t plen, void *pval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
// keep the committed value for rollback. // keep the committed value for rollback.
assert(cmd->xid == xid); // provisional things must match the cmd. (Others should already be committed or aborted)
assert(cmd->u.id.key->size == klen); assert(cmd->u.id.key->size == klen);
assert(memcmp(cmd->u.id.key->data, kval, klen)==0); assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) { switch (cmd->type) {
...@@ -1216,13 +1221,12 @@ static int apply_cmd_to_le_both (TXNID xid, ...@@ -1216,13 +1221,12 @@ static int apply_cmd_to_le_both (TXNID xid,
assert(0); assert(0);
return 0; return 0;
} }
static int apply_cmd_to_le_provdel (TXNID xid, static int apply_cmd_to_le_provdel (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval, u_int32_t klen, void *kval,
u_int32_t clen, void *cval, u_int32_t clen, void *cval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
// keep the committed value for rollback // keep the committed value for rollback
assert(cmd->xid == xid); // provisional things must match the cmd. (Others should already be committed or aborted)
assert(cmd->u.id.key->size == klen); assert(cmd->u.id.key->size == klen);
assert(memcmp(cmd->u.id.key->data, kval, klen)==0); assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) { switch (cmd->type) {
...@@ -1255,13 +1259,11 @@ static int apply_cmd_to_le_provdel (TXNID xid, ...@@ -1255,13 +1259,11 @@ static int apply_cmd_to_le_provdel (TXNID xid,
return 0; return 0;
} }
static int apply_cmd_to_le_provpair (TXNID xid, static int apply_cmd_to_le_provpair (TXNID xid __attribute__((__unused__)),
u_int32_t klen, void *kval, u_int32_t klen, void *kval,
u_int32_t plen , void *pval, u_int32_t plen , void *pval,
BRT_CMD cmd, BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) { u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
assert(cmd->xid == xid); // provisional things must match the cmd. (Others should already be committed or aborted)
assert(cmd->u.id.key->size == klen); assert(cmd->u.id.key->size == klen);
assert(memcmp(cmd->u.id.key->data, kval, klen)==0); assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) { switch (cmd->type) {
...@@ -1925,7 +1927,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -1925,7 +1927,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
} else } else
goto died0a; goto died0a;
} }
r=toku_cachetable_openfd(&t->cf, cachetable, fd, t); r=toku_cachetable_openfd(&t->cf, cachetable, fd);
if (r != 0) goto died0a; if (r != 0) goto died0a;
toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf)); toku_logger_log_fopen(txn, fname_in_env, toku_cachefile_filenum(t->cf));
} }
...@@ -1957,6 +1959,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char ...@@ -1957,6 +1959,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
t->h->nodesize=t->nodesize; t->h->nodesize=t->nodesize;
t->h->freelist=-1; t->h->freelist=-1;
t->h->unused_memory=2*t->nodesize; t->h->unused_memory=2*t->nodesize;
toku_fifo_create(&t->h->fifo);
if (dbname) { if (dbname) {
t->h->unnamed_root = -1; t->h->unnamed_root = -1;
t->h->n_named_roots = 1; t->h->n_named_roots = 1;
...@@ -2216,54 +2219,84 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, ...@@ -2216,54 +2219,84 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
return 0; return 0;
} }
int toku_cachefile_root_put_cmd (CACHEFILE cf, BRT_CMD cmd, TOKULOGGER logger) {
int r;
struct brt_header *h;
r = toku_read_and_pin_brt_header(cf, &h);
if (r!=0) return r;
r = toku_fifo_enq_cmdstruct(h->fifo, cmd);
if (r!=0) return r;
{
BYTESTRING keybs = {.len=cmd->u.id.key->size, .data=cmd->u.id.key->data};
BYTESTRING valbs = {.len=cmd->u.id.val->size, .data=cmd->u.id.val->data};
r = toku_log_enqrootentry(logger, (LSN*)0, 0, toku_cachefile_filenum(cf), cmd->xid, cmd->type, keybs, valbs);
if (r!=0) return r;
}
h->dirty = 0;
r = toku_cachetable_unpin(cf, 0, 1, 0);
return 0;
}
static int push_something(BRT brt, BRTNODE *nodep, CACHEKEY *rootp, BRT_CMD cmd, TOKULOGGER logger) {
int did_split = 0;
BRTNODE nodea=0, nodeb=0;
DBT splitk;
int result = brtnode_put_cmd(brt, *nodep, cmd,
&did_split, &nodea, &nodeb, &splitk,
0,
logger);
int r;
if (did_split) {
// node is unpinned, so now we have to proceed to update the root with a new node.
//printf("%s:%d did_split=%d nodeb=%p nodeb->thisnodename=%lld nodeb->nodesize=%d\n", __FILE__, __LINE__, did_split, nodeb, nodeb->thisnodename, nodeb->nodesize);
//printf("Did split, splitkey=%s\n", splitkey);
if (nodeb->height>0) assert(BNC_DISKOFF(nodeb,nodeb->u.n.n_children-1)!=0);
assert(nodeb->nodesize>0);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, nodep);
assert(r == 0);
} else {
if ((*nodep)->height>0)
assert((*nodep)->u.n.n_children<=TREE_FANOUT);
}
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
return result;
}
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) { int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger) {
void *node_v; void *node_v;
BRTNODE node; BRTNODE node;
CACHEKEY *rootp; CACHEKEY *rootp;
int result;
int r; int r;
int did_split; BRTNODE nodea=0, nodeb=0;
DBT splitk;
int debug = toku_brt_debug_mode;//strcmp(key,"hello387")==0;
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); //assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable));
if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) { if ((r = toku_read_and_pin_brt_header(brt->cf, &brt->h))) {
if (0) { died0: toku_unpin_brt_header(brt); } if (0) { died0: toku_unpin_brt_header(brt); }
return r; return r;
} }
rootp = toku_calculate_root_offset_pointer(brt); rootp = toku_calculate_root_offset_pointer(brt);
if (debug) printf("%s:%d Getting %lld\n", __FILE__, __LINE__, *rootp);
if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL, if ((r=toku_cachetable_get_and_pin(brt->cf, *rootp, &node_v, NULL,
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) { toku_brtnode_flush_callback, toku_brtnode_fetch_callback, brt))) {
goto died0; goto died0;
} }
//printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v); //printf("%s:%d pin %p\n", __FILE__, __LINE__, node_v);
node=node_v; node=node_v;
if (debug) printf("%s:%d node inserting\n", __FILE__, __LINE__); // push the fifo stuff
did_split = 0; {
result = brtnode_put_cmd(brt, node, cmd, DBT okey,odata;
&did_split, &nodea, &nodeb, &splitk, BRT_CMD_S ocmd;
debug, while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
logger); if ((r = push_something(brt, &node, rootp, &ocmd, logger))) return r;
if (debug) printf("%s:%d did_insert\n", __FILE__, __LINE__); r = toku_fifo_deq(brt->h->fifo);
if (did_split) { assert(r==0);
// node is unpinned, so now we have to proceed to update the root with a new node. }
//printf("%s:%d did_split=%d nodeb=%p nodeb->thisnodename=%lld nodeb->nodesize=%d\n", __FILE__, __LINE__, did_split, nodeb, nodeb->thisnodename, nodeb->nodesize);
//printf("Did split, splitkey=%s\n", splitkey);
if (nodeb->height>0) assert(BNC_DISKOFF(nodeb,nodeb->u.n.n_children-1)!=0);
assert(nodeb->nodesize>0);
r = brt_init_new_root(brt, nodea, nodeb, splitk, rootp, logger, &node);
assert(r == 0);
} else {
if (node->height>0)
assert(node->u.n.n_children<=TREE_FANOUT);
} }
if ((r = push_something(brt, &node, rootp, cmd, logger))) return r;
r = toku_unpin_brtnode(brt, node); r = toku_unpin_brtnode(brt, node);
assert(r==0); assert(r == 0);
r = toku_unpin_brt_header(brt); r = toku_unpin_brt_header(brt);
assert(r == 0); assert(r == 0);
//assert(0==toku_cachetable_assert_all_unpinned(brt->cachetable)); return 0;
return result;
} }
int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
...@@ -2271,6 +2304,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) { ...@@ -2271,6 +2304,7 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
if (txn) { if (txn) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)}; BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)}; BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs); r = toku_logger_save_rollback_cmdinsert(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r; if (r!=0) return r;
} }
...@@ -2299,6 +2333,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { ...@@ -2299,6 +2333,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r; int r;
if (txn) { if (txn) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)}; BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs); r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r; if (r!=0) return r;
} }
...@@ -2313,6 +2348,7 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) { ...@@ -2313,6 +2348,7 @@ int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
if (txn) { if (txn) {
BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)}; BYTESTRING keybs = {key->size, toku_memdup(key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)}; BYTESTRING databs = {val->size, toku_memdup(val->data, val->size)};
toku_cachefile_refup(brt->cf);
r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs); r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r; if (r!=0) return r;
} }
...@@ -2644,6 +2680,17 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK ...@@ -2644,6 +2680,17 @@ int toku_brt_search(BRT brt, brt_search_t *search, DBT *newkey, DBT *newval, TOK
BRTNODE node = node_v; BRTNODE node = node_v;
// push the fifo sutff
{
DBT okey,odata;
BRT_CMD_S ocmd;
while (0==toku_fifo_peek_cmdstruct(brt->h->fifo, &ocmd, &okey, &odata)) {
if ((r = push_something(brt, &node, rootp, &ocmd, logger))) return r;
r = toku_fifo_deq(brt->h->fifo);
assert(r==0);
}
}
for (;;) { for (;;) {
BRT_SPLIT split; brt_split_init(&split); BRT_SPLIT split; brt_split_init(&split);
r = brt_search_node(brt, node, search, newkey, newval, &split, logger); r = brt_search_node(brt, node, search, newkey, newval, &split, logger);
......
...@@ -84,7 +84,7 @@ static void test0 (void) { ...@@ -84,7 +84,7 @@ static void test0 (void) {
r=toku_create_cachetable(&t, 5, ZERO_LSN, NULL_LOGGER); r=toku_create_cachetable(&t, 5, ZERO_LSN, NULL_LOGGER);
assert(r==0); assert(r==0);
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r==0); assert(r==0);
expect_f = f; expect_f = f;
...@@ -208,7 +208,7 @@ static void test_nested_pin (void) { ...@@ -208,7 +208,7 @@ static void test_nested_pin (void) {
r = toku_create_cachetable(&t, 1, ZERO_LSN, NULL_LOGGER); r = toku_create_cachetable(&t, 1, ZERO_LSN, NULL_LOGGER);
assert(r==0); assert(r==0);
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r==0); assert(r==0);
expect_f = f; expect_f = f;
...@@ -271,10 +271,10 @@ static void test_multi_filehandles (void) { ...@@ -271,10 +271,10 @@ static void test_multi_filehandles (void) {
unlink(fname2); unlink(fname2);
r = toku_create_cachetable(&t, 4, ZERO_LSN, NULL_LOGGER); assert(r==0); r = toku_create_cachetable(&t, 4, ZERO_LSN, NULL_LOGGER); assert(r==0);
r = toku_cachetable_openf(&f1, t, fname1, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0); r = toku_cachetable_openf(&f1, t, fname1, O_RDWR|O_CREAT, 0777); assert(r==0);
r = link(fname1, fname2); assert(r==0); r = link(fname1, fname2); assert(r==0);
r = toku_cachetable_openf(&f2, t, fname2, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0); r = toku_cachetable_openf(&f2, t, fname2, O_RDWR|O_CREAT, 0777); assert(r==0);
r = toku_cachetable_openf(&f3, t, fname3, O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0); r = toku_cachetable_openf(&f3, t, fname3, O_RDWR|O_CREAT, 0777); assert(r==0);
assert(f1==f2); assert(f1==f2);
assert(f1!=f3); assert(f1!=f3);
...@@ -320,7 +320,7 @@ static void test_dirty() { ...@@ -320,7 +320,7 @@ static void test_dirty() {
char *fname = "test.dat"; char *fname = "test.dat";
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r == 0); assert(r == 0);
key = 1; value = (void*)1; key = 1; value = (void*)1;
...@@ -428,7 +428,7 @@ static void test_size_resize() { ...@@ -428,7 +428,7 @@ static void test_size_resize() {
char *fname = "test.dat"; char *fname = "test.dat";
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r == 0); assert(r == 0);
CACHEKEY key = 42; CACHEKEY key = 42;
...@@ -479,7 +479,7 @@ static void test_size_flush() { ...@@ -479,7 +479,7 @@ static void test_size_flush() {
char *fname = "test.dat"; char *fname = "test.dat";
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r == 0); assert(r == 0);
/* put 2*n keys into the table, ensure flushes occur in key order */ /* put 2*n keys into the table, ensure flushes occur in key order */
...@@ -570,7 +570,7 @@ static void test_rename (void) { ...@@ -570,7 +570,7 @@ static void test_rename (void) {
const char fname[] = "ct-test-rename.dat"; const char fname[] = "ct-test-rename.dat";
r=toku_create_cachetable(&t, KEYLIMIT, ZERO_LSN, NULL_LOGGER); assert(r==0); r=toku_create_cachetable(&t, KEYLIMIT, ZERO_LSN, NULL_LOGGER); assert(r==0);
unlink(fname); unlink(fname);
r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777, (BRT)0); r = toku_cachetable_openf(&f, t, fname, O_RDWR|O_CREAT, 0777);
assert(r==0); assert(r==0);
for (i=0; i<TRIALLIMIT; i++) { for (i=0; i<TRIALLIMIT; i++) {
......
...@@ -112,7 +112,7 @@ static void test_chaining (void) { ...@@ -112,7 +112,7 @@ static void test_chaining (void) {
r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i); r = snprintf(fname[i], FILENAME_LEN, "cachetabletest2.%ld.dat", i);
assert(r>0 && r<FILENAME_LEN); assert(r>0 && r<FILENAME_LEN);
unlink(fname[i]); unlink(fname[i]);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777, (BRT)0); assert(r==0); r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR|O_CREAT, 0777); assert(r==0);
} }
for (i=0; i<N_PRESENT_LIMIT; i++) { for (i=0; i<N_PRESENT_LIMIT; i++) {
int fnum = i%N_FILES; int fnum = i%N_FILES;
...@@ -161,7 +161,7 @@ static void test_chaining (void) { ...@@ -161,7 +161,7 @@ static void test_chaining (void) {
CACHEFILE oldcf=f[i]; CACHEFILE oldcf=f[i];
r = toku_cachefile_close(&f[i]); assert(r==0); r = toku_cachefile_close(&f[i]); assert(r==0);
file_is_not_present(oldcf); file_is_not_present(oldcf);
r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777, (BRT)0); assert(r==0); r = toku_cachetable_openf(&f[i], ct, fname[i], O_RDWR, 0777); assert(r==0);
} }
} }
for (i=0; i<N_FILES; i++) { for (i=0; i<N_FILES; i++) {
......
...@@ -72,7 +72,6 @@ struct cachefile { ...@@ -72,7 +72,6 @@ struct cachefile {
CACHETABLE cachetable; CACHETABLE cachetable;
struct fileid fileid; struct fileid fileid;
FILENUM filenum; FILENUM filenum;
BRT brt;
}; };
int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) { int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, TOKULOGGER logger) {
...@@ -97,14 +96,11 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn, ...@@ -97,14 +96,11 @@ int toku_create_cachetable(CACHETABLE *result, long size_limit, LSN initial_lsn,
} }
// What cachefile goes with particular fd? // What cachefile goes with particular fd?
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT *brt) { int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf) {
CACHEFILE extant; CACHEFILE extant;
for (extant = t->cachefiles; extant; extant=extant->next) { for (extant = t->cachefiles; extant; extant=extant->next) {
if (extant->filenum.fileid==filenum.fileid) { if (extant->filenum.fileid==filenum.fileid) {
*cf = extant; *cf = extant;
assert(extant->brt);
assert(extant->brt->cf==extant);
*brt = extant->brt;
return 0; return 0;
} }
} }
...@@ -112,7 +108,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT ...@@ -112,7 +108,7 @@ int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT
} }
// If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile. // If something goes wrong, close the fd. After this, the caller shouldn't close the fd, but instead should close the cachefile.
int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) { int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd) {
int r; int r;
CACHEFILE extant; CACHEFILE extant;
FILENUM max_filenum_in_use={0}; FILENUM max_filenum_in_use={0};
...@@ -141,17 +137,16 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) { ...@@ -141,17 +137,16 @@ int toku_cachetable_openfd (CACHEFILE *cf, CACHETABLE t, int fd, BRT brt) {
newcf->fd = fd; newcf->fd = fd;
newcf->cachetable = t; newcf->cachetable = t;
newcf->fileid = fileid; newcf->fileid = fileid;
newcf->brt = brt;
t->cachefiles = newcf; t->cachefiles = newcf;
*cf = newcf; *cf = newcf;
return 0; return 0;
} }
} }
int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode, BRT brt) { int toku_cachetable_openf (CACHEFILE *cf, CACHETABLE t, const char *fname, int flags, mode_t mode) {
int fd = open(fname, flags, mode); int fd = open(fname, flags, mode);
if (fd<0) return errno; if (fd<0) return errno;
return toku_cachetable_openfd (cf, t, fd, brt); return toku_cachetable_openfd (cf, t, fd);
} }
static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) { static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
...@@ -166,6 +161,11 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) { ...@@ -166,6 +161,11 @@ static CACHEFILE remove_cf_from_list (CACHEFILE cf, CACHEFILE list) {
static int cachefile_flush_and_remove (CACHEFILE cf); static int cachefile_flush_and_remove (CACHEFILE cf);
// Increment the reference count
void toku_cachefile_refup (CACHEFILE cf) {
cf->refcount++;
}
int toku_cachefile_close (CACHEFILE *cfp) { int toku_cachefile_close (CACHEFILE *cfp) {
CACHEFILE cf = *cfp; CACHEFILE cf = *cfp;
assert(cf->refcount>0); assert(cf->refcount>0);
......
...@@ -25,8 +25,8 @@ typedef long long CACHEKEY; ...@@ -25,8 +25,8 @@ typedef long long CACHEKEY;
*/ */
int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER); int toku_create_cachetable(CACHETABLE */*result*/, long size_limit, LSN initial_lsn, TOKULOGGER);
int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode, BRT); int toku_cachetable_openf (CACHEFILE *,CACHETABLE, const char */*fname*/, int flags, mode_t mode);
int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/, BRT brt); int toku_cachetable_openfd (CACHEFILE *,CACHETABLE, int /*fd*/);
typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p); typedef void (cachetable_flush_func_t)(CACHEFILE, CACHEKEY key, void*value, long size, BOOL write_me, BOOL keep_me, LSN modified_lsn, BOOL rename_p);
typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T; typedef cachetable_flush_func_t *CACHETABLE_FLUSH_FUNC_T;
...@@ -64,6 +64,8 @@ int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and dest ...@@ -64,6 +64,8 @@ int toku_cachetable_close (CACHETABLE*); /* Flushes everything to disk, and dest
int toku_cachefile_close (CACHEFILE*); int toku_cachefile_close (CACHEFILE*);
//int cachefile_flush (CACHEFILE); /* Flush everything related to the VOID* to disk and free all memory. Don't destroy the cachetable. */ //int cachefile_flush (CACHEFILE); /* Flush everything related to the VOID* to disk and free all memory. Don't destroy the cachetable. */
void toku_cachefile_refup (CACHEFILE cfp); // Increment the reference count. Use close to decrement it.
// Return on success (different from pread and pwrite) // Return on success (different from pread and pwrite)
//int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, off_t offset); //int cachefile_pwrite (CACHEFILE, const void *buf, size_t count, off_t offset);
//int cachefile_pread (CACHEFILE, void *buf, size_t count, off_t offset); //int cachefile_pread (CACHEFILE, void *buf, size_t count, off_t offset);
...@@ -84,7 +86,7 @@ FILENUM toku_cachefile_filenum (CACHEFILE); ...@@ -84,7 +86,7 @@ FILENUM toku_cachefile_filenum (CACHEFILE);
// What is the cachefile that goes with a particular filenum? // What is the cachefile that goes with a particular filenum?
// During a transaction, we cannot reuse a filenum. // During a transaction, we cannot reuse a filenum.
int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf, BRT *brt); int toku_cachefile_of_filenum (CACHETABLE t, FILENUM filenum, CACHEFILE *cf);
int toku_cachetable_checkpoint (CACHETABLE ct); int toku_cachetable_checkpoint (CACHETABLE ct);
......
...@@ -180,6 +180,18 @@ const struct logtype logtypes[] = { ...@@ -180,6 +180,18 @@ const struct logtype logtypes[] = {
// {"LEAFENTRY", "oldleafentry", 0}, // {"LEAFENTRY", "oldleafentry", 0},
// {"LEAFENTRY", "newleafentry", 0}, // {"LEAFENTRY", "newleafentry", 0},
// NULLFIELD}}, // NULLFIELD}},
{"enqrootentry", 'a', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"deqrootentry", 'A', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"insertleafentry", 'I', FA{{"FILENUM", "filenum", 0}, {"insertleafentry", 'I', FA{{"FILENUM", "filenum", 0},
{"DISKOFF", "diskoff", 0}, {"DISKOFF", "diskoff", 0},
{"u_int32_t", "pmaidx", 0}, {"u_int32_t", "pmaidx", 0},
......
...@@ -113,6 +113,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L ...@@ -113,6 +113,8 @@ static void toku_recover_fheader (LSN UU(lsn), TXNID UU(txnid),FILENUM filenum,L
h->freelist = header.freelist; h->freelist = header.freelist;
h->unused_memory = header.unused_memory; h->unused_memory = header.unused_memory;
h->n_named_roots = header.n_named_roots; h->n_named_roots = header.n_named_roots;
r=toku_fifo_create(&h->fifo);
assert(r==0);
if ((signed)header.n_named_roots==-1) { if ((signed)header.n_named_roots==-1) {
h->unnamed_root = header.u.one.root; h->unnamed_root = header.u.one.root;
} else { } else {
...@@ -195,6 +197,50 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf, ...@@ -195,6 +197,50 @@ static void recover_setup_node (FILENUM filenum, DISKOFF diskoff, CACHEFILE *cf,
*cf = pair->cf; *cf = pair->cf;
} }
void toku_recover_deqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *h_v;
r = toku_cachetable_get_and_pin(pair->cf, 0, &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
assert(r==0);
struct brt_header *h=h_v;
bytevec storedkey,storeddata;
ITEMLEN storedkeylen, storeddatalen;
TXNID storedxid;
u_int32_t storedtype;
r = toku_fifo_peek(h->fifo, &storedkey, &storedkeylen, &storeddata, &storeddatalen, &storedtype, &storedxid);
assert(r==0);
assert(storedkeylen==key.len);
assert(storeddatalen==val.len);
assert(memcmp(storedkey, key.data, key.len)==0);
assert(memcmp(storeddata, val.data, val.len)==0);
assert(typ==storedtype);
assert(xid==storedxid);
r = toku_fifo_deq(h->fifo);
assert(r==0);
r = toku_cachetable_unpin(pair->cf, 0, 1, 0);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
void toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
assert(r==0);
void *h_v;
r = toku_cachetable_get_and_pin(pair->cf, 0, &h_v, NULL, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0);
assert(r==0);
struct brt_header *h=h_v;
r = toku_fifo_enq(h->fifo, key.data, key.len, val.data, val.len, typ, xid);
assert(r==0);
r = toku_cachetable_unpin(pair->cf, 0, 1, 0);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING data, u_int32_t oldfingerprint, u_int32_t newfingerprint) { void toku_recover_brtdeq (LSN lsn, FILENUM filenum, DISKOFF diskoff, u_int32_t childnum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING data, u_int32_t oldfingerprint, u_int32_t newfingerprint) {
CACHEFILE cf; CACHEFILE cf;
BRTNODE node; BRTNODE node;
...@@ -382,7 +428,7 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM ...@@ -382,7 +428,7 @@ void toku_recover_fopen (LSN UU(lsn), TXNID UU(txnid), BYTESTRING fname, FILENUM
brt->compare_fun = 0; brt->compare_fun = 0;
brt->dup_compare = 0; brt->dup_compare = 0;
brt->db = 0; brt->db = 0;
int r = toku_cachetable_openfd(&cf, ct, fd, brt); int r = toku_cachetable_openfd(&cf, ct, fd);
assert(r==0); assert(r==0);
brt->skey = brt->sval = 0; brt->skey = brt->sval = 0;
brt->cf=cf; brt->cf=cf;
......
...@@ -35,28 +35,30 @@ int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)), ...@@ -35,28 +35,30 @@ int toku_rollback_fcreate (TXNID xid __attribute__((__unused__)),
int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) { int toku_commit_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt;
//printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d committing insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt); int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
assert(r==0); assert(r==0);
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid, BRT_CMD_S brtcmd = { BRT_COMMIT_BOTH, xid,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len), .u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
toku_fill_dbt(&data_dbt, data.data, data.len)}}; toku_fill_dbt(&data_dbt, data.data, data.len)}};
return toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
return toku_cachefile_close(&cf);
} }
int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) { int toku_rollback_cmdinsert (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0); assert(r==0);
//printf("%s:%d aborting insert %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d aborting insert %s %s\n", __FILE__, __LINE__, key.data, data.data);
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { BRT_ABORT_BOTH, xid, BRT_CMD_S brtcmd = { BRT_ABORT_BOTH, xid,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len), .u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
toku_fill_dbt(&data_dbt, data.data, data.len)}}; toku_fill_dbt(&data_dbt, data.data, data.len)}};
return toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
return toku_cachefile_close(&cf);
} }
int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) { int toku_commit_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTESTRING data,TOKUTXN txn) {
...@@ -69,26 +71,28 @@ int toku_rollback_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTE ...@@ -69,26 +71,28 @@ int toku_rollback_cmddeleteboth (TXNID xid, FILENUM filenum, BYTESTRING key,BYTE
int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) { int toku_commit_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0); assert(r==0);
//printf("%s:%d aborting delete %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d aborting delete %s %s\n", __FILE__, __LINE__, key.data, data.data);
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { BRT_COMMIT_ANY, xid, BRT_CMD_S brtcmd = { BRT_COMMIT_ANY, xid,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len), .u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
toku_init_dbt(&data_dbt)}}; toku_init_dbt(&data_dbt)}};
return toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
return toku_cachefile_close(&cf);
} }
int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) { int toku_rollback_cmddelete (TXNID xid, FILENUM filenum, BYTESTRING key,TOKUTXN txn) {
CACHEFILE cf; CACHEFILE cf;
BRT brt; int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf);
int r = toku_cachefile_of_filenum(txn->logger->ct, filenum, &cf, &brt);
assert(r==0); assert(r==0);
//printf("%s:%d aborting delete %s %s\n", __FILE__, __LINE__, key.data, data.data); //printf("%s:%d aborting delete %s %s\n", __FILE__, __LINE__, key.data, data.data);
DBT key_dbt,data_dbt; DBT key_dbt,data_dbt;
BRT_CMD_S brtcmd = { BRT_ABORT_ANY, xid, BRT_CMD_S brtcmd = { BRT_ABORT_ANY, xid,
.u.id={toku_fill_dbt(&key_dbt, key.data, key.len), .u.id={toku_fill_dbt(&key_dbt, key.data, key.len),
toku_init_dbt(&data_dbt)}}; toku_init_dbt(&data_dbt)}};
return toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn)); r = toku_cachefile_root_put_cmd(cf, &brtcmd, toku_txn_logger(txn));
if (r!=0) return r;
return toku_cachefile_close(&cf);
} }
/* Nested transactions. */
#include <db.h>
#include <sys/stat.h>
#include "test.h"
static DB_ENV *env;
static DB *db;
static void insert (int i, DB_TXN *x) {
char hello[30], there[30];
DBT key,data;
if (verbose) printf("Insert %d\n", i);
snprintf(hello, sizeof(hello), "hello%d", i);
snprintf(there, sizeof(there), "there%d", i);
int r = db->put(db, x,
dbt_init(&key, hello, strlen(hello)+1),
dbt_init(&data, there, strlen(there)+1),
0);
CKERR(r);
}
static void delete (int i, DB_TXN *x) {
char hello[30];
DBT key;
if (verbose) printf("delete %d\n", i);
snprintf(hello, sizeof(hello), "hello%d", i);
int r = db->del(db, x,
dbt_init(&key, hello, strlen(hello)+1),
0);
CKERR(r);
}
static void lookup (int i, DB_TXN *x, int expect) {
char hello[30], there[30];
DBT key,data;
snprintf(hello, sizeof(hello), "hello%d", i);
memset(&data, 0, sizeof(data));
if (verbose) printf("Looking up %d (expecting %s)\n", i, expect==0 ? "to find" : "not to find");
int r = db->get(db, x,
dbt_init(&key, hello, strlen(hello)+1),
&data,
0);
assert(expect==r);
if (expect==0) {
CKERR(r);
snprintf(there, sizeof(there), "there%d", i);
assert(data.size==strlen(there)+1);
assert(strcmp(data.data, there)==0);
}
}
static DB_TXN *txn, *txn2;
void test_nested (void) {
int r;
system("rm -rf " ENVDIR);
r=mkdir(ENVDIR, 0777); assert(r==0);
r=db_env_create(&env, 0); assert(r==0);
env->set_errfile(env, stderr);
r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
r=db_create(&db, env, 0); CKERR(r);
r=env->txn_begin(env, 0, &txn, 0); assert(r==0);
r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
insert(0, txn);
insert(1, txn);
insert(2, txn);
insert(3, txn);
lookup(0, txn, 0);
lookup(1, txn, 0);
lookup(2, txn, 0);
lookup(3, txn, 0);
r=txn->commit(txn, 0); assert(r==0);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
delete(0, txn);
delete(3, txn);
r=env->txn_begin(env, txn, &txn2, 0); CKERR(r);
delete(1, txn2); CKERR(r);
lookup(3, txn2, DB_NOTFOUND);
insert(3, txn2);
lookup(3, txn2, 0);
r=txn->commit(txn2, 0); CKERR(r);
lookup(0, txn, DB_NOTFOUND);
lookup(1, txn, DB_NOTFOUND);
lookup(2, txn, 0);
lookup(3, txn, 0);
r=txn->commit(txn, 0); CKERR(r);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
lookup(0, txn, DB_NOTFOUND);
lookup(1, txn, DB_NOTFOUND);
lookup(2, txn, 0);
r=txn->commit(txn, 0); CKERR(r);
insert(4, 0);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
r=env->txn_begin(env, txn, &txn2, 0); CKERR(r);
delete(4, txn2);
r=txn->commit(txn2, 0); CKERR(r);
lookup(4, txn, DB_NOTFOUND);
insert(4, txn);
r=txn->commit(txn, 0); CKERR(r);
lookup(4, 0, 0);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
insert(5, txn);
r=env->txn_begin(env, txn, &txn2, 0); CKERR(r);
lookup(5, txn2, 0);
insert(5, txn2);
lookup(5, txn2, 0);
r=txn->commit(txn2, 0); CKERR(r);
lookup(5, txn, 0);
r=env->txn_begin(env, txn, &txn2, 0); CKERR(r);
lookup(5, txn2, 0);
delete(5, txn2);
r=txn->commit(txn2, 0); CKERR(r);
lookup(5, txn, DB_NOTFOUND);
r=txn->commit(txn, 0); CKERR(r);
insert(6, 0);
r=env->txn_begin(env, 0, &txn, 0); CKERR(r);
insert(6, txn);
r=env->txn_begin(env, txn, &txn2, 0); CKERR(r);
delete(6, txn2);
r=txn->commit(txn2, 0); CKERR(r);
r=txn->commit(txn, 0); CKERR(r);
r=db->close(db, 0); CKERR(r);
r=env->close(env, 0); CKERR(r);
}
int main (int argc, const char *argv[]) {
parse_args(argc, argv);
test_nested();
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment