Commit 349b65d3 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul

Improvements to rollback. Addresses #27.

git-svn-id: file:///svn/tokudb@2435 c7de825b-a66e-492c-adef-691d508d4ae1
parent 4c62025d
......@@ -947,7 +947,9 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
} else if (cmd->type == BRT_DELETE) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, 0, node->rand4fingerprint, &node->local_fingerprint, &delta);
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, (DBT*)0,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &delta, &node->log_lsn);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
......@@ -957,7 +959,9 @@ static int brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd,
} else if (cmd->type == BRT_DELETE_BOTH) {
u_int32_t delta;
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val, node->rand4fingerprint, &node->local_fingerprint, &delta);
int r = toku_pma_delete(node->u.l.buffer, cmd->u.id.key, cmd->u.id.val,
logger, cmd->xid, node->thisnodename,
node->rand4fingerprint, &node->local_fingerprint, &delta, &node->log_lsn);
if (r == BRT_OK) {
node->u.l.n_bytes_in_buffer -= delta;
node->dirty = 1;
......@@ -1737,7 +1741,6 @@ int toku_brt_lookup (BRT brt, DBT *k, DBT *v) {
int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
int r;
DBT val;
printf("removing\n");
BRT_CMD_S brtcmd = { BRT_DELETE, toku_txn_get_txnid(txn), .u.id={key, toku_init_dbt(&val)}};
r = brt_root_put_cmd(brt, &brtcmd, toku_txn_logger(txn));
return r;
......
......@@ -169,14 +169,18 @@ static int toku_pma_cursor_set_range_both(PMA_CURSOR cursor, DBT *key, DBT *val)
return r;
}
static int toku_pma_cursor_delete_under(PMA_CURSOR cursor, u_int32_t *kvsize, u_int32_t rand4sem, u_int32_t *fingerprint) {
static int toku_pma_cursor_delete_under(PMA_CURSOR cursor, u_int32_t *kvsize,
TOKULOGGER logger, TXNID xid, DISKOFF diskoff,
u_int32_t rand4sem, u_int32_t *fingerprint, LSN*node_lsn) {
cursor = cursor; kvsize = kvsize; rand4sem = rand4sem; fingerprint = fingerprint;
DBT key; toku_init_dbt(&key); key.flags = DB_DBT_MALLOC;
DBT val; toku_init_dbt(&val); val.flags = DB_DBT_MALLOC;
int r = toku_pma_cursor_get_current(cursor, &key, &val, 0);
if (r == 0) {
PMA pma = cursor->pma;
r = toku_pma_delete(pma, &key, pma->dup_mode & TOKU_DB_DUPSORT ? &val : 0, rand4sem, fingerprint, kvsize);
r = toku_pma_delete(pma, &key, pma->dup_mode & TOKU_DB_DUPSORT ? &val : 0,
logger, xid, diskoff,
rand4sem, fingerprint, kvsize, node_lsn);
assert(r == 0);
}
toku_destroy_dbt(&key);
......
......@@ -297,7 +297,7 @@ static void do_insert (PMA pma, const void *key, int keylen, const void *data, i
static int do_delete (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k;
assert(*sum==*expect_fingerprint);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), 0, rand4fingerprint, sum, 0);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), (DBT*)0, (TOKULOGGER)0, (TXNID)0, (DISKOFF)0, rand4fingerprint, sum, 0, (LSN*)0);
if (r==BRT_OK) {
add_fingerprint_and_check(-rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); // negative rand4 means subtract.
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
......@@ -308,7 +308,9 @@ static int do_delete (PMA pma, const void *key, int keylen, const void *data, in
static int do_delete_both (PMA pma, const void *key, int keylen, const void *data, int datalen, u_int32_t rand4fingerprint, u_int32_t *sum, u_int32_t *expect_fingerprint) {
DBT k, v;
assert(*sum==*expect_fingerprint);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen), rand4fingerprint, sum, 0);
int r = toku_pma_delete(pma, toku_fill_dbt(&k, key, keylen), toku_fill_dbt(&v, data, datalen),
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, sum, 0, (LSN*)0);
if (r==BRT_OK) {
add_fingerprint_and_check(-rand4fingerprint, *sum, expect_fingerprint, key, keylen, data, datalen); // negative rand4 means subtract.
toku_pma_verify_fingerprint(pma, rand4fingerprint, *sum);
......@@ -337,7 +339,9 @@ static void test_pma_random_pick (void) {
assert(keylen==6); assert(vallen==6);
assert(strcmp(key,"hello")==0);
assert(strcmp(val,"there")==0);
r = toku_pma_delete(pma, toku_fill_dbt(&k, "nothello", 9), 0, rand4fingerprint, &sum, 0);
r = toku_pma_delete(pma, toku_fill_dbt(&k, "nothello", 9), (DBT*)0,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &sum, 0, (LSN*)0);
assert(r==DB_NOTFOUND);
assert(sum==expect_fingerprint); // didn't change because nothing was deleted.
......@@ -1551,7 +1555,9 @@ static void test_pma_double_delete() {
k = 1;
toku_fill_dbt(&key, &k, sizeof k);
r = toku_pma_delete(pma, &key, 0, rand4fingerprint, &sum, 0);
r = toku_pma_delete(pma, &key, (DBT*)0,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &sum, 0, (LSN*)0);
assert(r == DB_NOTFOUND);
assert(sum == expect_fingerprint);
......@@ -1802,7 +1808,10 @@ static void test_pma_cursor_first(int n) {
r = toku_pma_cursor_set_position_first(cursor);
if (r != 0) break;
k = htonl(i);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), 0, rand4fingerprint, &sum, 0); assert(r == 0);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), (DBT*)0,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &sum, 0, (LSN*)0);
assert(r == 0);
}
assert(i == n);
r = toku_pma_cursor_free(&cursor); assert(r == 0);
......@@ -1944,7 +1953,9 @@ static void test_pma_cursor_delete_under() {
u_int32_t kvsize;
/* delete under an uninitialized cursor should fail */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == DB_NOTFOUND || r != 0);
int k, v;
......@@ -1976,11 +1987,15 @@ static void test_pma_cursor_delete_under() {
toku_free(val.data);
/* delete under should succeed */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == 0);
/* 2nd delete under should fail */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == DB_NOTFOUND || r != 0);
}
assert(i == n);
......@@ -2013,7 +2028,9 @@ static void test_pma_cursor_delete_under_mode(int n, int dup_mode) {
u_int32_t kvsize;
/* delete under an uninitialized cursor should fail */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == DB_NOTFOUND || r != 0);
int k, v;
......@@ -2051,11 +2068,15 @@ static void test_pma_cursor_delete_under_mode(int n, int dup_mode) {
toku_free(val.data);
/* delete under should succeed */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == 0);
/* 2nd delete under should fail */
r = toku_pma_cursor_delete_under(cursor, &kvsize, rand4fingerprint, &expect_fingerprint);
r = toku_pma_cursor_delete_under(cursor, &kvsize,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &expect_fingerprint, (LSN*)0);
assert(r == DB_NOTFOUND || r != 0);
}
assert(i == n);
......@@ -2225,7 +2246,9 @@ static void test_dup_key_delete(int n, int mode) {
}
k = htonl(2);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), 0, rand4fingerprint, &sum, 0);
r = toku_pma_delete(pma, toku_fill_dbt(&key, &k, sizeof k), (DBT*)0,
(TOKULOGGER)0, (TXNID)0, (DISKOFF)0,
rand4fingerprint, &sum, 0, (LSN*)0);
if (r != 0) assert(n == 0);
expect_fingerprint = sum_before_all_the_duplicates;
assert(sum == expect_fingerprint);
......
......@@ -763,7 +763,30 @@ static int pma_delete_dup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_
return found ? BRT_OK : DB_NOTFOUND;
}
static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
static int pma_log_delete (PMA pma, const char *key, int keylen, const char *val, int vallen,
DISKOFF diskoff, int idx, TOKULOGGER logger, TXNID xid, LSN *node_lsn) {
{
const BYTESTRING deletedkey = { keylen, (char*)key };
const BYTESTRING deleteddata = { vallen, (char*)val };
int r=toku_log_deleteinleaf(logger, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
if (r!=0) return r;
}
if (logger) {
TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
const BYTESTRING deletedkey = { keylen, toku_memdup(key, keylen) };
const BYTESTRING deleteddata = { vallen, toku_memdup(val, vallen) };
int r=toku_logger_save_rollback_deleteatleaf(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { toku_free(deletedkey.data); toku_free(deleteddata.data); return r;
}
if (node_lsn) *node_lsn = toku_logger_last_lsn(logger);
}
return 0;
}
static int pma_delete_nodup (PMA pma, DBT *k, DBT *v,
TOKULOGGER logger, TXNID xid, DISKOFF diskoff,
u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size, LSN *node_lsn) {
/* find the left most matching key in the pma */
int found;
unsigned int here;
......@@ -771,6 +794,9 @@ static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int3
struct kv_pair *kv = pma->pairs[here];
if (!found || !kv_pair_inuse(kv))
return DB_NOTFOUND;
int r=pma_log_delete(pma, kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv),
diskoff, here, logger, xid, node_lsn);
if (r!=0) return r;
*deleted_size = PMA_ITEM_OVERHEAD + KEY_VALUE_OVERHEAD + kv_pair_keylen(kv) + kv_pair_vallen(kv);
*fingerprint -= rand4sem*toku_calccrc32_kvpair (kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
pma_mfree_kv_pair(pma, kv);
......@@ -780,7 +806,9 @@ static int pma_delete_nodup (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int3
return BRT_OK;
}
int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size) {
int toku_pma_delete (PMA pma, DBT *k, DBT *v,
TOKULOGGER logger, TXNID xid, DISKOFF diskoff,
u_int32_t rand4sem, u_int32_t *fingerprint, u_int32_t *deleted_size, LSN *node_lsn) {
u_int32_t my_deleted_size;
if (!deleted_size)
deleted_size = &my_deleted_size;
......@@ -788,7 +816,7 @@ int toku_pma_delete (PMA pma, DBT *k, DBT *v, u_int32_t rand4sem, u_int32_t *fin
if (pma->dup_mode & TOKU_DB_DUPSORT)
return pma_delete_dup(pma, k, v, rand4sem, fingerprint, deleted_size);
else
return pma_delete_nodup(pma, k, v, rand4sem, fingerprint, deleted_size);
return pma_delete_nodup(pma, k, v, logger, xid, diskoff, rand4sem, fingerprint, deleted_size, node_lsn);
}
static void pma_delete_at(PMA pma, int here) {
......@@ -867,24 +895,8 @@ int toku_pma_insert_or_replace (PMA pma, DBT *k, DBT *v,
struct kv_pair *kv = pma->pairs[idx];
*replaced_v_size = kv->vallen;
*fingerprint -= rand4fingerprint*toku_calccrc32_kvpair(kv_pair_key_const(kv), kv_pair_keylen(kv), kv_pair_val_const(kv), kv_pair_vallen(kv));
{
{
const BYTESTRING deletedkey = { kv->keylen, kv_pair_key(kv) };
const BYTESTRING deleteddata = { kv->vallen, kv_pair_val(kv) };
r=toku_log_deleteinleaf(logger, xid, pma->filenum, diskoff, idx, deletedkey, deleteddata);
r=pma_log_delete(pma, kv_pair_key(kv), kv->keylen, kv_pair_val(kv), kv->vallen, diskoff, idx, logger, xid, node_lsn);
if (r!=0) return r;
}
if (logger) {
TOKUTXN txn;
if (0!=toku_txnid2txn(logger, xid, &txn)) return -1;
const BYTESTRING deletedkey = { kv->keylen, toku_memdup(kv_pair_key(kv), kv->keylen) };
const BYTESTRING deleteddata = { kv->vallen, toku_memdup(kv_pair_val(kv), kv->vallen) };
r=toku_logger_save_rollback_deleteatleaf(txn, pma->filenum, deletedkey, deleteddata);
if (r!=0) { toku_free(deletedkey.data); toku_free(deleteddata.data); return r;
}
}
}
if (logger && node_lsn) *node_lsn = toku_logger_last_lsn(logger);
if (v->size == (unsigned int) kv_pair_vallen(kv)) {
memcpy(kv_pair_val(kv), v->data, v->size);
} else {
......
......@@ -61,7 +61,9 @@ int pma_replace (PMA, bytevec key, ITEMLEN keylen, bytevec data, ITEMLEN datalen
* The case where val!=0 should work for both DUP and NODUP dictionaries.
* For NODUP dictionaries, the value is deleted only if both the key and the value match.
*/
int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/, u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size);
int toku_pma_delete (PMA, DBT */*key*/, DBT */*val*/,
TOKULOGGER, TXNID, DISKOFF,
u_int32_t /*random for fingerprint*/, u_int32_t */*fingerprint*/, u_int32_t *deleted_size, LSN*);
int toku_pma_insert_or_replace (PMA /*pma*/, DBT */*k*/, DBT */*v*/,
int */*replaced_v_size*/, /* If it is a replacement, set to the size of the old value, otherwise set to -1. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment