Commit 0c1af478 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Add the first two tests for {{{DB_ENV->log_archive}}}. Fix up some memory...

Add the first two tests for {{{DB_ENV->log_archive}}}.  Fix up some memory leaks.  Add {{{LSN*}}} to all the {{{toku_log_*}}} functions so that {{{toku_logger_txn_begin}}} can use the LSN as a txnid.  Addresses #75, #83, #392.

git-svn-id: file:///svn/tokudb@3004 c7de825b-a66e-492c-adef-691d508d4ae1
parent cbcf5bf5
......@@ -95,7 +95,7 @@ static void fixup_child_fingerprint(BRTNODE node, int childnum_of_node, BRTNODE
// We only call this function if we have reason to believe that the child's fingerprint did change.
BNC_SUBTREE_FINGERPRINT(node,childnum_of_node)=sum;
node->dirty=1;
toku_log_changechildfingerprint(logger, 0, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_log_changechildfingerprint(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), node->thisnodename, childnum_of_node, old_fingerprint, sum);
toku_update_brtnode_loggerlsn(node, logger);
}
......@@ -227,7 +227,7 @@ static int malloc_diskblock_header_is_in_memory (DISKOFF *res, BRT brt, int size
DISKOFF result = brt->h->unused_memory;
brt->h->unused_memory+=size;
brt->h->dirty = 1;
int r = toku_log_changeunusedmemory(logger, 0, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
int r = toku_log_changeunusedmemory(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), result, brt->h->unused_memory);
*res = result;
return r;
}
......@@ -295,7 +295,7 @@ int toku_create_new_brtnode (BRT t, BRTNODE *result, int height, TOKULOGGER logg
r=toku_cachetable_put(t->cf, n->thisnodename, n, brtnode_size(n),
toku_brtnode_flush_callback, toku_brtnode_fetch_callback, t);
assert(r==0);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), n->thisnodename, height, n->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, n->rand4fingerprint);
assert(r==0);
toku_update_brtnode_loggerlsn(n, logger);
return 0;
......@@ -355,7 +355,7 @@ static int log_and_save_brtenq(TOKULOGGER logger, BRT t, BRTNODE node, int child
u_int32_t new_fingerprint = old_fingerprint + fdiff;
//printf("%s:%d node=%lld fingerprint old=%08x new=%08x diff=%08x xid=%lld\n", __FILE__, __LINE__, (long long)node->thisnodename, old_fingerprint, new_fingerprint, fdiff, (long long)xid);
*fingerprint = new_fingerprint;
int r = toku_log_brtenq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs, old_fingerprint, new_fingerprint);
int r = toku_log_brtenq(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, xid, type, keybs, databs, old_fingerprint, new_fingerprint);
if (r!=0) return r;
TOKUTXN txn;
if (0==toku_txnid2txn(logger, xid, &txn) && txn) {
......@@ -409,7 +409,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BNC_DISKOFF(B, targchild) = thischilddiskoff;
int r = toku_log_addchild(logger, 0, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
int r = toku_log_addchild(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i));
if (r!=0) return r;
while (1) {
......@@ -426,7 +426,7 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
u_int32_t delta = toku_calccrc32_cmd(type, xid, key, keylen, data, datalen);
u_int32_t new_from_fingerprint = old_from_fingerprint - node->rand4fingerprint*delta;
if (r!=0) return r;
r = toku_log_brtdeq(logger, 0, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
r = toku_log_brtdeq(logger, (LSN*)0, 0, fnum, node->thisnodename, n_children_in_a, xid, type, keybs, databs, old_from_fingerprint, new_from_fingerprint);
if (r!=0) return r;
r = log_and_save_brtenq(logger, t, B, targchild, xid, type, key, keylen, data, datalen, &B->local_fingerprint, path_to_parent);
r = toku_fifo_enq(to_htab, key, keylen, data, datalen, type, xid);
......@@ -448,10 +448,10 @@ static int brt_nonleaf_split (BRT t, BRTNODE node, BRTNODE *nodea, BRTNODE *node
BYTESTRING bs = { .len = kv_pair_keylen(node->u.n.childkeys[i-1]),
.data = kv_pair_key(node->u.n.childkeys[i-1]) };
assert(i>0);
r = toku_log_delchild(logger, 0, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
r = toku_log_delchild(logger, (LSN*)0, 0, fnum, node->thisnodename, n_children_in_a, thischilddiskoff, BNC_SUBTREE_FINGERPRINT(node, i), bs);
if (r!=0) return r;
if (i>n_children_in_a) {
r = toku_log_setpivot(logger, 0, fnum, B->thisnodename, targchild-1, bs);
r = toku_log_setpivot(logger, (LSN*)0, 0, fnum, B->thisnodename, targchild-1, bs);
if (r!=0) return r;
B->u.n.childkeys[targchild-1] = node->u.n.childkeys[i-1];
B->u.n.totalchildkeylens += toku_brt_pivot_key_len(t, node->u.n.childkeys[i-1]);
......@@ -599,7 +599,7 @@ static int push_a_brt_cmd_down (BRT t, BRTNODE node, BRTNODE child, int childnum
{
BYTESTRING keybs = { .len=k->size, .data=(char*)k->data };
BYTESTRING databs = { .len=v->size, .data=(char*)v->data };
int r = toku_log_brtdeq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
int r = toku_log_brtdeq(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
cmd->xid, cmd->type, keybs, databs, old_fingerprint, new_fingerprint);
assert(r==0);
}
......@@ -669,7 +669,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
for (cnum=node->u.n.n_children; cnum>childnum+1; cnum--) {
node->u.n.childinfos[cnum] = node->u.n.childinfos[cnum-1];
}
r = toku_log_addchild(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
r = toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum+1, childb->thisnodename, 0);
node->u.n.n_children++;
assert(BNC_DISKOFF(node, childnum)==childa->thisnodename); // use the same child
......@@ -691,7 +691,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
BYTESTRING databs = { .len = svallen, .data = (char*)sval };
u_int32_t old_fingerprint = node->local_fingerprint;
u_int32_t new_fingerprint = old_fingerprint - node->rand4fingerprint*toku_calccrc32_cmd(type, xid, skey, skeylen, sval, svallen);
r = toku_log_brtdeq(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
r = toku_log_brtdeq(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum,
xid, type, keybs, databs, old_fingerprint, new_fingerprint);
node->local_fingerprint = new_fingerprint;
}));
......@@ -703,7 +703,7 @@ static int handle_split_of_child (BRT t, BRTNODE node, int childnum,
struct kv_pair *pivot = childsplitk->data;
BYTESTRING bs = { .len = childsplitk->size,
.data = kv_pair_key(pivot) };
r = toku_log_setpivot(logger, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
r = toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), node->thisnodename, childnum, bs);
if (r!=0) return r;
for (cnum=node->u.n.n_children-2; cnum>childnum; cnum--) {
......@@ -1359,7 +1359,7 @@ static int setup_initial_brt_root_node (BRT t, DISKOFF offset, TOKULOGGER logger
}
toku_verify_counts(node);
// verify_local_fingerprint_nonleaf(node);
toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(t->cf), offset, 0, t->h->nodesize, (t->flags&TOKU_DB_DUPSORT)!=0, node->rand4fingerprint);
toku_update_brtnode_loggerlsn(node, logger);
r = toku_unpin_brtnode(t, node);
if (r!=0) {
......@@ -1526,7 +1526,7 @@ int toku_brt_open(BRT t, const char *fname, const char *fname_in_env, const char
} else {
lh.u.one.root = t->h->unnamed_root;
}
if ((r=toku_log_fheader(toku_txn_logger(txn), 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died6; }
if ((r=toku_log_fheader(toku_txn_logger(txn), (LSN*)0, 0, toku_txn_get_txnid(txn), toku_cachefile_filenum(t->cf), lh))) { goto died6; }
}
if ((r=setup_initial_brt_root_node(t, t->nodesize, toku_txn_logger(txn)))!=0) { died6: if (dbname) goto died5; else goto died2; }
if ((r=toku_cachetable_put(t->cf, 0, t->h, 0, toku_brtheader_flush_callback, toku_brtheader_fetch_callback, 0))) { goto died6; }
......@@ -1704,12 +1704,12 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
assert(r==0);
assert(newroot);
if (brt->database_name==0) {
toku_log_changeunnamedroot(logger, 0, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
toku_log_changeunnamedroot(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), *rootp, newroot_diskoff);
} else {
BYTESTRING bs;
bs.len = 1+strlen(brt->database_name);
bs.data = brt->database_name;
toku_log_changenamedroot(logger, 0, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
toku_log_changenamedroot(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), bs, *rootp, newroot_diskoff);
}
*rootp=newroot_diskoff;
brt->h->dirty=1;
......@@ -1732,18 +1732,18 @@ static int brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk,
toku_verify_counts(newroot);
//verify_local_fingerprint_nonleaf(nodea);
//verify_local_fingerprint_nonleaf(nodeb);
r=toku_log_newbrtnode(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
r=toku_log_newbrtnode(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, new_height, new_nodesize, (brt->flags&TOKU_DB_DUPSORT)!=0, newroot->rand4fingerprint);
if (r!=0) return r;
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, nodea->thisnodename, 0);
if (r!=0) return r;
r=toku_log_addchild(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
r=toku_log_addchild(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 1, nodeb->thisnodename, 0);
if (r!=0) return r;
fixup_child_fingerprint(newroot, 0, nodea, brt, logger);
fixup_child_fingerprint(newroot, 1, nodeb, brt, logger);
{
BYTESTRING bs = { .len = kv_pair_keylen(newroot->u.n.childkeys[0]),
.data = kv_pair_key(newroot->u.n.childkeys[0]) };
r=toku_log_setpivot(logger, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
r=toku_log_setpivot(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), newroot_diskoff, 0, bs);
if (r!=0) return r;
toku_update_brtnode_loggerlsn(newroot, logger);
}
......
......@@ -74,10 +74,11 @@ int toku_logger_find_logfiles (const char *directory, char ***resultp);
struct tokutxn {
enum typ_tag tag;
u_int64_t txnid64;
u_int64_t txnid64; /* this happens to be the first lsn */
TOKULOGGER logger;
TOKUTXN parent;
LSN last_lsn; /* Everytime anything is logged, update the LSN. (We need to atomically record the LSN along with writing into the log.) */
LSN first_lsn; /* The first lsn in the transaction. */
struct roll_entry *oldest_logentry,*newest_logentry; /* Only logentries with rollbacks are here. There is a list going from newest to oldest. */
struct list live_txns_link;
};
......
......@@ -344,7 +344,7 @@ int toku_logger_finish (TOKULOGGER logger, struct logbytes *logbytes, struct wbu
int toku_logger_commit (TOKUTXN txn, int nosync) {
// panic handled in log_commit
int r = toku_log_commit(txn->logger, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
int r = toku_log_commit(txn->logger, (LSN*)0, (txn->parent==0) && !nosync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) goto free_and_return;
if (txn->parent!=0) {
// Append the list to the front.
......@@ -378,14 +378,16 @@ int toku_logger_log_checkpoint (TOKULOGGER logger) {
if (r!=0) return r;
logger->checkpoint_lsns[1]=logger->checkpoint_lsns[0];
logger->checkpoint_lsns[0]=logger->lsn;
return toku_log_checkpoint(logger, 1);
return toku_log_checkpoint(logger, (LSN*)0, 1);
}
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TXNID txnid64, TOKULOGGER logger) {
int toku_logger_txn_begin (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger) {
if (logger->is_panicked) return EINVAL;
TAGMALLOC(TOKUTXN, result);
if (result==0) return errno;
result->txnid64 = txnid64;
int r =toku_log_xbegin(logger, &result->first_lsn, 0, parent_tokutxn ? parent_tokutxn->txnid64 : 0);
if (r!=0) { toku_logger_panic(logger, r); return r; }
result->txnid64 = result->first_lsn.lsn;
result->logger = logger;
result->parent = parent_tokutxn;
result->oldest_logentry = result->newest_logentry = 0;
......@@ -398,7 +400,7 @@ int toku_logger_log_fcreate (TOKUTXN txn, const char *fname, int mode) {
if (txn==0) return 0;
if (txn->logger->is_panicked) return EINVAL;
BYTESTRING bs = { .len=strlen(fname), .data = strdup(fname) };
int r = toku_log_fcreate (txn->logger, 0, toku_txn_get_txnid(txn), bs, mode);
int r = toku_log_fcreate (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, mode);
if (r!=0) return r;
r = toku_logger_save_rollback_fcreate(txn, bs);
return r;
......@@ -411,7 +413,7 @@ int toku_logger_log_fopen (TOKUTXN txn, const char * fname, FILENUM filenum) {
BYTESTRING bs;
bs.len = strlen(fname);
bs.data = (char*)fname;
return toku_log_fopen (txn->logger, 0, toku_txn_get_txnid(txn), bs, filenum);
return toku_log_fopen (txn->logger, (LSN*)0, 0, toku_txn_get_txnid(txn), bs, filenum);
}
int toku_fread_u_int8_t_nocrclen (FILE *f, u_int8_t *v) {
......@@ -767,20 +769,24 @@ int toku_logger_log_archive (TOKULOGGER logger, char ***logs_p, int flags) {
for (i=0; i<n_to_archive; i++) {
count_bytes+=1+strlen(all_logs[i]);
}
char **result = toku_malloc((1+n_to_archive)*sizeof(*result) + count_bytes);
char *base = (char*)(result+1+n_to_archive);
for (i=0; i<n_to_archive; i++) {
int len=1+strlen(all_logs[i]);
result[i]=base;
memcpy(base, all_logs[i], len);
base+=len;
free(all_logs[i]);
char **result;
if (i==0) {
result=0;
} else {
result = toku_malloc((1+n_to_archive)*sizeof(*result) + count_bytes);
char *base = (char*)(result+1+n_to_archive);
for (i=0; i<n_to_archive; i++) {
int len=1+strlen(all_logs[i]);
result[i]=base;
memcpy(base, all_logs[i], len);
base+=len;
}
result[n_to_archive]=0;
}
for (; all_logs[i]; i++) {
for (i=0; all_logs[i]; i++) {
free(all_logs[i]);
}
free(all_logs);
result[n_to_archive]=0;
*logs_p = result;
return 0;
}
......@@ -37,7 +37,7 @@ int toku_logger_log_phys_add_or_delete_in_leaf (DB *db, TOKUTXN txn, DISKOFF
int toku_logger_commit (TOKUTXN txn, int no_sync);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TXNID /*txnid64*/, TOKULOGGER /*logger*/);
int toku_logger_txn_begin (TOKUTXN /*parent*/,TOKUTXN *, TOKULOGGER /*logger*/);
int toku_logger_log_fcreate (TOKUTXN, const char */*fname*/, int /*mode*/);
......
......@@ -59,6 +59,7 @@ const struct logtype rollbacks[] = {
const struct logtype logtypes[] = {
{"checkpoint", 'x', FA{NULLFIELD}},
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xbegin", 'b', FA{{"TXNID", "parenttxnid", 0},NULLFIELD}},
#if 0
{"tl_delete", 'D', FA{{"FILENUM", "filenum", 0}, // tl logentries can be used, by themselves, to rebuild the whole DB from scratch.
{"DISKOFF", "diskoff", 0},
......@@ -316,7 +317,7 @@ void generate_log_free(void) {
void generate_log_writer (void) {
DO_LOGTYPES(lt, ({
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, int do_fsync", lt->name);
fprintf2(cf, hf, "int toku_log_%s (TOKULOGGER logger, LSN *lsnp, int do_fsync", lt->name);
DO_FIELDS(ft, lt, fprintf2(cf, hf, ", %s %s", ft->type, ft->name));
fprintf(hf, ");\n");
fprintf(cf, ") {\n");
......@@ -338,6 +339,7 @@ void generate_log_writer (void) {
fprintf(cf, " LSN lsn = logger->lsn;\n");
fprintf(cf, " wbuf_LSN(&wbuf, lsn);\n");
fprintf(cf, " lbytes->lsn = lsn;\n");
fprintf(cf, " if (lsnp) *lsnp=logger->lsn;\n");
fprintf(cf, " logger->lsn.lsn++;\n");
DO_FIELDS(ft, lt,
fprintf(cf, " wbuf_%s(&wbuf, %s);\n", ft->type, ft->name));
......
......@@ -369,7 +369,7 @@ static int pma_log_distribute (TOKULOGGER logger, FILENUM filenum, DISKOFF old_d
}
}
ipa.size=j;
int r=toku_log_pmadistribute(logger, 0, filenum, old_diskoff, new_diskoff, ipa);
int r=toku_log_pmadistribute(logger, (LSN*)0, 0, filenum, old_diskoff, new_diskoff, ipa);
if (logger && oldnode_lsn) *oldnode_lsn = toku_logger_last_lsn(logger);
if (logger && newnode_lsn) *newnode_lsn = toku_logger_last_lsn(logger);
// if (0 && pma) {
......@@ -546,7 +546,7 @@ static int pma_resize_array(TOKULOGGER logger, FILENUM filenum, DISKOFF offset,
unsigned int oldN, n;
int r = pma_resize_array_nolog(pma, asksize, startz, &oldN, &n);
if (r!=0) return r;
toku_log_resizepma (logger, 0, filenum, offset, oldN, n);
toku_log_resizepma (logger, (LSN*)0, 0, filenum, offset, oldN, n);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
return 0;
}
......@@ -734,7 +734,7 @@ int toku_pma_insert (PMA pma, DBT *k, DBT *v, TOKULOGGER logger, TXNID xid, FILE
{
const BYTESTRING key = { pair->keylen, kv_pair_key(pair) };
const BYTESTRING data = { pair->vallen, kv_pair_val(pair) };
int r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
int r = toku_log_insertinleaf (logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, key, data);
if (r!=0) return r;
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
......@@ -772,7 +772,7 @@ static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val
{
const BYTESTRING deletedkey = { keylen, (char*)key };
const BYTESTRING deleteddata = { vallen, (char*)val };
int r=toku_log_deleteinleaf(logger, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
int r=toku_log_deleteinleaf(logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (logger) {
......@@ -945,7 +945,7 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
{
const BYTESTRING key = { k->size, k->data };
const BYTESTRING data = { v->size, v->data };
r = toku_log_insertinleaf (logger, 0, xid, pma->filenum, diskoff, idx, key, data);
r = toku_log_insertinleaf (logger, (LSN*)0, 0, xid, pma->filenum, diskoff, idx, key, data);
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (r!=0) return r;
/* We don't record the insert here for rollback. The insert should have been logged at the top-level. */
......@@ -1105,7 +1105,7 @@ int toku_pma_split(TOKULOGGER logger, FILENUM filenum,
{
int r = pma_log_distribute(logger, filenum, diskoff, diskoff, spliti, &pairs[0], lsn, lsn);
if (r!=0) { toku_free(pairs); return r; }
r = toku_log_resizepma(logger, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
r = toku_log_resizepma(logger, (LSN*)0, 0, filenum, diskoff, oldn_for_logging, newn_for_logging);
if (r!=0) { toku_free(pairs); return r; }
if (logger && lsn) *lsn = toku_logger_last_lsn(logger);
......
......@@ -527,6 +527,10 @@ static int toku_recover_checkpoint (LSN UU(lsn)) {
return 0;
}
static int toku_recover_xbegin (LSN UU(lsn), TXNID UU(parent)) {
return 0;
}
int tokudb_recover(const char *data_dir, const char *log_dir) {
int r;
int entrycount=0;
......
/* Test log archive (part 1).
 * With only one small committed transaction, DB_ENV->log_archive
 * should report that no log files are yet eligible for removal
 * (it returns a NULL list in that case). */
#include <db.h>
#include <sys/stat.h>
#include "test.h"

int main (int argc, const char *argv[]) {
    parse_args(argc, argv);
    DB_ENV *env;
    DB *db;
    DB_TXN *txn;
    int r;
    system("rm -rf " ENVDIR);
    r=mkdir(ENVDIR, 0777);                                       assert(r==0);
    r=db_env_create(&env, 0);                                    CKERR(r);
    env->set_errfile(env, stderr);
    r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
    r=db_create(&db, env, 0);                                    CKERR(r);
    r=env->txn_begin(env, 0, &txn, 0);                           CKERR(r);
    r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
    r=txn->commit(txn, 0);                                       CKERR(r);
    char **list;
    r=env->log_archive(env, &list, 0);
    CKERR(r); /* BUG FIX: the return code of log_archive was previously ignored */
    assert(list==0); /* nothing archivable yet: the only log file is still live */
    r=db->close(db, 0);                                          CKERR(r);
    r=env->close(env, 0);                                        CKERR(r);
    return 0;
}
/* Test log archive (part 2).
 * Force log rollover by setting a tiny lg_max, then commit many small
 * transactions with a checkpoint after each one.  After that,
 * DB_ENV->log_archive should report at least one log file that is
 * safe to remove. */
#include <db.h>
#include <sys/stat.h>
#include "test.h"

int main (int argc, const char *argv[]) {
    parse_args(argc, argv);
    DB_ENV *env;
    DB *db;
    DB_TXN *txn;
    int r;
    system("rm -rf " ENVDIR);
    r=mkdir(ENVDIR, 0777);                                       assert(r==0);
    r=db_env_create(&env, 0);                                    CKERR(r);
    env->set_errfile(env, stderr);
    r=env->set_lg_max(env, 16000);                               CKERR(r); /* small max log size -> frequent log-file rollover */
    r=env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, 0777); CKERR(r);
    r=db_create(&db, env, 0);                                    CKERR(r);
    r=env->txn_begin(env, 0, &txn, 0);                           CKERR(r);
    r=db->open(db, txn, "foo.db", 0, DB_BTREE, DB_CREATE, 0777); CKERR(r);
    r=txn->commit(txn, 0);                                       CKERR(r);
    int i;
    for (i=0; i<100; i++) {
        DBT key,data;
        char hello[30],there[30];
        snprintf(hello, sizeof(hello), "hello%d", i);
        snprintf(there, sizeof(there), "there%d", i);
        r=env->txn_begin(env, 0, &txn, 0);                       CKERR(r);
        r=db->put(db, txn,
                  dbt_init(&key, hello, strlen(hello)+1),
                  dbt_init(&data, there, strlen(there)+1),
                  0);
        CKERR(r); /* BUG FIX: the return code of put was previously ignored */
        r=txn->commit(txn, 0);                                   CKERR(r);
        r=env->txn_checkpoint(env, 0, 0, 0);
        CKERR(r); /* BUG FIX: the return code of txn_checkpoint was previously ignored */
    }
    {
        char **list;
        r=env->log_archive(env, &list, 0);
        CKERR(r);
        assert(list);     /* with rollover plus checkpoints, something must be archivable */
        assert(list[0]);
        if (verbose) printf("file[0]=%s\n", list[0]);
        free(list);       /* one allocation holds both the pointer array and the strings */
    }
    r=db->close(db, 0);                                          CKERR(r);
    r=env->close(env, 0);                                        CKERR(r);
    return 0;
}
......@@ -810,8 +810,6 @@ static u_int32_t toku_txn_id(DB_TXN * txn) {
return -1;
}
static TXNID next_txn = 0;
static int toku_txn_abort(DB_TXN * txn) {
HANDLE_PANICKED_ENV(txn->mgrp);
int r = toku_logger_abort(txn->i->tokutxn);
......@@ -873,7 +871,7 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
}
}
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, next_txn++, env->i->logger);
r = toku_logger_txn_begin(stxn ? stxn->i->tokutxn : 0, &result->i->tokutxn, env->i->logger);
if (r != 0)
return r;
*txn = result;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment