Commit 32156c75 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #6184 new cardinality and status code with better unit tests

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@54450 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70107a43
......@@ -20,6 +20,8 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
thd->ha_data[slot].ha_ptr = data;
}
static inline uint get_key_parts(const KEY *key);
#undef PACKAGE
#undef VERSION
#undef HAVE_DTRACE
......@@ -31,6 +33,9 @@ static inline void thd_data_set(THD *thd, int slot, void *data) {
#else
#endif
#include "tokudb_buffer.h"
#include "tokudb_status.h"
#include "tokudb_card.h"
#include "ha_tokudb.h"
#include "hatoku_hton.h"
#include <mysql/plugin.h>
......@@ -213,10 +218,8 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
free_key_and_col_info(&share->kc_info);
if (share->status_block && (error = share->status_block->close(share->status_block, 0))) {
error = tokudb::close_status(&share->status_block);
assert(error == 0);
result = error;
}
my_hash_delete(&tokudb_open_tables, (uchar *) share);
thr_lock_delete(&share->lock);
......@@ -1285,17 +1288,12 @@ bool ha_tokudb::has_auto_increment_flag(uint* index) {
return ai_found;
}
#define status_dict_pagesize 1024
static int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
int error;
char* newname = NULL;
uint open_mode = DB_THREAD;
uint32_t pagesize = 0;
newname = (char *)my_malloc(
get_max_dict_name_path_length(name),
MYF(MY_WME)
);
MYF(MY_WME));
if (newname == NULL) {
error = ENOMEM;
goto cleanup;
......@@ -1304,32 +1302,9 @@ static int open_status_dictionary(DB** ptr, const char* name, DB_TXN* txn) {
if (tokudb_debug & TOKUDB_DEBUG_OPEN) {
TOKUDB_TRACE("open:%s\n", newname);
}
error = db_create(ptr, db_env, 0);
if (error) { goto cleanup; }
error = (*ptr)->open((*ptr), txn, newname, NULL, DB_BTREE, open_mode, 0);
if (error) {
goto cleanup;
}
error = (*ptr)->get_pagesize(*ptr, &pagesize);
if (error) {
goto cleanup;
}
if (pagesize > status_dict_pagesize) {
error = (*ptr)->change_pagesize(*ptr, status_dict_pagesize);
if (error) { goto cleanup; }
}
error = tokudb::open_status(db_env, ptr, newname, txn);
cleanup:
if (error) {
if (*ptr) {
int r = (*ptr)->close(*ptr, 0);
assert(r==0);
*ptr = NULL;
}
}
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
......@@ -5937,13 +5912,13 @@ int ha_tokudb::info(uint flag) {
if ((flag & HA_STATUS_CONST)) {
stats.max_data_file_length= 9223372036854775807ULL;
uint64_t rec_per_key[table_share->key_parts];
error = share->get_card_from_status(txn, table_share->key_parts, rec_per_key);
error = tokudb::get_card_from_status(share->status_block, txn, table_share->key_parts, rec_per_key);
if (error == 0) {
share->set_card_in_key_info(table, table_share->key_parts, rec_per_key);
tokudb::set_card_in_key_info(table, table_share->key_parts, rec_per_key);
} else {
for (uint i = 0; i < table_share->key_parts; i++)
rec_per_key[i] = 0;
share->set_card_in_key_info(table, table_share->key_parts, rec_per_key);
tokudb::set_card_in_key_info(table, table_share->key_parts, rec_per_key);
}
}
/* Don't return key if we got an error for the internal primary key */
......@@ -6911,21 +6886,14 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
/* Create status.tokudb and save relevant metadata */
make_name(newname, name, "status");
error = db_create(&status_block, db_env, 0);
if (error) { goto cleanup; }
error = status_block->set_pagesize(status_block, status_dict_pagesize);
if (error) { goto cleanup; }
error = status_block->open(status_block, txn, newname, NULL, DB_BTREE, DB_CREATE | DB_EXCL, 0);
error = tokudb::create_status(db_env, &status_block, newname, txn);
if (error) { goto cleanup; }
version = HA_TOKU_VERSION;
capabilities = HA_TOKU_CAP;
error = write_to_status(status_block, hatoku_new_version,&version,sizeof(version), txn);
if (error) { goto cleanup; }
capabilities = HA_TOKU_CAP;
error = write_to_status(status_block, hatoku_capabilities,&capabilities,sizeof(capabilities), txn);
if (error) { goto cleanup; }
......@@ -6978,7 +6946,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
error = 0;
cleanup:
if (status_block != NULL) {
int r = status_block->close(status_block, 0);
int r = tokudb::close_status(&status_block);
assert(r==0);
}
free_key_and_col_info(&kc_info);
......@@ -8281,7 +8249,6 @@ Item* ha_tokudb::idx_cond_push(uint keyno_arg, Item* idx_cond_arg) {
}
// table admin
#include "tokudb_card.cc"
#include "ha_tokudb_admin.cc"
// update functions
......
......@@ -11,21 +11,6 @@
//
#define HA_TOKU_CAP 0
//
// These are keys that will be used for retrieving metadata in status.tokudb
// To get the version, one looks up the value associated with key hatoku_version
// in status.tokudb
//
typedef ulonglong HA_METADATA_KEY;
#define hatoku_old_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
#define hatoku_key_name 4
#define hatoku_frm_data 5
#define hatoku_new_version 6
#define hatoku_cardinality 7
class ha_tokudb;
typedef struct loader_context {
......@@ -105,29 +90,6 @@ class TOKUDB_SHARE {
bool replace_into_fast;
rw_lock_t num_DBs_lock;
uint32_t num_DBs;
// Set the key_info cardinality counters for the table.
void set_card_in_key_info(TABLE *table, uint rec_per_keys, uint64_t rec_per_key[]);
// Put the cardinality counters into the status dictionary.
void set_card_in_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]);
// Get the cardinality counters from the status dictionary.
int get_card_from_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]);
// Delete the cardinality counters from the status dictionary.
void delete_card_from_status(DB_TXN *txn);
// Get the val for a given key in the status dictionary.
// Returns 0 if successful.
int get_status(DB_TXN *txn, HA_METADATA_KEY k, DBT *val);
int get_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s);
// Put a val for a given key into the status dictionary.
int put_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s);
// Delete a key from the status dictionary.
int delete_status(DB_TXN *txn, HA_METADATA_KEY k);
};
typedef struct st_filter_key_part_info {
......@@ -495,7 +457,6 @@ class ha_tokudb : public handler {
int rename_table(const char *from, const char *to);
int optimize(THD * thd, HA_CHECK_OPT * check_opt);
int analyze(THD * thd, HA_CHECK_OPT * check_opt);
int analyze_key(THD *thd, DB_TXN *txn, uint key_i, KEY *key_info, uint64_t num_key_parts, uint64_t *rec_per_key_part);
int write_row(uchar * buf);
int update_row(const uchar * old_data, uchar * new_data);
int delete_row(const uchar * buf);
......
volatile int ha_tokudb_analyze_wait = 0; // debug
struct analyze_progress_extra {
THD *thd;
TOKUDB_SHARE *share;
TABLE_SHARE *table_share;
uint key_i;
const char *key_name;
time_t t_start;
char *write_status_msg;
};
static int analyze_progress(void *v_extra, uint64_t rows) {
struct analyze_progress_extra *extra = (struct analyze_progress_extra *) v_extra;
THD *thd = extra->thd;
if (thd->killed)
return ER_ABORTING_CONNECTION;
time_t t_now = time(0);
time_t t_limit = get_analyze_time(thd);
time_t t_start = extra->t_start;
if (t_limit > 0 && t_now - t_start > t_limit)
return 0;
float progress_rows = 0.0;
TOKUDB_SHARE *share = extra->share;
if (share->rows > 0)
progress_rows = (float) rows / (float) share->rows;
float progress_time = 0.0;
if (t_limit > 0)
progress_time = (float) (t_now - t_start) / (float) t_limit;
char *write_status_msg = extra->write_status_msg;
TABLE_SHARE *table_share = extra->table_share;
sprintf(write_status_msg, "%s.%s.%s %u of %u %.lf%% rows %.lf%% time",
table_share->db.str, table_share->table_name.str, extra->key_name,
extra->key_i, table_share->keys, progress_rows * 100.0, progress_time * 100.0);
thd_proc_info(thd, write_status_msg);
return 0;
}
int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
TOKUDB_DBUG_ENTER("ha_tokudb::analyze");
while (ha_tokudb_analyze_wait) sleep(1); // debug concurrency issues
......@@ -14,7 +51,12 @@ int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
for (uint i = 0; result == HA_ADMIN_OK && i < table_share->keys; i++) {
KEY *key_info = &table_share->key_info[i];
uint64_t num_key_parts = get_key_parts(key_info);
int error = analyze_key(thd, txn, i, key_info, num_key_parts, &rec_per_key[next_key_part]);
const char *key_name = i == primary_key ? "primary" : key_info->name;
struct analyze_progress_extra analyze_progress_extra = {
thd, share, table_share, i, key_name, time(0), write_status_msg
};
int error = tokudb::analyze_card(share->key_file[i], txn, false, num_key_parts, &rec_per_key[next_key_part],
tokudb_cmp_dbt_key_parts, analyze_progress, &analyze_progress_extra);
if (error) {
result = HA_ADMIN_FAILED;
} else {
......@@ -31,97 +73,10 @@ int ha_tokudb::analyze(THD *thd, HA_CHECK_OPT *check_opt) {
}
}
if (result == HA_ADMIN_OK)
share->set_card_in_status(txn, table_share->key_parts, rec_per_key);
tokudb::set_card_in_status(share->status_block, txn, table_share->key_parts, rec_per_key);
TOKUDB_DBUG_RETURN(result);
}
// Compute records per key for all key parts of the ith key of the table.
// For each key part, put records per key part in *rec_per_key_part[key_part_index].
// Returns 0 if success, otherwise an error number.
// TODO statistical dives into the FT
int ha_tokudb::analyze_key(THD *thd, DB_TXN *txn, uint key_i, KEY *key_info, uint64_t num_key_parts, uint64_t *rec_per_key_part) {
TOKUDB_DBUG_ENTER("ha_tokudb::analyze_key");
int error = 0;
DB *db = share->key_file[key_i];
DBC *cursor = NULL;
error = db->cursor(db, txn, &cursor, 0);
if (error == 0) {
uint64_t rows = 0;
uint64_t unique_rows[num_key_parts];
for (uint64_t i = 0; i < num_key_parts; i++)
unique_rows[i] = 1;
// stop looking when the entire dictionary was analyzed, or a cap on execution time was reached, or the analyze was killed.
DBT key = {}; key.flags = DB_DBT_REALLOC;
DBT prev_key = {}; prev_key.flags = DB_DBT_REALLOC;
time_t t_start = time(0);
while (1) {
error = cursor->c_get(cursor, &key, 0, DB_NEXT);
if (error != 0) {
if (error == DB_NOTFOUND)
error = 0; // eof is not an error
break;
}
rows++;
// first row is a unique row, otherwise compare with the previous key
bool copy_key = false;
if (rows == 1) {
copy_key = true;
} else {
// compare this key with the previous key. ignore appended PK for SK's.
// TODO if a prefix is different, then all larger keys that include the prefix are also different.
// TODO if we are comparing the entire primary key or the entire unique secondary key, then the cardinality must be 1,
// so we can avoid computing it.
for (uint64_t i = 0; i < num_key_parts; i++) {
int cmp = tokudb_cmp_dbt_key_parts(db, &prev_key, &key, i+1);
if (cmp != 0) {
unique_rows[i]++;
copy_key = true;
}
}
}
// prev_key = key
if (copy_key) {
prev_key.data = realloc(prev_key.data, key.size);
assert(prev_key.data);
prev_key.size = key.size;
memcpy(prev_key.data, key.data, prev_key.size);
}
// check for limit
if ((rows % 1000) == 0) {
if (thd->killed) {
error = ER_ABORTING_CONNECTION;
break;
}
time_t t_now = time(0);
time_t t_limit = get_analyze_time(thd);
if (t_limit > 0 && t_now - t_start > t_limit)
break;
float progress_rows = 0.0;
if (share->rows > 0)
progress_rows = (float) rows / (float) share->rows;
float progress_time = 0.0;
if (t_limit > 0)
progress_time = (float) (t_now - t_start) / (float) t_limit;
sprintf(write_status_msg, "%s.%s.%s %u of %u %.lf%% rows %.lf%% time",
table_share->db.str, table_share->table_name.str, key_i == primary_key ? "primary" : table_share->key_info[key_i].name,
key_i, table_share->keys, progress_rows * 100.0, progress_time * 100.0);
thd_proc_info(thd, write_status_msg);
}
}
// cleanup
free(key.data);
free(prev_key.data);
int close_error = cursor->c_close(cursor);
assert(close_error == 0);
// return cardinality
if (error == 0) {
for (uint64_t i = 0; i < num_key_parts; i++)
rec_per_key_part[i] = rows / unique_rows[i];
}
}
TOKUDB_DBUG_RETURN(error);
}
static int hot_poll_fun(void *extra, float progress) {
HOT_OPTIMIZE_CONTEXT context = (HOT_OPTIMIZE_CONTEXT)extra;
if (context->thd->killed) {
......
......@@ -422,7 +422,7 @@ int ha_tokudb::alter_table_add_index(TABLE *altered_table, Alter_inplace_info *h
my_free(key_info);
if (error == 0)
share->delete_card_from_status(ctx->alter_txn);
tokudb::delete_card_from_status(share->status_block, ctx->alter_txn);
return error;
}
......@@ -469,7 +469,7 @@ int ha_tokudb::alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *
int error = drop_indexes(table, index_drop_offsets, ha_alter_info->index_drop_count, key_info, ctx->alter_txn);
if (error == 0)
share->delete_card_from_status(ctx->alter_txn);
tokudb::delete_card_from_status(share->status_block, ctx->alter_txn);
return error;
}
......
......@@ -4,6 +4,8 @@ CHECKS = $(patsubst %,%.check,$(TARGETS))
CPPFLAGS = -I..
CXXFLAGS = -g -fno-implicit-templates
FRACTALTREE_DIR = ../../../../tokudb/release
ifeq ($(GCOV),1)
CXXFLAGS += -fprofile-arcs -ftest-coverage
endif
......@@ -11,7 +13,7 @@ endif
all: $(TARGETS)
clean:
rm -rf $(TARGETS) *.gcov *.gcno *.gcda
rm -rf $(TARGETS) *.gcov *.gcno *.gcda *.testdir
check: $(CHECKS)
true
......@@ -22,4 +24,17 @@ check: $(CHECKS)
max_test.check: max_test
valgrind -q ./$< 1 2
card_test: card_test.cc
$(CXX) -g -o $@ $< -I.. -I$(FRACTALTREE_DIR)/include -L$(FRACTALTREE_DIR)/lib -ltokudb -ltokuportability
card_1: card_1.cc
$(CXX) -g -o $@ $< -I.. -I$(FRACTALTREE_DIR)/include -L$(FRACTALTREE_DIR)/lib -ltokudb -ltokuportability
card_inf: card_inf.cc
$(CXX) -g -o $@ $< -I.. -I$(FRACTALTREE_DIR)/include -L$(FRACTALTREE_DIR)/lib -ltokudb -ltokuportability
card_inf_1: card_inf_1.cc
$(CXX) -g -o $@ $< -I.. -I$(FRACTALTREE_DIR)/include -L$(FRACTALTREE_DIR)/lib -ltokudb -ltokuportability
card_random_1: card_random_1.cc
$(CXX) -g -o $@ $< -I.. -I$(FRACTALTREE_DIR)/include -L$(FRACTALTREE_DIR)/lib -ltokudb -ltokuportability
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// Test cardinality algorithm on a single level unique key
#ident "Copyright (c) 2013 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <assert.h>
#include <errno.h>
#include <db.h>
#include <endian.h>
#include <byteswap.h>
#include <sys/stat.h>
typedef unsigned long long ulonglong;
#include "tokudb_status.h"
#include "tokudb_buffer.h"
// Provide some mimimal MySQL classes just to compile the tokudb cardinality functions
class KEY_INFO {
public:
uint flags;
uint64_t *rec_per_key;
};
#define HA_NOSAME 1
class TABLE_SHARE {
public:
uint primary_key;
uint keys;
};
class TABLE {
public:
TABLE_SHARE *s;
KEY_INFO *key_info;
};
uint get_key_parts(KEY_INFO *key_info) {
return 0;
}
#include "tokudb_card.h"
static uint32_t hton32(uint32_t n) {
#if BYTE_ORDER == LITTLE_ENDIAN
return bswap_32(n);
#else
return n;
#endif
}
struct key {
uint32_t k0;
}; // __attribute__((packed));
struct val {
uint32_t v0;
}; // __attribute__((packed));
// load nrows into the db
static void load_db(DB_ENV *env, DB *db, uint32_t nrows) {
DB_TXN *txn = NULL;
int r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
DB_LOADER *loader = NULL;
uint32_t db_flags[1] = { 0 };
uint32_t dbt_flags[1] = { 0 };
uint32_t loader_flags = 0;
r = env->create_loader(env, txn, &loader, db, 1, &db, db_flags, dbt_flags, loader_flags);
assert(r == 0);
for (uint32_t seq = 0; seq < nrows ; seq++) {
struct key k = { hton32(seq) };
struct val v = { seq };
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = &v, .size = sizeof v };
r = loader->put(loader, &key, &val);
assert(r == 0);
}
r = loader->close(loader);
assert(r == 0);
r = txn->commit(txn, 0);
assert(r == 0);
}
static int analyze_key_compare(DB *db, const DBT *a, const DBT *b, uint level) {
assert(level == 1);
assert(a->size == b->size);
return memcmp(a->data, b->data, a->size);
}
static void test_card(DB_ENV *env, DB *db, uint64_t nrows, uint64_t expect_card) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
uint64_t num_key_parts = 1;
uint64_t rec_per_key[num_key_parts];
r = tokudb::analyze_card(db, txn, false, num_key_parts, rec_per_key, analyze_key_compare, NULL, NULL);
assert(r == 0);
assert(rec_per_key[0] == expect_card);
r = txn->commit(txn, 0);
assert(r == 0);
}
int main(int argc, char * const argv[]) {
uint32_t nrows = 1000000;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
nrows = atoi(argv[++i]);
continue;
}
}
int r;
r = system("rm -rf " __FILE__ ".testdir");
assert(r == 0);
r = mkdir(__FILE__ ".testdir", S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
DB_ENV *env = NULL;
r = db_env_create(&env, 0);
assert(r == 0);
r = env->open(env, __FILE__ ".testdir", DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// create the db
DB *db = NULL;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->open(db, NULL, "test.db", 0, DB_BTREE, DB_CREATE + DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// load the db
load_db(env, db, nrows);
// test cardinality
test_card(env, db, nrows, 1);
r = db->close(db, 0);
assert(r == 0);
r = env->close(env, 0);
assert(r == 0);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// Test cardinality algorithm on a single level identical key
#ident "Copyright (c) 2013 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <assert.h>
#include <errno.h>
#include <db.h>
#include <endian.h>
#include <byteswap.h>
#include <sys/stat.h>
typedef unsigned long long ulonglong;
#include "tokudb_status.h"
#include "tokudb_buffer.h"
// Provide some mimimal MySQL classes just to compile the tokudb cardinality functions
class KEY_INFO {
public:
uint flags;
uint64_t *rec_per_key;
};
#define HA_NOSAME 1
class TABLE_SHARE {
public:
uint primary_key;
uint keys;
};
class TABLE {
public:
TABLE_SHARE *s;
KEY_INFO *key_info;
};
uint get_key_parts(KEY_INFO *key_info) {
return 0;
}
#include "tokudb_card.h"
static uint32_t hton32(uint32_t n) {
#if BYTE_ORDER == LITTLE_ENDIAN
return bswap_32(n);
#else
return n;
#endif
}
struct key {
uint32_t k0;
uint32_t seq;
}; // __attribute__((packed));
struct val {
uint32_t v0;
}; // __attribute__((packed));
// load nrows into the db
static void load_db(DB_ENV *env, DB *db, uint32_t nrows) {
DB_TXN *txn = NULL;
int r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
DB_LOADER *loader = NULL;
uint32_t db_flags[1] = { 0 };
uint32_t dbt_flags[1] = { 0 };
uint32_t loader_flags = 0;
r = env->create_loader(env, txn, &loader, db, 1, &db, db_flags, dbt_flags, loader_flags);
assert(r == 0);
for (uint32_t seq = 0; seq < nrows ; seq++) {
struct key k = { 0, hton32(seq) };
struct val v = { seq };
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = &v, .size = sizeof v };
r = loader->put(loader, &key, &val);
assert(r == 0);
}
r = loader->close(loader);
assert(r == 0);
r = txn->commit(txn, 0);
assert(r == 0);
}
// only compare the first level of the key
static int analyze_key_compare(DB *db, const DBT *a, const DBT *b, uint level) {
assert(level == 1);
assert(a->size == b->size);
assert(a->size == sizeof (struct key));
return memcmp(a->data, b->data, sizeof (uint32_t));
}
static void test_card(DB_ENV *env, DB *db, uint64_t nrows, uint64_t expect_card) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
uint64_t num_key_parts = 1;
uint64_t rec_per_key[num_key_parts];
r = tokudb::analyze_card(db, txn, false, num_key_parts, rec_per_key, analyze_key_compare, NULL, NULL);
assert(r == 0);
assert(rec_per_key[0] == expect_card);
r = txn->commit(txn, 0);
assert(r == 0);
}
int main(int argc, char * const argv[]) {
uint64_t nrows = 1000000;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
nrows = atoll(argv[++i]);
continue;
}
}
int r;
r = system("rm -rf " __FILE__ ".testdir");
assert(r == 0);
r = mkdir(__FILE__ ".testdir", S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
DB_ENV *env = NULL;
r = db_env_create(&env, 0);
assert(r == 0);
r = env->open(env, __FILE__ ".testdir", DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// create the db
DB *db = NULL;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->open(db, NULL, "test.db", 0, DB_BTREE, DB_CREATE + DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// load the db
load_db(env, db, nrows);
// test cardinality
test_card(env, db, nrows, nrows);
r = db->close(db, 0);
assert(r == 0);
r = env->close(env, 0);
assert(r == 0);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// Test cardinality algorithm on a 2 level key where the first level is identical and the second level is unique
#ident "Copyright (c) 2013 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <assert.h>
#include <errno.h>
#include <db.h>
#include <endian.h>
#include <byteswap.h>
#include <sys/stat.h>
typedef unsigned long long ulonglong;
#include "tokudb_status.h"
#include "tokudb_buffer.h"
// Provide some mimimal MySQL classes just to compile the tokudb cardinality functions
class KEY_INFO {
public:
uint flags;
uint64_t *rec_per_key;
};
#define HA_NOSAME 1
class TABLE_SHARE {
public:
uint primary_key;
uint keys;
};
class TABLE {
public:
TABLE_SHARE *s;
KEY_INFO *key_info;
};
uint get_key_parts(KEY_INFO *key_info) {
return 0;
}
#include "tokudb_card.h"
static uint32_t hton32(uint32_t n) {
#if BYTE_ORDER == LITTLE_ENDIAN
return bswap_32(n);
#else
return n;
#endif
}
struct key {
uint32_t k0;
uint32_t seq;
}; // __attribute__((packed));
struct val {
uint32_t v0;
}; // __attribute__((packed));
// load nrows into the db
static void load_db(DB_ENV *env, DB *db, uint32_t nrows) {
DB_TXN *txn = NULL;
int r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
DB_LOADER *loader = NULL;
uint32_t db_flags[1] = { 0 };
uint32_t dbt_flags[1] = { 0 };
uint32_t loader_flags = 0;
r = env->create_loader(env, txn, &loader, db, 1, &db, db_flags, dbt_flags, loader_flags);
assert(r == 0);
for (uint32_t seq = 0; seq < nrows ; seq++) {
struct key k = { 0, hton32(seq) };
struct val v = { seq };
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = &v, .size = sizeof v };
r = loader->put(loader, &key, &val);
assert(r == 0);
}
r = loader->close(loader);
assert(r == 0);
r = txn->commit(txn, 0);
assert(r == 0);
}
static int analyze_key_compare(DB *db, const DBT *a, const DBT *b, uint level) {
assert(a->size == b->size);
switch (level) {
default:
assert(0);
case 1:
return memcmp(a->data, b->data, sizeof (uint32_t));
case 2:
assert(a->size == sizeof (struct key));
return memcmp(a->data, b->data, sizeof (struct key));
}
}
static void test_card(DB_ENV *env, DB *db, uint64_t nrows, uint64_t expect[]) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
uint64_t num_key_parts = 2;
uint64_t rec_per_key[num_key_parts];
r = tokudb::analyze_card(db, txn, false, num_key_parts, rec_per_key, analyze_key_compare, NULL, NULL);
assert(r == 0);
assert(rec_per_key[0] == expect[0]);
assert(rec_per_key[1] == expect[1]);
r = txn->commit(txn, 0);
assert(r == 0);
}
int main(int argc, char * const argv[]) {
uint64_t nrows = 1000000;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
nrows = atoll(argv[++i]);
continue;
}
}
int r;
r = system("rm -rf " __FILE__ ".testdir");
assert(r == 0);
r = mkdir(__FILE__ ".testdir", S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
DB_ENV *env = NULL;
r = db_env_create(&env, 0);
assert(r == 0);
r = env->open(env, __FILE__ ".testdir", DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// create the db
DB *db = NULL;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->open(db, NULL, "test.db", 0, DB_BTREE, DB_CREATE + DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// load the db
load_db(env, db, nrows);
uint64_t expect[2] = { nrows, 1 };
test_card(env, db, nrows, expect);
r = db->close(db, 0);
assert(r == 0);
r = env->close(env, 0);
assert(r == 0);
return 0;
}
/* -*- mode: C++; c-basic-offset: 4; indent-tabs-mode: nil -*- */
// vim: ft=cpp:expandtab:ts=8:sw=4:softtabstop=4:
// Test cardinality algorithm on a 2 level key where the first level is random in a space of size maxrand
// and the second level is unique.
#ident "Copyright (c) 2013 Tokutek Inc. All rights reserved."
#include <stdio.h>
#include <stdlib.h>
#include <memory.h>
#include <assert.h>
#include <errno.h>
#include <db.h>
#include <endian.h>
#include <byteswap.h>
#include <sys/stat.h>
typedef unsigned long long ulonglong;
#include "tokudb_status.h"
#include "tokudb_buffer.h"
// Provide some mimimal MySQL classes just to compile the tokudb cardinality functions
class KEY_INFO {
public:
uint flags;
uint64_t *rec_per_key;
};
#define HA_NOSAME 1
class TABLE_SHARE {
public:
uint primary_key;
uint keys;
};
class TABLE {
public:
TABLE_SHARE *s;
KEY_INFO *key_info;
};
uint get_key_parts(KEY_INFO *key_info) {
return 0;
}
#include "tokudb_card.h"
static uint32_t hton32(uint32_t n) {
#if BYTE_ORDER == LITTLE_ENDIAN
return bswap_32(n);
#else
return n;
#endif
}
struct key {
uint32_t r;
uint32_t seq;
}; // __attribute__((packed));
struct val {
uint32_t v0;
}; // __attribute__((packed));
// load nrows into the db
static void load_db(DB_ENV *env, DB *db, uint32_t nrows, uint32_t maxrand) {
DB_TXN *txn = NULL;
int r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
DB_LOADER *loader = NULL;
uint32_t db_flags[1] = { 0 };
uint32_t dbt_flags[1] = { 0 };
uint32_t loader_flags = 0;
r = env->create_loader(env, txn, &loader, db, 1, &db, db_flags, dbt_flags, loader_flags);
assert(r == 0);
for (uint32_t seq = 0; seq < nrows ; seq++) {
struct key k = { hton32(random() % maxrand), hton32(seq) };
struct val v = { seq };
DBT key = { .data = &k, .size = sizeof k };
DBT val = { .data = &v, .size = sizeof v };
r = loader->put(loader, &key, &val);
assert(r == 0);
}
r = loader->close(loader);
assert(r == 0);
r = txn->commit(txn, 0);
assert(r == 0);
}
static int analyze_key_compare(DB *db, const DBT *a, const DBT *b, uint level) {
assert(a->size == b->size);
switch (level) {
default:
assert(0);
case 1:
return memcmp(a->data, b->data, sizeof (uint32_t));
case 2:
assert(a->size == sizeof (struct key));
return memcmp(a->data, b->data, sizeof (struct key));
}
}
static void test_card(DB_ENV *env, DB *db, uint64_t nrows, uint64_t expect[]) {
int r;
DB_TXN *txn = NULL;
r = env->txn_begin(env, NULL, &txn, 0);
assert(r == 0);
uint64_t num_key_parts = 2;
uint64_t rec_per_key[num_key_parts];
r = tokudb::analyze_card(db, txn, false, num_key_parts, rec_per_key, analyze_key_compare, NULL, NULL);
assert(r == 0);
assert(rec_per_key[0] == expect[0]);
assert(rec_per_key[1] == expect[1]);
r = txn->commit(txn, 0);
assert(r == 0);
}
int main(int argc, char * const argv[]) {
uint64_t nrows = 1000000;
uint32_t maxrand = 10;
for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--nrows") == 0 && i+1 < argc) {
nrows = atoll(argv[++i]);
continue;
}
if (strcmp(argv[i], "--maxrand") == 0 && i+1 < argc) {
maxrand = atoi(argv[++i]);
continue;
}
}
int r;
r = system("rm -rf " __FILE__ ".testdir");
assert(r == 0);
r = mkdir(__FILE__ ".testdir", S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
DB_ENV *env = NULL;
r = db_env_create(&env, 0);
assert(r == 0);
r = env->open(env, __FILE__ ".testdir", DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// create the db
DB *db = NULL;
r = db_create(&db, env, 0);
assert(r == 0);
r = db->open(db, NULL, "test.db", 0, DB_BTREE, DB_CREATE + DB_AUTO_COMMIT, S_IRWXU+S_IRWXG+S_IRWXO);
assert(r == 0);
// load the db
load_db(env, db, nrows, maxrand);
uint64_t expect[2] = { nrows/maxrand, 1 };
test_card(env, db, nrows, expect);
r = db->close(db, 0);
assert(r == 0);
r = env->close(env, 0);
assert(r == 0);
return 0;
}
// test tokudb cardinality in status dictionary
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <assert.h>
#include <memory.h>
#include <errno.h>
#include <sys/stat.h>
#include <db.h>
typedef unsigned long long ulonglong;
#include <tokudb_status.h>
#include <tokudb_buffer.h>
// Provide some mimimal MySQL classes just to compile the tokudb cardinality functions
class KEY_INFO {
public:
uint flags;
uint64_t *rec_per_key;
};
#define HA_NOSAME 1
class TABLE_SHARE {
public:
uint primary_key;
uint keys;
};
class TABLE {
public:
TABLE_SHARE *s;
KEY_INFO *key_info;
};
uint get_key_parts(KEY_INFO *key_info) {
return 0;
}
#include <tokudb_card.h>
// verify that we can create and close a status dictionary
static void test_create(DB_ENV *env) {
int error;
DB_TXN *txn = NULL;
error = env->txn_begin(env, NULL, &txn, 0);
assert(error == 0);
DB *status_db = NULL;
error = tokudb::create_status(env, &status_db, "status.db", txn);
assert(error == 0);
error = txn->commit(txn, 0);
assert(error == 0);
error = tokudb::close_status(&status_db);
assert(error == 0);
}
// verify that no card row in status works
static void test_no_card(DB_ENV *env) {
int error;
DB_TXN *txn = NULL;
error = env->txn_begin(env, NULL, &txn, 0);
assert(error == 0);
DB *status_db = NULL;
error = tokudb::open_status(env, &status_db, "status.db", txn);
assert(error == 0);
error = tokudb::get_card_from_status(status_db, txn, 0, NULL);
assert(error == DB_NOTFOUND);
error = txn->commit(txn, 0);
assert(error == 0);
error = tokudb::close_status(&status_db);
assert(error == 0);
}
// verify that a card row with 0 array elements works
static void test_0(DB_ENV *env) {
int error;
DB_TXN *txn = NULL;
error = env->txn_begin(env, NULL, &txn, 0);
assert(error == 0);
DB *status_db = NULL;
error = tokudb::open_status(env, &status_db, "status.db", txn);
assert(error == 0);
tokudb::set_card_in_status(status_db, txn, 0, NULL);
error = tokudb::get_card_from_status(status_db, txn, 0, NULL);
assert(error == 0);
error = txn->commit(txn, 0);
assert(error == 0);
error = tokudb::close_status(&status_db);
assert(error == 0);
}
// verify that writing and reading card info works for several sized card arrays
static void test_10(DB_ENV *env) {
int error;
for (uint64_t i = 0; i < 20; i++) {
uint64_t rec_per_key[i];
for (uint64_t j = 0; j < i; j++)
rec_per_key[j] = j == 0 ? 10+i : 10 * rec_per_key[j-1];
DB_TXN *txn = NULL;
error = env->txn_begin(env, NULL, &txn, 0);
assert(error == 0);
DB *status_db = NULL;
error = tokudb::open_status(env, &status_db, "status.db", txn);
assert(error == 0);
tokudb::set_card_in_status(status_db, txn, i, rec_per_key);
uint64_t stored_rec_per_key[i];
error = tokudb::get_card_from_status(status_db, txn, i, stored_rec_per_key);
assert(error == 0);
for (uint64_t j = 0; j < i; j++)
assert(rec_per_key[j] == stored_rec_per_key[j]);
error = txn->commit(txn, 0);
assert(error == 0);
error = tokudb::close_status(&status_db);
assert(error == 0);
error = env->txn_begin(env, NULL, &txn, 0);
assert(error == 0);
error = tokudb::open_status(env, &status_db, "status.db", txn);
assert(error == 0);
tokudb::set_card_in_status(status_db, txn, i, rec_per_key);
error = tokudb::get_card_from_status(status_db, txn, i, stored_rec_per_key);
assert(error == 0);
for (uint64_t j = 0; j < i; j++)
assert(rec_per_key[j] == stored_rec_per_key[j]);
error = txn->commit(txn, 0);
assert(error == 0);
error = tokudb::close_status(&status_db);
assert(error == 0);
}
}
int main(int argc, char *argv[]) {
int error;
error = system("rm -rf " __FILE__ ".testdir");
assert(error == 0);
error = mkdir(__FILE__ ".testdir", S_IRWXU+S_IRWXG+S_IRWXO);
assert(error == 0);
DB_ENV *env = NULL;
error = db_env_create(&env, 0);
assert(error == 0);
error = env->open(env, __FILE__ ".testdir", DB_INIT_MPOOL + DB_INIT_LOG + DB_INIT_LOCK + DB_INIT_TXN + DB_PRIVATE + DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
assert(error == 0);
test_create(env);
test_no_card(env);
test_0(env);
test_10(env);
error = env->close(env, 0);
assert(error == 0);
return 0;
}
int TOKUDB_SHARE::get_status(DB_TXN *txn, HA_METADATA_KEY k, DBT *val) {
DBT key = {}; key.data = &k; key.size = sizeof k;
int error = status_block->get(status_block, txn, &key, val, 0);
return error;
}
int TOKUDB_SHARE::get_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s) {
DBT key = {}; key.data = &k; key.size = sizeof k;
DBT val = {}; val.data = p; val.size = (uint32_t) s; val.flags = DB_DBT_USERMEM;
int error = status_block->get(status_block, txn, &key, &val, 0);
return error;
}
int TOKUDB_SHARE::put_status(DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s) {
DBT key = {}; key.data = &k; key.size = sizeof k;
DBT val = {}; val.data = p; val.size = (uint32_t) s;
int error = status_block->put(status_block, txn, &key, &val, 0);
return error;
}
int TOKUDB_SHARE::delete_status(DB_TXN *txn, HA_METADATA_KEY k) {
DBT key = {}; key.data = &k; key.size = sizeof k;
int error = status_block->del(status_block, txn, &key, DB_DELETE_ANY);
return error;
}
void TOKUDB_SHARE::set_card_in_key_info(TABLE *table, uint rec_per_keys, uint64_t rec_per_key[]) {
uint next_key_part = 0;
for (uint i = 0; i < table->s->keys; i++) {
bool is_unique_key = (i == table->s->primary_key) || (table->key_info[i].flags & HA_NOSAME);
uint num_key_parts = get_key_parts(&table->key_info[i]);
for (uint j = 0; j < num_key_parts; j++) {
assert(next_key_part < rec_per_keys);
ulong val = rec_per_key[next_key_part++];
if (is_unique_key && j == num_key_parts-1)
val = 1;
table->key_info[i].rec_per_key[j] = val;
}
}
}
#include "tokudb_buffer.h"
void TOKUDB_SHARE::set_card_in_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// encode cardinality into the buffer
tokudb::buffer b;
size_t s;
s = b.append_ui<uint32_t>(rec_per_keys);
assert(s > 0);
for (uint i = 0; i < rec_per_keys; i++) {
s = b.append_ui<uint64_t>(rec_per_key[i]);
assert(s > 0);
}
// write cardinality to status
int error = put_status(txn, hatoku_cardinality, b.data(), b.size());
assert(error == 0);
}
int TOKUDB_SHARE::get_card_from_status(DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// read cardinality from status
DBT val = {}; val.flags = DB_DBT_REALLOC;
int error = get_status(txn, hatoku_cardinality, &val);
if (error == 0) {
// decode cardinality from the buffer
tokudb::buffer b(val.data, 0, val.size);
size_t s;
uint32_t num_parts;
s = b.consume_ui<uint32_t>(&num_parts);
if (s == 0 || num_parts != rec_per_keys)
error = EINVAL;
if (error == 0) {
for (uint i = 0; i < rec_per_keys; i++) {
s = b.consume_ui<uint64_t>(&rec_per_key[i]);
if (s == 0) {
error = EINVAL;
break;
}
}
}
}
// cleanup
free(val.data);
return error;
}
void TOKUDB_SHARE::delete_card_from_status(DB_TXN *txn) {
int error = delete_status(txn, hatoku_cardinality);
assert(error == 0);
}
namespace tokudb {
// Set the key_info cardinality counters for the table.
void set_card_in_key_info(TABLE *table, uint rec_per_keys, uint64_t rec_per_key[]) {
uint next_key_part = 0;
for (uint i = 0; i < table->s->keys; i++) {
bool is_unique_key = (i == table->s->primary_key) || (table->key_info[i].flags & HA_NOSAME);
uint num_key_parts = get_key_parts(&table->key_info[i]);
for (uint j = 0; j < num_key_parts; j++) {
assert(next_key_part < rec_per_keys);
ulong val = rec_per_key[next_key_part++];
if (is_unique_key && j == num_key_parts-1)
val = 1;
table->key_info[i].rec_per_key[j] = val;
}
}
}
// Put the cardinality counters into the status dictionary.
void set_card_in_status(DB *status_db, DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// encode cardinality into the buffer
tokudb::buffer b;
size_t s;
s = b.append_ui<uint32_t>(rec_per_keys);
assert(s > 0);
for (uint i = 0; i < rec_per_keys; i++) {
s = b.append_ui<uint64_t>(rec_per_key[i]);
assert(s > 0);
}
// write cardinality to status
int error = write_to_status(status_db, hatoku_cardinality, b.data(), b.size(), txn);
assert(error == 0);
}
// Get the cardinality counters from the status dictionary.
int get_card_from_status(DB *status_db, DB_TXN *txn, uint rec_per_keys, uint64_t rec_per_key[]) {
// read cardinality from status
void *buf = 0; size_t buf_size = 0;
int error = get_status_realloc(status_db, txn, hatoku_cardinality, &buf, &buf_size);
if (error == 0) {
// decode cardinality from the buffer
tokudb::buffer b(buf, 0, buf_size);
size_t s;
uint32_t num_parts;
s = b.consume_ui<uint32_t>(&num_parts);
if (s == 0 || num_parts != rec_per_keys)
error = EINVAL;
if (error == 0) {
for (uint i = 0; i < rec_per_keys; i++) {
s = b.consume_ui<uint64_t>(&rec_per_key[i]);
if (s == 0) {
error = EINVAL;
break;
}
}
}
}
// cleanup
free(buf);
return error;
}
// Delete the cardinality counters from the status dictionary.
void delete_card_from_status(DB *status_db, DB_TXN *txn) {
int error = remove_from_status(status_db, hatoku_cardinality, txn);
assert(error == 0);
}
// Compute records per key for all key parts of the ith key of the table.
// For each key part, put records per key part in *rec_per_key_part[key_part_index].
// Returns 0 if success, otherwise an error number.
// TODO statistical dives into the FT
int analyze_card(DB *db, DB_TXN *txn, bool is_unique, uint64_t num_key_parts, uint64_t *rec_per_key_part,
int (*key_compare)(DB *, const DBT *, const DBT *, uint),
int (*analyze_progress)(void *extra, uint64_t rows), void *progress_extra) {
int error = 0;
DBC *cursor = NULL;
error = db->cursor(db, txn, &cursor, 0);
if (error == 0) {
uint64_t rows = 0;
uint64_t unique_rows[num_key_parts];
for (uint64_t i = 0; i < num_key_parts; i++)
unique_rows[i] = 1;
// stop looking when the entire dictionary was analyzed, or a cap on execution time was reached, or the analyze was killed.
DBT key = {}; key.flags = DB_DBT_REALLOC;
DBT prev_key = {}; prev_key.flags = DB_DBT_REALLOC;
while (1) {
error = cursor->c_get(cursor, &key, 0, DB_NEXT);
if (error != 0) {
if (error == DB_NOTFOUND)
error = 0; // eof is not an error
break;
}
rows++;
// first row is a unique row, otherwise compare with the previous key
bool copy_key = false;
if (rows == 1) {
copy_key = true;
} else {
// compare this key with the previous key. ignore appended PK for SK's.
// TODO if a prefix is different, then all larger keys that include the prefix are also different.
// TODO if we are comparing the entire primary key or the entire unique secondary key, then the cardinality must be 1,
// so we can avoid computing it.
for (uint64_t i = 0; i < num_key_parts; i++) {
int cmp = key_compare(db, &prev_key, &key, i+1);
if (cmp != 0) {
unique_rows[i]++;
copy_key = true;
}
}
}
// prev_key = key
if (copy_key) {
prev_key.data = realloc(prev_key.data, key.size);
assert(prev_key.data);
prev_key.size = key.size;
memcpy(prev_key.data, key.data, prev_key.size);
}
// check for limit
if (analyze_progress && (rows % 1000) == 0) {
error = analyze_progress(progress_extra, rows);
if (error)
break;
}
}
// cleanup
free(key.data);
free(prev_key.data);
int close_error = cursor->c_close(cursor);
assert(close_error == 0);
// return cardinality
if (error == 0) {
for (uint64_t i = 0; i < num_key_parts; i++)
rec_per_key_part[i] = rows / unique_rows[i];
}
}
return error;
}
}
#ifndef _TOKUDB_STATUS_H
#define _TOKUDB_STATUS_H
// These are keys that will be used for retrieving metadata in status.tokudb
// To get the version, one looks up the value associated with key hatoku_version
// in status.tokudb
typedef ulonglong HA_METADATA_KEY;
#define hatoku_old_version 0
#define hatoku_capabilities 1
#define hatoku_max_ai 2 //maximum auto increment value found so far
#define hatoku_ai_create_value 3
#define hatoku_key_name 4
#define hatoku_frm_data 5
#define hatoku_new_version 6
#define hatoku_cardinality 7
// use a very small pagesize for the status dictionary
#define status_dict_pagesize 1024
namespace tokudb {
// get the value for a given key in the status dictionary. copy the value to the supplied buffer.
// returns 0 if successful.
int get_status(DB *status_db, DB_TXN *txn, HA_METADATA_KEY k, void *p, size_t s, size_t *sp) {
DBT key = {}; key.data = &k; key.size = sizeof k;
DBT val = {}; val.data = p; val.ulen = (uint32_t) s; val.flags = DB_DBT_USERMEM;
int error = status_db->get(status_db, txn, &key, &val, 0);
if (error == 0) {
*sp = val.size;
}
return error;
}
// get the value for a given key in the status dictionary. put the value in a realloced buffer.
// returns 0 if successful.
int get_status_realloc(DB *status_db, DB_TXN *txn, HA_METADATA_KEY k, void **pp, size_t *sp) {
DBT key = {}; key.data = &k; key.size = sizeof k;
DBT val = {}; val.data = *pp; val.size = (uint32_t) *sp; val.flags = DB_DBT_REALLOC;
int error = status_db->get(status_db, txn, &key, &val, 0);
if (error == 0) {
*pp = val.data;
*sp = val.size;
}
return error;
}
// write a key value pair into the status dictionary, overwriting the previous value if any.
// auto create a txn if necessary.
// returns 0 if successful.
int write_metadata(DB *status_db, void *key_data, uint key_size, void* val_data, uint val_size, DB_TXN *txn) {
DBT key = {}; key.data = key_data; key.size = key_size;
DBT value = {}; value.data = val_data; value.size = val_size;
int error = status_db->put(status_db, txn, &key, &value, 0);
return error;
}
// write a key value pair into the status dictionary, overwriting the previous value if any.
// the key must be a HA_METADATA_KEY.
// returns 0 if successful.
int write_to_status(DB *status_db, HA_METADATA_KEY curr_key_data, void *val, size_t val_size, DB_TXN *txn) {
return write_metadata(status_db, &curr_key_data, sizeof curr_key_data, val, val_size, txn);
}
// remove a key from the status dictionary.
// auto create a txn if necessary.
// returns 0 if successful.
int remove_metadata(DB *status_db, void *key_data, uint key_size, DB_TXN *txn) {
DBT key = {}; key.data = key_data; key.size = key_size;
int error = status_db->del(status_db, txn, &key, DB_DELETE_ANY);
return error;
}
// remove a key from the status dictionary.
// the key must be a HA_METADATA_KEY
// returns 0 if successful.
int remove_from_status(DB *status_db, HA_METADATA_KEY curr_key_data, DB_TXN *txn) {
return remove_metadata(status_db, &curr_key_data, sizeof curr_key_data, txn);
}
int close_status(DB **status_db_ptr) {
int error = 0;
DB *status_db = *status_db_ptr;
if (status_db) {
error = status_db->close(status_db, 0);
if (error == 0)
*status_db_ptr = NULL;
}
return error;
}
int create_status(DB_ENV *env, DB **status_db_ptr, const char *name, DB_TXN *txn) {
int error;
DB *status_db = NULL;
error = db_create(&status_db, env, 0);
if (error == 0) {
error = status_db->set_pagesize(status_db, status_dict_pagesize);
}
if (error == 0) {
error = status_db->open(status_db, txn, name, NULL, DB_BTREE, DB_CREATE | DB_EXCL, 0);
}
if (error == 0) {
*status_db_ptr = status_db;
} else {
int r = close_status(&status_db);
assert(r == 0);
}
return error;
}
int open_status(DB_ENV *env, DB **status_db_ptr, const char *name, DB_TXN *txn) {
int error = 0;
DB *status_db = NULL;
error = db_create(&status_db, env, 0);
if (error == 0) {
error = status_db->open(status_db, txn, name, NULL, DB_BTREE, DB_THREAD, 0);
}
if (error == 0) {
uint32_t pagesize = 0;
error = status_db->get_pagesize(status_db, &pagesize);
if (error == 0 && pagesize > status_dict_pagesize) {
error = status_db->change_pagesize(status_db, status_dict_pagesize);
}
}
if (error == 0) {
*status_db_ptr = status_db;
} else {
int r = close_status(&status_db);
assert(r == 0);
}
return error;
}
}
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment