Commit f488d822 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Put the leaf application code back in. Addresses #1195.

git-svn-id: file:///svn/toku/tokudb.1195@7134 c7de825b-a66e-492c-adef-691d508d4ae1
parent f9fd7cdd
......@@ -1458,324 +1458,6 @@ int toku_cmd_leafval_bessel (OMTVALUE lev, void *extra) {
// Whenever anything provisional is happening, it's XID must match the cmd's.
static int apply_cmd_to_le_committed (u_int32_t klen, void *kval,
u_int32_t dlen, void *dval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
return le_both(cmd->xid,
klen, kval,
dlen, dval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
return le_provdel(cmd->xid,
klen, kval,
dlen, dval,
newlen, disksize, new_data);
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
// Just return the original committed record
return le_committed(klen, kval, dlen, dval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_both (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
u_int32_t plen, void *pval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
u_int32_t prev_len;
void *prev_val;
if (xid==cmd->xid) {
// The xids match, so throw away the provisional value.
prev_len = clen; prev_val = cval;
} else {
// If the xids don't match, then we are moving the provisional value to committed status.
prev_len = plen; prev_val = pval;
}
// keep the committed value for rollback.
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
return le_both(cmd->xid,
klen, kval,
prev_len, prev_val,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
return le_provdel(cmd->xid,
klen, kval,
prev_len, prev_val,
newlen, disksize, new_data);
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// I don't see how you could have an abort where the xids don't match. But do it anyway.
return le_committed(klen, kval,
prev_len, prev_val,
newlen, disksize, new_data);
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
// In the future we won't even have these commit messages.
return le_committed(klen, kval,
plen, pval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_provdel (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
// keep the committed value for rollback
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
if (cmd->xid == xid) {
return le_both(cmd->xid,
klen, kval,
clen, cval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
} else {
// It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect)
return le_provpair(cmd->xid,
klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
}
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
if (cmd->xid == xid) {
// A delete of a delete could conceivably return the identical value, saving a malloc and a free, but to simplify things we just reallocate it
// because othewise we have to notice not to free() the olditem.
return le_provdel(cmd->xid,
klen, kval,
clen, cval,
newlen, disksize, new_data);
} else {
// The commited value is deleted, and we are deleting, so treat as a delete.
*new_data = 0;
return 0;
}
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// I don't see how the xids could not match...
return le_committed(klen, kval,
clen, cval,
newlen, disksize, new_data);
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
*new_data = 0;
return 0;
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_provpair (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t plen , void *pval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
if (cmd->xid == xid) {
// it's still a provpair (the old prov value is lost)
return le_provpair(cmd->xid,
klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
} else {
// the old prov was actually committed.
return le_both(cmd->xid,
klen, kval,
plen, pval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
}
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
if (cmd->xid == xid) {
// A delete of a provisional pair is nothign
*new_data = 0;
return 0;
} else {
// The prov pair is actually a committed value.
return le_provdel(cmd->xid,
klen, kval,
plen, pval,
newlen, disksize, new_data);
}
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// An abort of a provisional pair is nothing.
*new_data = 0;
return 0;
case BRT_COMMIT_ANY:
case BRT_COMMIT_BOTH:
return le_committed(klen, kval,
plen, pval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_leaf (BRT_CMD cmd,
void *stored_data, // NULL if there was no stored data.
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
if (stored_data==0) {
switch (cmd->type) {
case BRT_INSERT:
{
LEAFENTRY le;
int r = le_provpair(cmd->xid,
cmd->u.id.key->size, cmd->u.id.key->data,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, &le);
if (r==0) *new_data=le;
return r;
}
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
*new_data = 0;
return 0; // Don't have to insert anything.
case BRT_NONE:
break;
}
assert(0);
return 0;
} else {
LESWITCHCALL(stored_data, apply_cmd_to, cmd,
newlen, disksize, new_data);
}
abort(); return 0; // make certain compilers happy
}
static int
should_compare_both_keys (BRTNODE node, BRT_CMD cmd) {
switch (cmd->type) {
case BRT_INSERT:
return node->flags & TOKU_DB_DUPSORT;
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
return 1;
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
return 0;
case BRT_NONE:
break;
}
assert(0);
return 0;
}
static int brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int32_t idx, LEAFENTRY le) {
FILENUM filenum = toku_cachefile_filenum(t->cf);
u_int32_t newlen=0, newdisksize=0;
LEAFENTRY newdata=0;
int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &newdata);
if (r!=0) return r;
if (newdata) assert(newdisksize == leafentry_disksize(newdata));
//printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid);
//toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data);
//printf(" ");
//toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data);
//printf(" to \n");
//print_leafentry(stdout, le); printf("\n");
//printf(" got "); print_leafentry(stdout, newdata); printf("\n");
if (le && newdata) {
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
u_int32_t size = leafentry_memsize(le);
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen);
assert(new_le);
memcpy(new_le, newdata, newlen);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more.
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata);
toku_free(newdata);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) return r;
} else {
if (le) {
// It's there, note that it's gone and remove it from the mempool
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
}
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r;
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more.
}
if (newdata) {
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen);
assert(new_le);
memcpy(new_le, newdata, newlen);
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r;
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata);
toku_free(newdata);
}
}
// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv));
return 0;
}
static int
brt_leaf_put_cmd_simple (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int64_t *new_size /*OUT*/
......
......@@ -134,6 +134,15 @@ message are not overfull. (But they may be underfull or too fat or too thin.)
// even from unrelated BRTs. This way we only invalidate an OMTCURSOR if
static u_int64_t global_root_put_counter = 0;
enum should_status { SHOULD_OK, SHOULD_MERGE, SHOULD_SPLIT };
//#define SLOW
#ifdef SLOW
#define VERIFY_NODE(t,n) (toku_verify_counts(n), toku_verify_estimates(t,n))
#else
#define VERIFY_NODE(t,n) ((void)0)
#endif
static void
fixup_child_fingerprint (BRTNODE node, int childnum_of_node, BRTNODE child, BRT UU(brt), TOKULOGGER UU(logger))
// Effect: Sum the child fingerprint (and leafentry estimates) and store them in NODE.
......@@ -316,21 +325,485 @@ brt_init_new_root(BRT brt, BRTNODE nodea, BRTNODE nodeb, DBT splitk, CACHEKEY *r
return 0;
}
enum should_status { SHOULD_OK, SHOULD_MERGE, SHOULD_SPLIT };
static int
should_compare_both_keys (BRTNODE node, BRT_CMD cmd)
// Effect: Return nonzero if we need to compare both the key and the value.
{
switch (cmd->type) {
case BRT_INSERT:
return node->flags & TOKU_DB_DUPSORT;
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
return 1;
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
return 0;
case BRT_NONE:
break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_committed (u_int32_t klen, void *kval,
u_int32_t dlen, void *dval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
return le_both(cmd->xid,
klen, kval,
dlen, dval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
return le_provdel(cmd->xid,
klen, kval,
dlen, dval,
newlen, disksize, new_data);
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
// Just return the original committed record
return le_committed(klen, kval, dlen, dval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_both (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
u_int32_t plen, void *pval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
u_int32_t prev_len;
void *prev_val;
if (xid==cmd->xid) {
// The xids match, so throw away the provisional value.
prev_len = clen; prev_val = cval;
} else {
// If the xids don't match, then we are moving the provisional value to committed status.
prev_len = plen; prev_val = pval;
}
// keep the committed value for rollback.
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
return le_both(cmd->xid,
klen, kval,
prev_len, prev_val,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
return le_provdel(cmd->xid,
klen, kval,
prev_len, prev_val,
newlen, disksize, new_data);
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// I don't see how you could have an abort where the xids don't match. But do it anyway.
return le_committed(klen, kval,
prev_len, prev_val,
newlen, disksize, new_data);
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
// In the future we won't even have these commit messages.
return le_committed(klen, kval,
plen, pval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_provdel (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t clen, void *cval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
// keep the committed value for rollback
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
if (cmd->xid == xid) {
return le_both(cmd->xid,
klen, kval,
clen, cval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
} else {
// It's an insert, but the committed value is deleted (since the xids don't match, we assume the delete took effect)
return le_provpair(cmd->xid,
klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
}
case BRT_DELETE_ANY:
case BRT_DELETE_BOTH:
if (cmd->xid == xid) {
// A delete of a delete could conceivably return the identical value, saving a malloc and a free, but to simplify things we just reallocate it
// because othewise we have to notice not to free() the olditem.
return le_provdel(cmd->xid,
klen, kval,
clen, cval,
newlen, disksize, new_data);
} else {
// The commited value is deleted, and we are deleting, so treat as a delete.
*new_data = 0;
return 0;
}
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// I don't see how the xids could not match...
return le_committed(klen, kval,
clen, cval,
newlen, disksize, new_data);
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
*new_data = 0;
return 0;
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int apply_cmd_to_le_provpair (TXNID xid,
u_int32_t klen, void *kval,
u_int32_t plen , void *pval,
BRT_CMD cmd,
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data) {
//assert(cmd->u.id.key->size == klen);
//assert(memcmp(cmd->u.id.key->data, kval, klen)==0);
switch (cmd->type) {
case BRT_INSERT:
if (cmd->xid == xid) {
// it's still a provpair (the old prov value is lost)
return le_provpair(cmd->xid,
klen, kval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
} else {
// the old prov was actually committed.
return le_both(cmd->xid,
klen, kval,
plen, pval,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, new_data);
}
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
if (cmd->xid == xid) {
// A delete of a provisional pair is nothign
*new_data = 0;
return 0;
} else {
// The prov pair is actually a committed value.
return le_provdel(cmd->xid,
klen, kval,
plen, pval,
newlen, disksize, new_data);
}
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
// An abort of a provisional pair is nothing.
*new_data = 0;
return 0;
case BRT_COMMIT_ANY:
case BRT_COMMIT_BOTH:
return le_committed(klen, kval,
plen, pval,
newlen, disksize, new_data);
case BRT_NONE: break;
}
assert(0);
return 0;
}
static int
apply_cmd_to_leaf (BRT_CMD cmd,
void *stored_data, // NULL if there was no stored data.
u_int32_t *newlen, u_int32_t *disksize, LEAFENTRY *new_data)
{
if (stored_data==0) {
switch (cmd->type) {
case BRT_INSERT:
{
LEAFENTRY le;
int r = le_provpair(cmd->xid,
cmd->u.id.key->size, cmd->u.id.key->data,
cmd->u.id.val->size, cmd->u.id.val->data,
newlen, disksize, &le);
if (r==0) *new_data=le;
return r;
}
case BRT_DELETE_BOTH:
case BRT_DELETE_ANY:
case BRT_ABORT_BOTH:
case BRT_ABORT_ANY:
case BRT_COMMIT_BOTH:
case BRT_COMMIT_ANY:
*new_data = 0;
return 0; // Don't have to insert anything.
case BRT_NONE:
break;
}
assert(0);
return 0;
} else {
LESWITCHCALL(stored_data, apply_cmd_to, cmd,
newlen, disksize, new_data);
}
abort(); return 0; // make certain compilers happy
}
static int
brt_leaf_apply_cmd_once (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int32_t idx, LEAFENTRY le)
// Effect: Apply cmd to leafentry
// idx is the location where it goes
// le is old leafentry
{
FILENUM filenum = toku_cachefile_filenum(t->cf);
u_int32_t newlen=0, newdisksize=0;
LEAFENTRY newdata=0;
int r = apply_cmd_to_leaf(cmd, le, &newlen, &newdisksize, &newdata);
if (r!=0) return r;
if (newdata) assert(newdisksize == leafentry_disksize(newdata));
//printf("Applying command: %s xid=%lld ", unparse_cmd_type(cmd->type), (long long)cmd->xid);
//toku_print_BYTESTRING(stdout, cmd->u.id.key->size, cmd->u.id.key->data);
//printf(" ");
//toku_print_BYTESTRING(stdout, cmd->u.id.val->size, cmd->u.id.val->data);
//printf(" to \n");
//print_leafentry(stdout, le); printf("\n");
//printf(" got "); print_leafentry(stdout, newdata); printf("\n");
if (le && newdata) {
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
u_int32_t size = leafentry_memsize(le);
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen);
assert(new_le);
memcpy(new_le, newdata, newlen);
// This mfree must occur after the mempool_malloc so that when the mempool is compressed everything is accounted for.
// But we must compute the size before doing the mempool malloc because otherwise the le pointer is no good.
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, size); // Must pass 0, since le may be no good any more.
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata);
toku_free(newdata);
if ((r = toku_omt_set_at(node->u.l.buffer, new_le, idx))) return r;
} else {
if (le) {
// It's there, note that it's gone and remove it from the mempool
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_deleteleafentry(logger, &node->log_lsn, 0, filenum, node->thisnodename, idx))) return r;
}
if ((r = toku_omt_delete_at(node->u.l.buffer, idx))) return r;
node->u.l.n_bytes_in_buffer -= OMT_ITEM_OVERHEAD + leafentry_disksize(le);
node->local_fingerprint -= node->rand4fingerprint * toku_le_crc(le);
toku_mempool_mfree(&node->u.l.buffer_mempool, 0, leafentry_memsize(le)); // Must pass 0, since le may be no good any more.
}
if (newdata) {
LEAFENTRY new_le = mempool_malloc_from_omt(node->u.l.buffer, &node->u.l.buffer_mempool, newlen);
assert(new_le);
memcpy(new_le, newdata, newlen);
if ((r = toku_omt_insert_at(node->u.l.buffer, new_le, idx))) return r;
if (t->txn_that_created != cmd->xid) {
if ((r = toku_log_insertleafentry(logger, &node->log_lsn, 0, toku_cachefile_filenum(t->cf), node->thisnodename, idx, newdata))) return r;
}
node->u.l.n_bytes_in_buffer += OMT_ITEM_OVERHEAD + newdisksize;
node->local_fingerprint += node->rand4fingerprint*toku_le_crc(newdata);
toku_free(newdata);
}
}
// printf("%s:%d rand4=%08x local_fingerprint=%08x this=%08x\n", __FILE__, __LINE__, node->rand4fingerprint, node->local_fingerprint, toku_calccrc32_kvpair_struct(kv));
return 0;
}
static int
brt_leaf_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger,
u_int64_t *new_size /*OUT*/
)
// Effect: Put a cmd into a leaf.
// Return the serialization size in *new_size.
// The leaf could end up "too big". It is up to the caller to fix that up.
{
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
VERIFY_NODE(t, node);
assert(node->height==0);
LEAFENTRY storeddata;
OMTVALUE storeddatav=NULL;
u_int32_t idx;
int r;
int compare_both = should_compare_both_keys(node, cmd);
struct cmd_leafval_bessel_extra be = {t, cmd, compare_both};
//static int counter=0;
//counter++;
//printf("counter=%d\n", counter);
switch (cmd->type) {
case BRT_INSERT:
if (node->u.l.seqinsert) {
idx = toku_omt_size(node->u.l.buffer);
r = toku_omt_fetch(node->u.l.buffer, idx-1, &storeddatav, NULL);
if (r != 0) goto fz;
storeddata = storeddatav;
int cmp = toku_cmd_leafval_bessel(storeddata, &be);
if (cmp >= 0) goto fz;
r = DB_NOTFOUND;
} else {
fz:
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
}
if (r==DB_NOTFOUND) {
storeddata = 0;
} else if (r!=0) {
return r;
} else {
storeddata=storeddatav;
}
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
// if the insertion point is within a window of the right edge of
// the leaf then it is sequential
// window = min(32, number of leaf entries/16)
u_int32_t s = toku_omt_size(node->u.l.buffer);
u_int32_t w = s / 16;
if (w == 0) w = 1;
if (w > 32) w = 32;
// within the window?
if (s - idx <= w) {
node->u.l.seqinsert += 1;
} else {
node->u.l.seqinsert = 0;
}
break;
case BRT_DELETE_BOTH:
case BRT_ABORT_BOTH:
case BRT_COMMIT_BOTH:
// Delete the one item
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
storeddata=storeddatav;
VERIFY_NODE(t, node);
static int count=0;
count++;
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
VERIFY_NODE(t, node);
break;
case BRT_DELETE_ANY:
case BRT_ABORT_ANY:
case BRT_COMMIT_ANY:
// Delete all the matches
r = toku_omt_find_zero(node->u.l.buffer, toku_cmd_leafval_bessel, &be,
&storeddatav, &idx, NULL);
if (r == DB_NOTFOUND) break;
if (r != 0) return r;
storeddata=storeddatav;
while (1) {
int vallen = le_any_vallen(storeddata);
void *save_val = toku_memdup(le_any_val(storeddata), vallen);
r = brt_leaf_apply_cmd_once(t, node, cmd, logger, idx, storeddata);
if (r!=0) return r;
// Now we must find the next one.
DBT valdbt;
BRT_CMD_S ncmd = { cmd->type, cmd->xid, .u.id={cmd->u.id.key, toku_fill_dbt(&valdbt, save_val, vallen)}};
struct cmd_leafval_bessel_extra nbe = {t, &ncmd, 1};
r = toku_omt_find(node->u.l.buffer, toku_cmd_leafval_bessel, &nbe, +1,
&storeddatav, &idx, NULL);
toku_free(save_val);
if (r!=0) break;
storeddata=storeddatav;
{ // Continue only if the next record that we found has the same key.
DBT adbt;
if (t->compare_fun(t->db,
toku_fill_dbt(&adbt, le_any_key(storeddata), le_any_keylen(storeddata)),
cmd->u.id.key) != 0)
break;
}
}
break;
case BRT_NONE: return EINVAL;
}
/// All done doing the work
node->dirty = 1;
// toku_pma_verify_fingerprint(node->u.l.buffer, node->rand4fingerprint, node->subtree_fingerprint);
VERIFY_NODE(t, node);
*new_size = toku_serialize_brtnode_size(node);
return 0;
}
static int
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum should_status *should, BOOL *did_io)
brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum should_status *should, int *io_count)
// Effect: Push CMD into the subtree rooted at NODE, and indicate whether as a result NODE should split or should merge.
// If NODE is a leaf, then
// put CMD into leaf, applying it to the leafentries
// If NODE is a nonleaf, then copy the cmd into the relevant child fifos.
// For each child fifo that is empty and where the child is in main memory put the command into the child (using this same algorithm)
// Use *did_io to determine whether I/O has already been performed. If it has then we avoid doing additional I/O.
// Use *io_count to determine whether I/O has already been performed. Once I/O has occured, we avoid additional I/O by leaving nodes that are overfull, underfull, overfat, or underfat.
// Set *should as follows:
// { SHOULD_SPLIT if the node is overfull
// *should = { SHOULD_MERGE if the node is underfull
// { SHOULD_OK if the node is ok. (Those cases are mutually exclusive.)
// If we perform I/O then set *did_io to true.
// For every I/O increment *io_count
{
if (node->height==0) {
int r;
......@@ -343,7 +816,7 @@ brtnode_put_cmd (BRT t, BRTNODE node, BRT_CMD cmd, TOKULOGGER logger, enum shoul
} else {
int r;
u_int32_t new_fanout = 0; // Some compiler bug in gcc is complaining that this is uninitialized.
r = brt_nonleaf_put_cmd(t, node, cmd, logger, &new_fanout);
r = brt_nonleaf_put_cmd(t, node, cmd, logger, &new_fanout, io_count);
if (r!=0) return 0;
if (new_fanout > TREE_FANOUT) *should = SHOULD_SPLIT;
else if (new_fanout*4 < TREE_FANOUT) *should = SHOULD_MERGE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment