Commit 07d38f91 authored by Zardosht Kasheff, committed by Yoni Fogel

[t:4541], merge to main

git-svn-id: file:///svn/toku/tokudb@41538 c7de825b-a66e-492c-adef-691d508d4ae1
parent dc86d566
......@@ -422,7 +422,9 @@ static void print_db_struct (void) {
"const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/",
"void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size)",
"DESCRIPTOR descriptor /* saved row/dictionary descriptor for aiding in comparisons */",
"DESCRIPTOR cmp_descriptor /* saved row/dictionary descriptor for aiding in comparisons */",
"int (*change_descriptor) (DB*, DB_TXN*, const DBT* descriptor, u_int32_t) /* change row/dictionary descriptor for a db. Available only while db is open */",
"int (*update_cmp_descriptor) (DB*) /* Update cmp descriptor. Available only while db is open */",
"int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */",
"int (*optimize)(DB*) /* Run garbage collecion and promote all transactions older than oldest. Amortized (happens during flattening) */",
"int (*hot_optimize)(DB*, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra)",
......
......@@ -303,7 +303,9 @@ struct __toku_db {
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
DESCRIPTOR descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
DESCRIPTOR cmp_descriptor /* saved row/dictionary descriptor for aiding in comparisons */;
int (*change_descriptor) (DB*, DB_TXN*, const DBT* descriptor, u_int32_t) /* change row/dictionary descriptor for a db. Available only while db is open */;
int (*update_cmp_descriptor) (DB*) /* Update cmp descriptor. Available only while db is open */;
int (*getf_set)(DB*, DB_TXN*, u_int32_t, DBT*, YDB_CALLBACK_FUNCTION, void*) /* same as DBC->c_getf_set without a persistent cursor) */;
int (*optimize)(DB*) /* Run garbage collecion and promote all transactions older than oldest. Amortized (happens during flattening) */;
int (*hot_optimize)(DB*, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra);
......
......@@ -311,7 +311,7 @@ ctm_pick_child(struct brt_header *h,
childnum = toku_brtnode_which_child(
parent,
&ctme->target_key,
&h->descriptor,
&h->cmp_descriptor,
h->compare_fun);
}
return childnum;
......@@ -1074,7 +1074,7 @@ flush_this_child(
set_BNC(node, childnum, toku_create_empty_nl());
// now we have a bnc to flush to the child
r = toku_bnc_flush_to_child(h->compare_fun, h->update_fun, &h->descriptor, h->cf, bnc, child); assert_zero(r);
r = toku_bnc_flush_to_child(h->compare_fun, h->update_fun, &h->cmp_descriptor, h->cf, bnc, child); assert_zero(r);
destroy_nonleaf_childinfo(bnc);
}
......@@ -1567,7 +1567,7 @@ flush_some_child(
r = toku_bnc_flush_to_child(
h->compare_fun,
h->update_fun,
&h->descriptor,
&h->cmp_descriptor,
h->cf,
bnc,
child
......@@ -1729,7 +1729,7 @@ static void flush_node_fun(void *fe_v)
r = toku_bnc_flush_to_child(
fe->h->compare_fun,
fe->h->update_fun,
&fe->h->descriptor,
&fe->h->cmp_descriptor,
fe->h->cf,
fe->bnc,
fe->node
......
......@@ -122,7 +122,7 @@ hot_just_pick_child(struct brt_header *h,
// Find the pivot boundary.
childnum = toku_brtnode_hot_next_child(parent,
&flusher->highest_pivot_key,
&h->descriptor,
&h->cmp_descriptor,
h->compare_fun);
}
......
......@@ -386,9 +386,7 @@ struct brt_header {
BLOCKNUM root_blocknum; // roots of the dictionary
unsigned int flags;
DESCRIPTOR_S descriptor;
int free_me_count; // Descriptors are freed asynchronously, so we cannot free() them.
void **free_me; // Instead we just accumulate them in this array. These are void* that we must free() later.
DESCRIPTOR_S cmp_descriptor;
BLOCK_TABLE blocktable;
// If a transaction created this BRT, which one?
......
......@@ -1265,7 +1265,7 @@ static void setup_brtnode_partitions(BRTNODE node, struct brtnode_fetch_extra* b
// and check if it is available
assert(bfe->search);
bfe->child_to_read = toku_brt_search_which_child(
&bfe->h->descriptor,
&bfe->h->cmp_descriptor,
bfe->h->compare_fun,
node,
bfe->search
......@@ -1636,7 +1636,7 @@ deserialize_brtnode_from_rbuf(
switch (BP_STATE(node,i)) {
case PT_AVAIL:
// case where we read and decompress the partition
decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i, &bfe->h->descriptor, bfe->h->compare_fun);
decompress_and_deserialize_worker(curr_rbuf, curr_sb, node, i, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
continue;
case PT_COMPRESSED:
// case where we leave the partition in the compressed state
......@@ -1698,7 +1698,7 @@ toku_deserialize_bp_from_disk(BRTNODE node, BRTNODE_DISK_DATA ndd, int childnum,
read_and_decompress_sub_block(&rb, &curr_sb);
// at this point, sb->uncompressed_ptr stores the serialized node partition
deserialize_brtnode_partition(&curr_sb, node, childnum, &bfe->h->descriptor, bfe->h->compare_fun);
deserialize_brtnode_partition(&curr_sb, node, childnum, &bfe->h->cmp_descriptor, bfe->h->compare_fun);
toku_free(raw_block);
}
......@@ -2253,6 +2253,8 @@ deserialize_brtheader_versioned (int fd, struct rbuf *rb, struct brt_header **br
invariant(h);
invariant((uint32_t) h->layout_version == version);
deserialize_descriptor_from(fd, h->blocktable, &(h->descriptor), version);
h->cmp_descriptor.dbt.size = h->descriptor.dbt.size;
h->cmp_descriptor.dbt.data = toku_xmemdup(h->descriptor.dbt.data, h->descriptor.dbt.size);
switch (version) {
case BRT_LAYOUT_VERSION_13:
invariant(h->layout_version == BRT_LAYOUT_VERSION_13);
......
......@@ -142,7 +142,7 @@ int toku_testsetup_insert_to_leaf (BRT brt, BLOCKNUM blocknum, char *key, int ke
brtnode_put_cmd (
brt->h->compare_fun,
brt->h->update_fun,
&brt->h->descriptor,
&brt->h->cmp_descriptor,
node,
&cmd,
true,
......@@ -209,7 +209,7 @@ int toku_testsetup_insert_to_nonleaf (BRT brt, BLOCKNUM blocknum, enum brt_msg_t
DBT k;
int childnum = toku_brtnode_which_child(node,
toku_fill_dbt(&k, key, keylen),
&brt->h->descriptor, brt->compare_fun);
&brt->h->cmp_descriptor, brt->compare_fun);
XIDS xids_0 = xids_get_root_xids();
MSN msn = next_dummymsn();
......
......@@ -172,7 +172,7 @@ verify_sorted_by_key_msn(BRT brt, FIFO fifo, OMT mt) {
assert_zero(r);
size_t offset = (size_t) v;
if (i > 0) {
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &brt->h->descriptor, .cmp = brt->compare_fun, .fifo = fifo };
struct toku_fifo_entry_key_msn_cmp_extra extra = { .desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo };
if (toku_fifo_entry_key_msn_cmp(&extra, &last_offset, &offset) >= 0) {
result = TOKUDB_NEEDS_REPAIR;
break;
......@@ -186,7 +186,7 @@ verify_sorted_by_key_msn(BRT brt, FIFO fifo, OMT mt) {
static int
count_eq_key_msn(BRT brt, FIFO fifo, OMT mt, const void *key, size_t keylen, MSN msn) {
struct toku_fifo_entry_key_msn_heaviside_extra extra = {
.desc = &brt->h->descriptor, .cmp = brt->compare_fun, .fifo = fifo, .key = key, .keylen = keylen, .msn = msn
.desc = &brt->h->cmp_descriptor, .cmp = brt->compare_fun, .fifo = fifo, .key = key, .keylen = keylen, .msn = msn
};
OMTVALUE v; u_int32_t idx;
int r = toku_omt_find_zero(mt, toku_fifo_entry_key_msn_heaviside, &extra, &v, &idx);
......
This diff is collapsed.
......@@ -32,6 +32,7 @@ C_BEGIN
typedef int(*BRT_GET_CALLBACK_FUNCTION)(ITEMLEN keylen, bytevec key, ITEMLEN vallen, bytevec val, void *extra, bool lock_only);
int toku_open_brt (const char *fname, int is_create, BRT *, int nodesize, int basementnodesize, CACHETABLE, TOKUTXN, int(*)(DB *,const DBT*,const DBT*), DB*) __attribute__ ((warn_unused_result));
void toku_brt_update_cmp_descriptor(BRT t);
int toku_brt_change_descriptor(BRT t, const DBT* old_descriptor, const DBT* new_descriptor, BOOL do_log, TOKUTXN txn);
int toku_update_descriptor(struct brt_header * h, DESCRIPTOR d, int fd);
// Note: See the locking discussion in brt.c for toku_brt_change_descriptor and toku_update_descriptor.
......
......@@ -48,7 +48,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
bool made_change;
u_int64_t workdone=0;
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->descriptor, leafnode, &cmd, &made_change, &workdone, NULL, NULL);
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd, &made_change, &workdone, NULL, NULL);
{
int r = toku_brt_lookup(brt, &thekey, lookup_checkf, &pair);
assert(r==0);
......@@ -56,7 +56,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
}
BRT_MSG_S badcmd = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->descriptor, leafnode, &badcmd, &made_change, &workdone, NULL, NULL);
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &badcmd, &made_change, &workdone, NULL, NULL);
// message should be rejected for duplicate msn, row should still have original val
......@@ -69,7 +69,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with proper msn gets through
msn = next_dummymsn();
BRT_MSG_S cmd2 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &val2 } };
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->descriptor, leafnode, &cmd2, &made_change, &workdone, NULL, NULL);
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd2, &made_change, &workdone, NULL, NULL);
// message should be accepted, val should have new value
{
......@@ -81,7 +81,7 @@ append_leaf(BRT brt, BRTNODE leafnode, void *key, size_t keylen, void *val, size
// now verify that message with lesser (older) msn is rejected
msn.msn = msn.msn - 10;
BRT_MSG_S cmd3 = { BRT_INSERT, msn, xids_get_root_xids(), .u.id = { &thekey, &badval } };
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->descriptor, leafnode, &cmd3, &made_change, &workdone, NULL, NULL);
toku_apply_cmd_to_leaf(brt->compare_fun, brt->update_fun, &brt->h->cmp_descriptor, leafnode, &cmd3, &made_change, &workdone, NULL, NULL);
// message should be rejected, val should still have value in pair2
{
......
......@@ -289,6 +289,7 @@ BDB_DONTRUN_TESTS = \
test_blobs_leaf_split \
test_bulk_fetch \
test_compression_methods \
test_cmp_descriptor \
test_cursor_delete_2119 \
test_db_descriptor \
test_db_descriptor_named_db \
......
......@@ -8,9 +8,9 @@
static int my_compare (DB *db, const DBT *a, const DBT *b) {
assert(db);
assert(db->descriptor);
assert(db->descriptor->dbt.size >= 3);
char *data = db->descriptor->dbt.data;
assert(db->cmp_descriptor);
assert(db->cmp_descriptor->dbt.size >= 3);
char *data = db->cmp_descriptor->dbt.data;
assert(data[0]=='f');
assert(data[1]=='o');
assert(data[2]=='o');
......@@ -117,7 +117,7 @@ int test_main(int argc, char * const argv[]) {
dbt_init(&desc, "foo", sizeof("foo"));
IN_TXN_COMMIT(env, NULL, txn, 0,
CHK(db->change_descriptor(db, txn, &desc, 0)));
CHK(db->update_cmp_descriptor(db));
pthread_t thd;
CHK(toku_pthread_create(&thd, NULL, startA, NULL));
......
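// test that change_descriptor leaves cmp_descriptor untouched, and that cmp_descriptor
// only picks up a new descriptor via update_cmp_descriptor or a close and reopen of the db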
#include "test.h"
// Flags handed to env->open() in setup().
const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_TXN|DB_PRIVATE;
// The single environment shared by setup(), run_test() and cleanup().
DB_ENV *env;
// Selects which descriptor size assert_cmp_desc_valid() expects for cmp_descriptor:
// sizeof(four_byte_desc) when TRUE, sizeof(eight_byte_desc) when FALSE.
BOOL cmp_desc_is_four;
// Payloads for the two descriptors the test switches between (see run_test()).
// The low-order 32 bits of both values are 0xffffffff.
u_int32_t four_byte_desc = 0xffffffff;
u_int64_t eight_byte_desc = 0x12345678ffffffff;
// Points dst at the same buffer as src (no copy is made) and clears dst's flags.
static void alias_dbt(DBT *dst, const DBT *src) {
    dst->data = src->data;
    dst->size = src->size;
    dst->flags = 0;
}

// Row-generation callback installed on the environment in setup().
// The destination key and value alias the source key and value buffers;
// both DB handles are ignored. Always returns 0.
static int generate_row_for_put(
    DB *UU(dest_db),
    DB *UU(src_db),
    DBT *dest_key,
    DBT *dest_val,
    const DBT *src_key,
    const DBT *src_val
    )
{
    alias_dbt(dest_key, src_key);
    alias_dbt(dest_val, src_val);
    return 0;
}
// Asserts that db->cmp_descriptor has the size the test currently expects
// (chosen by cmp_desc_is_four) and that its first four bytes are all 0xff.
// NOTE(review): for the eight-byte descriptor the first four bytes are 0xff
// only when the low-order half of eight_byte_desc is stored first, i.e. on a
// little-endian host -- confirm this test is not expected to run elsewhere.
static void assert_cmp_desc_valid (DB* db) {
    size_t expected_size = cmp_desc_is_four ? sizeof(four_byte_desc)
                                            : sizeof(eight_byte_desc);
    assert(db->cmp_descriptor->dbt.size == expected_size);

    unsigned char* bytes = db->cmp_descriptor->dbt.data;
    for (int idx = 0; idx < 4; idx++) {
        assert(bytes[idx] == 0xff);
    }
}
// Asserts that db->descriptor currently holds exactly four_byte_desc
// (same size, same 32-bit value).
static void assert_desc_four (DB* db) {
    DESCRIPTOR current = db->descriptor;
    assert(current->dbt.size == sizeof(four_byte_desc));
    u_int32_t *stored = (u_int32_t *)(current->dbt.data);
    assert(*stored == four_byte_desc);
}
// Asserts that db->descriptor currently holds exactly eight_byte_desc
// (same size, same 64-bit value).
static void assert_desc_eight (DB* db) {
    DESCRIPTOR current = db->descriptor;
    assert(current->dbt.size == sizeof(eight_byte_desc));
    u_int64_t *stored = (u_int64_t *)(current->dbt.data);
    assert(*stored == eight_byte_desc);
}
// Key comparison callback installed in setup(). Every call first validates
// db->cmp_descriptor via assert_cmp_desc_valid(), then orders the two keys
// as int64_t values. Both keys must be non-NULL and exactly sizeof(int64_t)
// bytes long. Returns -1, 0 or 1.
static int
desc_int64_dbt_cmp (DB *db, const DBT *a, const DBT *b) {
    assert_cmp_desc_valid(db);

    assert(a);
    assert(b);
    assert(a->size == sizeof(int64_t));
    assert(b->size == sizeof(int64_t));

    const int64_t lhs = *(int64_t *) a->data;
    const int64_t rhs = *(int64_t *) b->data;
    // (lhs > rhs) and (lhs < rhs) are each 0 or 1, so the difference is -1, 0 or 1.
    return (lhs > rhs) - (lhs < rhs);
}
// Recreates ENVDIR from scratch, creates the global environment `env`,
// installs the descriptor-checking comparator and the row generator on it,
// and opens it with `envflags`.
static void setup (void) {
    // Start from an empty environment directory.
    CHK(system("rm -rf " ENVDIR));
    CHK(toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO));

    CHK(db_env_create(&env, 0));
    env->set_errfile(env, stderr);

    // Callbacks are registered before the environment is opened.
    int r = env->set_default_bt_compare(env, desc_int64_dbt_cmp);
    CKERR(r);
    // Cache-size override left disabled, as in the original test:
    //r = env->set_cachesize(env, 0, 500000, 1); CKERR(r);
    r = env->set_generate_row_callback_for_put(env, generate_row_for_put);
    CKERR(r);

    CHK(env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO));
}
// Closes the global environment created by setup(); the result of the close
// is passed to CHK.
static void cleanup (void) {
CHK(env->close(env, 0));
}
// Exercises the comparator on `db`: inserts 2000 random 8-byte key/value
// pairs in one committed transaction, then walks the whole dictionary twice
// with a DB_NEXT cursor, each walk in its own committed transaction.
// Only the first walk pre-acquires a range lock from negative to positive
// infinity before reading.
static void do_inserts_and_queries(DB* db) {
    int r = 0;

    // ---- insert phase ----
    DB_TXN* insert_txn = NULL;
    r = env->txn_begin(env, NULL, &insert_txn, 0);
    CKERR(r);
    for (int n = 0; n < 2000; n++) {
        // Key is drawn before value; keep this order so the random stream
        // is consumed the same way.
        u_int64_t rand_key = random();
        u_int64_t rand_val = random();
        DBT key, val;
        dbt_init(&key, &rand_key, sizeof(rand_key));
        dbt_init(&val, &rand_val, sizeof(rand_val));
        CHK(db->put(db, insert_txn, &key, &val, 0));
    }
    r = insert_txn->commit(insert_txn, 0);
    CKERR(r);

    // ---- scan phase: two passes ----
    for (int pass = 0; pass < 2; pass++) {
        DB_TXN* scan_txn = NULL;
        DBC* cursor = NULL;

        r = env->txn_begin(env, NULL, &scan_txn, 0);
        CKERR(r);
        r = db->cursor(db, scan_txn, &cursor, 0);
        CKERR(r);

        if (pass == 0) {
            r = cursor->c_pre_acquire_range_lock(
                cursor,
                db->dbt_neg_infty(),
                db->dbt_pos_infty()
                );
            CKERR(r);
        }

        // Step forward until the cursor reports DB_NOTFOUND; any other
        // non-zero result trips the assert.
        while (r != DB_NOTFOUND) {
            DBT key, val;
            memset(&key, 0, sizeof(key));
            memset(&val, 0, sizeof(val));
            r = cursor->c_get(cursor, &key, &val, DB_NEXT);
            assert(r == 0 || r == DB_NOTFOUND);
        }

        r = cursor->c_close(cursor);
        CKERR(r);
        r = scan_txn->commit(scan_txn, 0);
        CKERR(r);
    }
}
// Main scenario. Checks that db->descriptor follows change_descriptor
// immediately, while db->cmp_descriptor changes only when
// update_cmp_descriptor is called or when the db is closed and reopened.
// The comparator (desc_int64_dbt_cmp) re-validates cmp_descriptor on every
// comparison, so each do_inserts_and_queries() call also exercises that check.
static void run_test(void) {
DB* db = NULL;
int r;
// Until the reopen at the end, cmp_descriptor is expected to be the four-byte one.
cmp_desc_is_four = TRUE;
// orig_desc wraps four_byte_desc; other_desc wraps eight_byte_desc.
DBT orig_desc;
memset(&orig_desc, 0, sizeof(orig_desc));
orig_desc.size = sizeof(four_byte_desc);
orig_desc.data = &four_byte_desc;
DBT other_desc;
memset(&other_desc, 0, sizeof(other_desc));
other_desc.size = sizeof(eight_byte_desc);
other_desc.data = &eight_byte_desc;
DB_LOADER *loader = NULL;
DBT key, val;
u_int64_t k = 0;
u_int64_t v = 0;
// Phase 1: create the db, set the four-byte descriptor, promote it to
// cmp_descriptor, and push one row through a loader.
// NOTE(review): IN_TXN_COMMIT is defined outside this file; presumably it runs
// the block inside a transaction named by its third argument and commits it -- confirm.
IN_TXN_COMMIT(env, NULL, txn_create, 0, {
CHK(db_create(&db, env, 0));
// A freshly created, unopened handle has no descriptor pointer yet.
assert(db->descriptor == NULL);
r = db->set_pagesize(db, 2048);
CKERR(r);
r = db->set_readpagesize(db, 1024);
CKERR(r);
CHK(db->open(db, txn_create, "foo.db", NULL, DB_BTREE, DB_CREATE, 0666));
// Right after open both descriptors exist and are empty.
assert(db->descriptor->dbt.size == 0);
assert(db->cmp_descriptor->dbt.size == 0);
CHK(db->change_descriptor(db, txn_create, &orig_desc, 0));
// change_descriptor is visible in descriptor but not yet in cmp_descriptor...
assert_desc_four(db);
assert(db->cmp_descriptor->dbt.size == 0);
// ...until update_cmp_descriptor is called.
CHK(db->update_cmp_descriptor(db));
assert_cmp_desc_valid(db);
r = env->create_loader(env, txn_create, &loader, db, 1, &db, NULL, NULL, 0);
CKERR(r);
dbt_init(&key, &k, sizeof k);
dbt_init(&val, &v, sizeof v);
r = loader->put(loader, &key, &val);
CKERR(r);
r = loader->close(loader);
CKERR(r);
// The loader must not have disturbed cmp_descriptor.
assert_cmp_desc_valid(db);
});
assert_cmp_desc_valid(db);
// Re-checks the status left in r by the block above.
CKERR(r);
do_inserts_and_queries(db);
// Phase 2: switch descriptor to the eight-byte value and commit, without
// calling update_cmp_descriptor. descriptor changes; cmp_descriptor is still
// expected to be the four-byte one.
IN_TXN_COMMIT(env, NULL, txn_1, 0, {
CHK(db->change_descriptor(db, txn_1, &other_desc, 0));
assert_desc_eight(db);
assert_cmp_desc_valid(db);
});
assert_desc_eight(db);
assert_cmp_desc_valid(db);
do_inserts_and_queries(db);
// Phase 3: change back to the four-byte descriptor in a block run under
// IN_TXN_ABORT. Inside the block descriptor reads as four bytes; afterwards it
// is expected to be the eight-byte one again. cmp_descriptor stays four bytes throughout.
IN_TXN_ABORT(env, NULL, txn_1, 0, {
CHK(db->change_descriptor(db, txn_1, &orig_desc, 0));
assert_desc_four(db);
assert_cmp_desc_valid(db);
});
assert_desc_eight(db);
assert_cmp_desc_valid(db);
do_inserts_and_queries(db);
CHK(db->close(db, 0));
// verify that after close and reopen, cmp_descriptor is now
// latest descriptor
// From here on assert_cmp_desc_valid() expects the eight-byte size.
cmp_desc_is_four = FALSE;
CHK(db_create(&db, env, 0));
CHK(db->open(db, NULL, "foo.db", NULL, DB_BTREE, DB_AUTO_COMMIT, 0666));
assert_desc_eight(db);
assert_cmp_desc_valid(db);
do_inserts_and_queries(db);
CHK(db->close(db, 0));
}
// Test entry point: hands the command line to parse_args (defined outside this
// file), builds the environment, runs the scenario, and tears the environment
// down. Always returns 0; failures are reported through the asserting
// helpers rather than through the return value.
int test_main (int argc, char * const argv[]) {
parse_args(argc, argv);
setup();
run_test();
cleanup();
return 0;
}
......@@ -499,6 +499,12 @@ toku_db_rename(DB * db, const char *fname, const char *dbname, const char *newna
return r;
}
// DB->update_cmp_descriptor implementation: forwards to
// toku_brt_update_cmp_descriptor() on the BRT behind this DB handle.
// Always returns 0 (the underlying call returns void).
// NOTE(review): the DB struct comment says this is available only while the
// db is open, but nothing here checks that -- confirm callers guarantee it.
static int
toku_db_update_cmp_descriptor(DB *db) {
    BRT brt = db->i->brt;
    toku_brt_update_cmp_descriptor(brt);
    return 0;
}
//
// This function is the only way to set a descriptor of a DB.
//
......@@ -1071,6 +1077,7 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
#undef SDB
// methods that take the ydb lock in some capacity,
// but not from beginning to end
result->update_cmp_descriptor = toku_db_update_cmp_descriptor;
result->del = autotxn_db_del;
result->put = autotxn_db_put;
result->update = autotxn_db_update;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment