Commit 19e7b929 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

merge the enqrootentry and shutdown log entry changes to main. addresses #1847

git-svn-id: file:///svn/toku/tokudb@13453 c7de825b-a66e-492c-adef-691d508d4ae1
parent e3fa4e0f
......@@ -2630,13 +2630,6 @@ CACHEKEY* toku_calculate_root_offset_pointer (BRT brt, u_int32_t *roothash) {
int toku_brt_root_put_cmd(BRT brt, BRT_CMD cmd, TOKULOGGER logger)
// Effect: Flush the root fifo into the brt, and then push the cmd into the brt.
{
if (logger) {
BYTESTRING keybs = {.len=cmd->u.id.key->size, .data=cmd->u.id.key->data};
BYTESTRING valbs = {.len=cmd->u.id.val->size, .data=cmd->u.id.val->data};
int r = toku_log_enqrootentry(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), cmd->xid, cmd->type, keybs, valbs);
if (r!=0) return r;
}
void *node_v;
BRTNODE node;
CACHEKEY *rootp;
......@@ -2675,37 +2668,54 @@ int toku_brt_insert (BRT brt, DBT *key, DBT *val, TOKUTXN txn)
// Effect: Insert the key-val pair into brt.
{
int r;
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != toku_txn_get_txnid(txn))) {
TXNID xid = toku_txn_get_txnid(txn);
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != xid)) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
int need_data = (brt->flags&TOKU_DB_DUPSORT)!=0; // dupsorts don't need the data part
if (need_data) {
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
r = toku_logger_save_rollback_cmdinsertboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
r = toku_logger_save_rollback_cmdinsertboth(txn, xid, toku_cachefile_filenum(brt->cf), keybs, databs);
} else {
r = toku_logger_save_rollback_cmdinsert (txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
r = toku_logger_save_rollback_cmdinsert (txn, xid, toku_cachefile_filenum(brt->cf), keybs);
}
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_INSERT, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
r = toku_log_enq_insert(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_INSERT, xid, .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, logger);
if (r!=0) return r;
return r;
}
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != toku_txn_get_txnid(txn))) {
TXNID xid = toku_txn_get_txnid(txn);
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != xid)) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
r = toku_logger_save_rollback_cmddelete(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs);
r = toku_logger_save_rollback_cmddelete(txn, xid, toku_cachefile_filenum(brt->cf), keybs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=0, .data=NULL};
r = toku_log_enq_delete_any(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
if (r!=0) return r;
}
DBT val;
BRT_CMD_S brtcmd = { BRT_DELETE_ANY, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
BRT_CMD_S brtcmd = { BRT_DELETE_ANY, xid, .u.id={key, toku_init_dbt(&val)}};
r = toku_brt_root_put_cmd(brt, &brtcmd, logger);
return r;
}
......@@ -4533,16 +4543,25 @@ toku_brt_lookup (BRT brt, DBT *k, DBT *v, BRT_GET_CALLBACK_FUNCTION getf, void *
int toku_brt_delete_both(BRT brt, DBT *key, DBT *val, TOKUTXN txn) {
//{ unsigned i; printf("del %p keylen=%d key={", brt->db, key->size); for(i=0; i<key->size; i++) printf("%d,", ((char*)key->data)[i]); printf("} datalen=%d data={", val->size); for(i=0; i<val->size; i++) printf("%d,", ((char*)val->data)[i]); printf("}\n"); }
int r;
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != toku_txn_get_txnid(txn))) {
TXNID xid = toku_txn_get_txnid(txn);
if (txn && (brt->h->txnid_that_created_or_locked_when_empty != xid)) {
BYTESTRING keybs = {key->size, toku_memdup_in_rollback(txn, key->data, key->size)};
BYTESTRING databs = {val->size, toku_memdup_in_rollback(txn, val->data, val->size)};
r = toku_logger_save_rollback_cmddeleteboth(txn, toku_txn_get_txnid(txn), toku_cachefile_filenum(brt->cf), keybs, databs);
r = toku_logger_save_rollback_cmddeleteboth(txn, xid, toku_cachefile_filenum(brt->cf), keybs, databs);
if (r!=0) return r;
r = toku_txn_note_brt(txn, brt);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, toku_txn_get_txnid(txn), .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
TOKULOGGER logger = toku_txn_logger(txn);
if (logger) {
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
r = toku_log_enq_delete_both(logger, (LSN*)0, 0, toku_cachefile_filenum(brt->cf), xid, keybs, valbs);
if (r!=0) return r;
}
BRT_CMD_S brtcmd = { BRT_DELETE_BOTH, xid, .u.id={key,val}};
r = toku_brt_root_put_cmd(brt, &brtcmd, logger);
return r;
}
......
......@@ -58,8 +58,7 @@ int toku_logcursor_create(TOKULOGCURSOR *lc, const char *log_dir) {
cursor->cur_le = NULL;
cursor->is_open = 0;
cursor->cur_logfiles_index = 0;
// cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
cursor->logdir = (char *) toku_malloc(strlen(log_dir));
cursor->logdir = (char *) toku_malloc(strlen(log_dir)+1);
if ( NULL==cursor->logdir )
return ENOMEM;
strcpy(cursor->logdir, log_dir);
......
......@@ -96,7 +96,9 @@ const struct logtype logtypes[] = {
{"fassociate", 'f', FA{{"FILENUM", "filenum", 0},
{"BYTESTRING", "fname", 0}, // pathname of file
NULLFIELD}},
{"xstillopen", 's', FA{{"TXNID", "txnid", 0}, {"TXNID", "parent", 0}, NULLFIELD}}, // only record root transactions
{"xstillopen", 's', FA{{"TXNID", "txnid", 0},
{"TXNID", "parent", 0},
NULLFIELD}}, // only record root transactions
// Reords produced by transactions
{"commit", 'C', FA{{"TXNID", "txnid", 0},NULLFIELD}},
{"xabort", 'q', FA{{"TXNID", "txnid", 0},NULLFIELD}},
......@@ -111,14 +113,24 @@ const struct logtype logtypes[] = {
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"fclose", 'e', FA{{"BYTESTRING", "fname", 0},
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"enqrootentry", 'a', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"u_int32_t", "typ", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "data", 0},
NULLFIELD}},
{"FILENUM", "filenum", 0},
NULLFIELD}},
{"enq_insert", 'I', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
{"enq_delete_both", 'D', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
{"enq_delete_any", 'E', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
{"BYTESTRING", "value", 0},
NULLFIELD}},
{"shutdown", 'S', FA{NULLFIELD}},
{"timestamp", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
NULLFIELD}},
......
......@@ -200,6 +200,7 @@ static int toku_recover_backward_fcreate (struct logtype_fcreate *l, struct back
return 0;
}
#if 0
static void
toku_recover_enqrootentry (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, u_int32_t typ, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
......@@ -225,7 +226,85 @@ static int toku_recover_backward_enqrootentry (struct logtype_enqrootentry *l, s
toku_free_BYTESTRING(l->data);
return 0;
}
#endif
static void
toku_recover_enq_insert (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type=BRT_INSERT;
cmd.xid =xid;
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
static int toku_recover_backward_enq_insert (struct logtype_enq_insert *l, struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0;
}
static void
toku_recover_enq_delete_both (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_BOTH;
cmd.xid =xid;
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
static int toku_recover_backward_enq_delete_both (struct logtype_enq_delete_both *l, struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0;
}
static void
toku_recover_enq_delete_any (LSN lsn __attribute__((__unused__)), FILENUM filenum, TXNID xid, BYTESTRING key, BYTESTRING val) {
struct cf_pair *pair = NULL;
int r = find_cachefile(filenum, &pair);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything.
return;
}
struct brt_cmd cmd;
DBT keydbt, valdbt;
cmd.type = BRT_DELETE_ANY;
cmd.xid = xid;
cmd.u.id.key = toku_fill_dbt(&keydbt, key.data, key.len);
cmd.u.id.val = toku_fill_dbt(&valdbt, val.data, val.len);
r = toku_brt_root_put_cmd(pair->brt, &cmd, null_tokulogger);
assert(r==0);
toku_free(key.data);
toku_free(val.data);
}
static int toku_recover_backward_enq_delete_any (struct logtype_enq_delete_any *l, struct backward_scan_state *UU(bs)) {
toku_free_BYTESTRING(l->key);
toku_free_BYTESTRING(l->value);
return 0;
}
static void
toku_recover_fclose (LSN UU(lsn), BYTESTRING UU(fname), FILENUM filenum) {
......@@ -358,6 +437,14 @@ static int toku_recover_backward_timestamp (struct logtype_timestamp *l, struct
return 0;
}
static int toku_recover_shutdown (LSN UU(lsn)) {
return 0;
}
static int toku_recover_backward_shutdown (struct logtype_shutdown *UU(l), struct backward_scan_state *UU(bs)) {
return 0;
}
static int toku_delete_rolltmp_files (const char *log_dir) {
struct dirent *de;
DIR *d = opendir(log_dir);
......
......@@ -49,6 +49,22 @@ test_main (int argc __attribute__((__unused__)),
assert(r == 0);
// TODO verify the log
TOKULOGCURSOR lc = NULL;
r = toku_logcursor_create(&lc, dname);
assert(r == 0 && lc != NULL);
struct log_entry le;
r = toku_logcursor_next(lc, &le);
assert(r == 0 && le.cmd == LT_timestamp);
r = toku_logcursor_next(lc, &le);
assert(r == 0 && le.cmd == LT_timestamp);
r = toku_logcursor_next(lc, &le);
assert(r != 0);
r = toku_logcursor_destroy(&lc);
assert(r == 0 && lc == NULL);
return 0;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment