Commit 2092ff6e authored by Zardosht Kasheff, committed by Yoni Fogel

[t:2254], merge to main line

git-svn-id: file:///svn/mysql/tokudb-engine/src@16938 c7de825b-a66e-492c-adef-691d508d4ae1
parent 338a3265
@@ -66,28 +66,70 @@ static const char *ha_tokudb_exts[] = {
//
// This offset is calculated starting from AFTER the NULL bytes
//
inline uint get_var_len_offset(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
inline u_int32_t get_var_len_offset(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, uint keynr) {
uint offset = 0;
for (uint i = 0; i < table_share->fields; i++) {
if (share->field_lengths[i] && !bitmap_is_set(&share->key_filters[keynr],i)) {
offset += share->field_lengths[i];
if (kc_info->field_lengths[i] && !bitmap_is_set(&kc_info->key_filters[keynr],i)) {
offset += kc_info->field_lengths[i];
}
}
return offset;
}
inline uint get_len_of_offsets(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
inline u_int32_t get_len_of_offsets(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, uint keynr) {
uint len = 0;
for (uint i = 0; i < table_share->fields; i++) {
if (share->length_bytes[i] && !bitmap_is_set(&share->key_filters[keynr],i)) {
len += share->num_offset_bytes;
if (kc_info->length_bytes[i] && !bitmap_is_set(&kc_info->key_filters[keynr],i)) {
len += kc_info->num_offset_bytes;
}
}
return len;
}
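//
// Worked example (hypothetical table, for illustration only): suppose fields
// a INT (fixed, field_lengths 4), b VARCHAR(10) (length_bytes 1), and
// c CHAR(2) (fixed, field_lengths 2), none filtered out of key keynr, and
// num_offset_bytes == 1. Then:
//   get_var_len_offset(kc_info, table_share, keynr) == 6 // 4 for a + 2 for c
//   get_len_of_offsets(kc_info, table_share, keynr) == 1 // one offset byte for b
// so a packed row for this dictionary is laid out as
//   [null bytes][4 bytes of a][2 bytes of c][1 end-offset byte for b][data of b]
//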
static int allocate_key_and_col_info ( struct st_table_share *table_share, KEY_AND_COL_INFO* kc_info) {
int error;
//
// initialize all of the bitmaps
//
for (uint i = 0; i < MAX_KEY + 1; i++) {
error = bitmap_init(
&kc_info->key_filters[i],
NULL,
table_share->fields,
false
);
if (error) {
goto exit;
}
}
//
// create the field lengths
//
kc_info->field_lengths = (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
kc_info->length_bytes= (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
kc_info->blob_fields= (u_int32_t *)my_malloc(table_share->fields*sizeof(u_int32_t), MYF(MY_WME | MY_ZEROFILL));
if (kc_info->field_lengths == NULL ||
kc_info->length_bytes == NULL ||
kc_info->blob_fields == NULL ) {
error = ENOMEM;
goto exit;
}
exit:
if (error) {
for (uint i = 0; i < MAX_KEY + 1; i++) {
bitmap_free(&kc_info->key_filters[i]);
}
my_free(kc_info->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(kc_info->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(kc_info->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
}
return error;
}
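//
// Sketch of the intended pairing (mirrors ha_tokudb::create() below):
// allocate_key_and_col_info() is matched by free_key_and_col_info() once the
// KEY_AND_COL_INFO is done being used, e.g.
//   KEY_AND_COL_INFO kc_info;
//   bzero(&kc_info, sizeof(kc_info));
//   error = allocate_key_and_col_info(form->s, &kc_info);
//   if (error) { goto cleanup; }
//   error = initialize_key_and_col_info(form->s, form, &kc_info, hidden_primary_key, primary_key);
//   ...
//   free_key_and_col_info(&kc_info); // assumed to happen in the cleanup path
//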
/** @brief
Simple lock controls. The "share" it creates is a structure we will
pass to each tokudb handler. Do you have to have one of these? Well, you have
@@ -97,7 +139,6 @@ static TOKUDB_SHARE *get_share(const char *table_name, struct st_table_share *ta
TOKUDB_SHARE *share = NULL;
int error = 0;
uint length;
uint i = 0;
pthread_mutex_lock(&tokudb_mutex);
length = (uint) strlen(table_name);
@@ -122,33 +163,10 @@ static TOKUDB_SHARE *get_share(const char *table_name, struct st_table_share *ta
share->table_name = tmp_name;
strmov(share->table_name, table_name);
//
// initialize all of the bitmaps
//
for (i = 0; i < MAX_KEY + 1; i++) {
error = bitmap_init(
&share->key_filters[i],
NULL,
table_share->fields,
false
);
if (error) {
goto exit;
}
}
error = allocate_key_and_col_info(table_share, &share->kc_info);
if (error) {
goto exit;
}
//
// create the field lengths
//
share->field_lengths = (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
share->length_bytes= (uchar *)my_malloc(table_share->fields, MYF(MY_WME | MY_ZEROFILL));
share->blob_fields= (u_int32_t *)my_malloc(table_share->fields*sizeof(u_int32_t), MYF(MY_WME | MY_ZEROFILL));
if (share->field_lengths == NULL ||
share->length_bytes == NULL ||
share->blob_fields == NULL ) {
goto exit;
}
bzero((void *) share->key_file, sizeof(share->key_file));
@@ -163,12 +181,6 @@ static TOKUDB_SHARE *get_share(const char *table_name, struct st_table_share *ta
exit:
if (error) {
for (i = 0; MAX_KEY + 1; i++) {
bitmap_free(&share->key_filters[i]);
}
my_free(share->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
pthread_mutex_destroy(&share->mutex);
my_free((uchar *) share, MYF(0));
share = NULL;
@@ -176,6 +188,21 @@ exit:
return share;
}
void free_key_and_col_info (KEY_AND_COL_INFO* kc_info) {
for (uint i = 0; i < MAX_KEY+1; i++) {
bitmap_free(&kc_info->key_filters[i]);
}
for (uint i = 0; i < MAX_KEY+1; i++) {
my_free(kc_info->cp_info[i], MYF(MY_ALLOW_ZERO_PTR));
}
my_free(kc_info->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(kc_info->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(kc_info->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
}
static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
int error, result = 0;
@@ -204,17 +231,8 @@ static int free_share(TOKUDB_SHARE * share, bool mutex_is_locked) {
share->key_file[i] = NULL;
}
}
for (uint i = 0; i < MAX_KEY+1; i++) {
bitmap_free(&share->key_filters[i]);
}
for (uint i = 0; i < MAX_KEY+1; i++) {
my_free(share->cp_info[i], MYF(MY_ALLOW_ZERO_PTR));
}
my_free(share->field_lengths, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->length_bytes, MYF(MY_ALLOW_ZERO_PTR));
my_free(share->blob_fields, MYF(MY_ALLOW_ZERO_PTR));
free_key_and_col_info(&share->kc_info);
if (share->status_block && (error = share->status_block->close(share->status_block, 0))) {
assert(error == 0);
@@ -292,6 +310,11 @@ typedef struct smart_dbt_ai_info {
uint pk_index;
} *SMART_DBT_AI_INFO;
typedef struct row_buffers {
uchar** key_buff;
uchar** rec_buff;
} *ROW_BUFFERS;
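//
// The two arrays hold one preallocated buffer per dictionary. A ROW_BUFFERS
// is threaded through db_env->put_multiple() as the "extra" argument so that
// generate_keys_vals_for_put() can pack into these buffers instead of calling
// my_malloc for each row. See insert_rows_to_dictionaries_mult() below:
//   struct row_buffers row_buff_struct;
//   row_buff_struct.key_buff = mult_key_buff; // per-index key buffers
//   row_buff_struct.rec_buff = mult_rec_buff; // per-clustering-index row buffers
//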
static int smart_dbt_ai_callback (DBT const *key, DBT const *row, void *context) {
int error = 0;
SMART_DBT_AI_INFO info = (SMART_DBT_AI_INFO)context;
@@ -510,9 +533,6 @@ ulonglong retrieve_auto_increment(uint16 type, uint32 offset,const uchar *record
}
inline uint get_null_offset(TABLE* table, Field* field) {
return (uint) ((uchar*) field->null_ptr - (uchar*) table->record[0]);
}
inline bool
is_null_field( TABLE* table, Field* field, const uchar* record) {
@@ -788,39 +808,6 @@ uchar* pack_toku_field_blob(
return (to_tokudb + len_bytes + length);
}
const uchar* unpack_toku_field_blob(
uchar *to_mysql,
const uchar* from_tokudb,
Field* field
)
{
u_int32_t len_bytes = field->row_pack_length();
u_int32_t length = 0;
const uchar* data_ptr = NULL;
memcpy(to_mysql, from_tokudb, len_bytes);
switch (len_bytes) {
case (1):
length = (u_int32_t)(*from_tokudb);
break;
case (2):
length = uint2korr(from_tokudb);
break;
case (3):
length = uint3korr(from_tokudb);
break;
case (4):
length = uint4korr(from_tokudb);
break;
default:
assert(false);
}
data_ptr = from_tokudb + len_bytes;
memcpy(to_mysql + len_bytes, (uchar *)(&data_ptr), sizeof(uchar *));
return (from_tokudb + len_bytes + length);
}
static int add_table_to_metadata(const char *name, TABLE* table, DB_TXN* txn) {
int error = 0;
@@ -976,6 +963,122 @@ cleanup:
return error;
}
int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) {
int error;
DBT pk_key, pk_val;
ROW_BUFFERS row_buffs = NULL;
bzero(&pk_key, sizeof(pk_key));
bzero(&pk_val, sizeof(pk_val));
pk_key.size = *(u_int32_t *)row->data;
pk_key.data = (uchar *)row->data + sizeof(u_int32_t);
pk_val.size = row->size - pk_key.size - sizeof(u_int32_t);
pk_val.data = (uchar *)pk_key.data + pk_key.size;
row_buffs = (ROW_BUFFERS)extra;
for ( u_int32_t i = 0; i < num_dbs; i++) {
DB* curr_db = dbs[i];
uchar* row_desc = NULL;
u_int32_t desc_size;
row_desc = (uchar *)curr_db->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
if (is_key_pk(row_desc, desc_size)) {
keys[i].data = pk_key.data;
keys[i].size = pk_key.size;
vals[i].data = pk_val.data;
vals[i].size = pk_val.size;
continue;
}
else {
uchar* buff = NULL;
u_int32_t max_key_len = 0;
if (row_buffs != NULL) {
buff = row_buffs->key_buff[i];
}
else {
max_key_len = max_key_size_from_desc(row_desc, desc_size);
max_key_len += pk_key.size;
buff = (uchar *)my_malloc(max_key_len, MYF(MY_WME));
assert(buff != NULL && max_key_len > 0);
}
keys[i].size = pack_key_from_desc(
buff,
row_desc,
desc_size,
&pk_key,
&pk_val
);
if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY && !max_key_len) {
max_key_len = max_key_size_from_desc(row_desc, desc_size);
max_key_len += pk_key.size;
}
if (max_key_len) {
assert(max_key_len >= keys[i].size);
}
keys[i].data = buff;
}
row_desc += desc_size;
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
if (!is_key_clustering(row_desc, desc_size)) {
bzero(&vals[i], sizeof(DBT));
}
else {
uchar* buff = NULL;
if (row_buffs != NULL) {
buff = row_buffs->rec_buff[i];
}
else {
buff = (uchar *)my_malloc(pk_val.size, MYF(MY_WME));
assert(buff != NULL);
}
vals[i].size = pack_clustering_val_from_desc(
buff,
row_desc,
desc_size,
&pk_val
);
vals[i].data = buff;
}
}
error = 0;
return error;
}
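//
// Descriptor layout assumed by the pointer arithmetic above: a dictionary's
// descriptor is a sequence of size-prefixed sections, each 4-byte size
// counting itself (see create_secondary_dictionary below):
//   [u_int32_t size0][key comparison descriptor : size0 - 4 bytes]
//   [u_int32_t size1][key pack descriptor       : size1 - 4 bytes]
//   [u_int32_t size2][clustering val descriptor : size2 - 4 bytes]
// Skipping a whole section is row_desc += *(u_int32_t *)row_desc; entering
// one is desc_size = (*(u_int32_t *)row_desc) - 4; row_desc += 4;
//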
int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra) {
if (extra == NULL) {
//
// handle allocation of buffers in recovery case later
//
for (u_int32_t i = 0; i < num_dbs; i++) {
DB* curr_db = dbs[i];
uchar* row_desc = NULL;
u_int32_t desc_size;
row_desc = (uchar *)curr_db->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
if (is_key_pk(row_desc, desc_size)) {
continue;
}
else {
my_free(keys[i].data, MYF(MY_ALLOW_ZERO_PTR));
my_free(vals[i].data, MYF(MY_ALLOW_ZERO_PTR));
}
}
}
return 0;
}
ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, table_arg)
// flags defined in sql\handler.h
@@ -1003,6 +1106,8 @@ ha_tokudb::ha_tokudb(handlerton * hton, TABLE_SHARE * table_arg):handler(hton, t
num_blob_bytes = 0;
delay_updating_ai_metadata = false;
ai_metadata_update_required = false;
bzero(mult_key_buff, sizeof(mult_key_buff));
bzero(mult_rec_buff, sizeof(mult_rec_buff));
}
//
@@ -1160,17 +1265,17 @@ cleanup:
return error;
}
int initialize_col_pack_info(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint keynr) {
int initialize_col_pack_info(KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, uint keynr) {
int error = ENOSYS;
//
// set up the cp_info
//
assert(share->cp_info[keynr] == NULL);
share->cp_info[keynr] = (COL_PACK_INFO *)my_malloc(
assert(kc_info->cp_info[keynr] == NULL);
kc_info->cp_info[keynr] = (COL_PACK_INFO *)my_malloc(
table_share->fields*sizeof(COL_PACK_INFO),
MYF(MY_WME | MY_ZEROFILL)
);
if (share->cp_info[keynr] == NULL) {
if (kc_info->cp_info[keynr] == NULL) {
error = ENOMEM;
goto exit;
}
@@ -1178,17 +1283,17 @@ int initialize_col_pack_info(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint
u_int32_t curr_fixed_offset = 0;
u_int32_t curr_var_index = 0;
for (uint j = 0; j < table_share->fields; j++) {
COL_PACK_INFO* curr = &share->cp_info[keynr][j];
COL_PACK_INFO* curr = &kc_info->cp_info[keynr][j];
//
// need to set the offsets / indexes
// offsets are calculated AFTER the NULL bytes
//
if (!bitmap_is_set(&share->key_filters[keynr],j)) {
if (share->field_lengths[j]) {
if (!bitmap_is_set(&kc_info->key_filters[keynr],j)) {
if (kc_info->field_lengths[j]) {
curr->col_pack_val = curr_fixed_offset;
curr_fixed_offset += share->field_lengths[j];
curr_fixed_offset += kc_info->field_lengths[j];
}
else if (share->length_bytes[j]) {
else if (kc_info->length_bytes[j]) {
curr->col_pack_val = curr_var_index;
curr_var_index++;
}
@@ -1198,13 +1303,13 @@ int initialize_col_pack_info(TOKUDB_SHARE* share, TABLE_SHARE* table_share, uint
//
// set up the mcp_info
//
share->mcp_info[keynr].var_len_offset = get_var_len_offset(
share,
kc_info->mcp_info[keynr].var_len_offset = get_var_len_offset(
kc_info,
table_share,
keynr
);
share->mcp_info[keynr].len_of_offsets = get_len_of_offsets(
share,
kc_info->mcp_info[keynr].len_of_offsets = get_len_of_offsets(
kc_info,
table_share,
keynr
);
@@ -1215,31 +1320,10 @@ exit:
return error;
}
int initialize_key_and_col_info(struct st_table_share *table_share, TABLE* table, KEY_AND_COL_INFO* kc_info, uint hidden_primary_key, uint primary_key) {
int error;
u_int32_t curr_blob_field_index = 0;
u_int32_t max_var_bytes = 0;
// fill in the field lengths. 0 means it is a variable sized field length
// fill in length_bytes, 0 means it is fixed or blob
@@ -1256,13 +1340,13 @@ int ha_tokudb::initialize_share(
case toku_type_fixstring:
pack_length = field->pack_length();
assert(pack_length < 256);
share->field_lengths[i] = (uchar)pack_length;
share->length_bytes[i] = 0;
kc_info->field_lengths[i] = (uchar)pack_length;
kc_info->length_bytes[i] = 0;
break;
case toku_type_blob:
share->field_lengths[i] = 0;
share->length_bytes[i] = 0;
share->blob_fields[curr_blob_field_index] = i;
kc_info->field_lengths[i] = 0;
kc_info->length_bytes[i] = 0;
kc_info->blob_fields[curr_blob_field_index] = i;
curr_blob_field_index++;
break;
case toku_type_varstring:
@@ -1270,15 +1354,15 @@ int ha_tokudb::initialize_share(
//
// meaning it is variable sized
//
share->field_lengths[i] = 0;
share->length_bytes[i] = (uchar)((Field_varstring *)field)->length_bytes;
kc_info->field_lengths[i] = 0;
kc_info->length_bytes[i] = (uchar)((Field_varstring *)field)->length_bytes;
max_var_bytes += field->field_length;
break;
default:
assert(false);
}
}
share->num_blobs = curr_blob_field_index;
kc_info->num_blobs = curr_blob_field_index;
//
// initialize share->num_offset_bytes
@@ -1286,10 +1370,10 @@ int ha_tokudb::initialize_share(
// can safely set num_offset_bytes to 1 or 2
//
if (max_var_bytes < 256) {
share->num_offset_bytes = 1;
kc_info->num_offset_bytes = 1;
}
else {
share->num_offset_bytes = 2;
kc_info->num_offset_bytes = 2;
}
@@ -1300,7 +1384,7 @@ int ha_tokudb::initialize_share(
if (! (i==primary_key && hidden_primary_key) ){
if ( i == primary_key ) {
set_key_filter(
&share->key_filters[primary_key],
&kc_info->key_filters[primary_key],
&table_share->key_info[primary_key],
table,
true
@@ -1308,14 +1392,14 @@ int ha_tokudb::initialize_share(
}
if (table_share->key_info[i].flags & HA_CLUSTERING) {
set_key_filter(
&share->key_filters[i],
&kc_info->key_filters[i],
&table_share->key_info[i],
table,
true
);
if (!hidden_primary_key) {
set_key_filter(
&share->key_filters[i],
&kc_info->key_filters[i],
&table_share->key_info[primary_key],
table,
true
@@ -1324,19 +1408,58 @@ int ha_tokudb::initialize_share(
}
}
if (i == primary_key || table_share->key_info[i].flags & HA_CLUSTERING) {
error = initialize_col_pack_info(share,table_share,i);
error = initialize_col_pack_info(kc_info,table_share,i);
if (error) {
goto exit;
}
}
}
exit:
return error;
}
int ha_tokudb::initialize_share(
const char* name,
int mode
)
{
int error = 0;
u_int64_t num_rows = 0;
bool table_exists;
DBUG_PRINT("info", ("share->use_count %u", share->use_count));
table_exists = true;
error = check_table_in_metadata(name, &table_exists);
if (error) {
goto exit;
}
if (!table_exists) {
sql_print_error("table %s does not exist in metadata, was it moved from someplace else? Not opening table", name);
error = HA_ADMIN_FAILED;
goto exit;
}
error = initialize_key_and_col_info(
table_share,
table,
&share->kc_info,
hidden_primary_key,
primary_key
);
if (error) { goto exit; }
error = open_main_dictionary(name, mode == O_RDONLY, NULL);
if (error) { goto exit; }
share->has_unique_keys = false;
/* Open other keys; These are part of the share structure */
for (uint i = 0; i < table_share->keys; i++) {
for (uint i = 0; i < table_share->keys + test(hidden_primary_key); i++) {
if (table_share->key_info[i].flags & HA_NOSAME) {
share->has_unique_keys = true;
}
if (i != primary_key) {
error = open_secondary_dictionary(
&share->key_file[i],
@@ -1348,6 +1471,10 @@ int ha_tokudb::initialize_share(
if (error) {
goto exit;
}
share->mult_put_flags[i] = DB_YESOVERWRITE;
}
else {
share->mult_put_flags[i] = DB_NOOVERWRITE;
}
}
if (!hidden_primary_key) {
@@ -1421,7 +1548,6 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
TOKUDB_DBUG_ENTER("ha_tokudb::open %p %s", this, name);
TOKUDB_OPEN();
uint max_key_length;
int error = 0;
int ret_val = 0;
@@ -1461,13 +1587,26 @@ int ha_tokudb::open(const char *name, int mode, uint test_if_locked) {
}
alloced_rec_buff_length = table_share->rec_buff_length + table_share->fields;
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
rec_buff = (uchar *) my_malloc(alloced_rec_buff_length + max_key_length + sizeof(u_int32_t), MYF(MY_WME));
if (rec_buff == NULL) {
ret_val = 1;
goto exit;
}
for (u_int32_t i = 0; i < (table_share->keys); i++) {
if (i == primary_key) {
continue;
}
mult_key_buff[i] = (uchar *)my_malloc(max_key_length, MYF(MY_WME));
assert(mult_key_buff[i] != NULL);
if (table_share->key_info[i].flags & HA_CLUSTERING) {
mult_rec_buff[i] = (uchar *) my_malloc(alloced_rec_buff_length, MYF(MY_WME));
assert(mult_rec_buff[i]);
}
}
alloced_mult_rec_buff_length = alloced_rec_buff_length;
/* Init shared structure */
share = get_share(name, table_share);
if (share == NULL) {
@@ -1509,6 +1648,11 @@ exit:
alloc_ptr = NULL;
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
rec_buff = NULL;
for (u_int32_t i = 0; i < (table_share->keys); i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR));
}
if (error) {
my_errno = error;
}
@@ -1727,6 +1871,10 @@ int ha_tokudb::__close(int mutex_is_locked) {
my_free(rec_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(blob_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr, MYF(MY_ALLOW_ZERO_PTR));
for (u_int32_t i = 0; i < (table_share->keys); i++) {
my_free(mult_key_buff[i], MYF(MY_ALLOW_ZERO_PTR));
my_free(mult_rec_buff[i], MYF(MY_ALLOW_ZERO_PTR));
}
rec_buff = NULL;
alloc_ptr = NULL;
ha_tokudb::reset();
@@ -1740,9 +1888,9 @@ int ha_tokudb::__close(int mutex_is_locked) {
// length - size of buffer required for rec_buff
//
bool ha_tokudb::fix_rec_buff_for_blob(ulong length) {
if (!rec_buff || length > alloced_rec_buff_length) {
if (!rec_buff || (length > alloced_rec_buff_length)) {
uchar *newptr;
if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length, MYF(MY_ALLOW_ZERO_PTR))))
if (!(newptr = (uchar *) my_realloc((void *) rec_buff, length+max_key_length+sizeof(u_int32_t), MYF(MY_ALLOW_ZERO_PTR))))
return 1;
rec_buff = newptr;
alloced_rec_buff_length = length;
@@ -1750,6 +1898,22 @@ bool ha_tokudb::fix_rec_buff_for_blob(ulong length) {
return 0;
}
void ha_tokudb::fix_mult_rec_buff() {
if (alloced_rec_buff_length > alloced_mult_rec_buff_length) {
for (uint i = 0; i < table_share->keys; i++) {
if (table_share->key_info[i].flags & HA_CLUSTERING) {
uchar *newptr;
if (!(newptr = (uchar *) my_realloc((void *) mult_rec_buff[i], alloced_rec_buff_length, MYF(MY_ALLOW_ZERO_PTR)))) {
assert(false);
}
mult_rec_buff[i] = newptr;
}
}
alloced_mult_rec_buff_length = alloced_rec_buff_length;
}
}
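//
// Usage note: fix_mult_rec_buff() is called from write_row() after
// fix_rec_buff_for_blob() may have grown rec_buff, so that the per-index
// clustering row buffers keep pace with the main row buffer.
//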
/* Calculate max length needed for row */
ulong ha_tokudb::max_row_length(const uchar * buf) {
ulong length = table_share->reclength + table_share->fields * 2;
@@ -1773,15 +1937,18 @@ ulong ha_tokudb::max_row_length(const uchar * buf) {
// pre-allocated.
// Parameters:
// [out] row - row stored in DBT to be converted
// [out] buf - buffer where row is packed
// [in] record - row in MySQL format
//
int ha_tokudb::pack_row(
DBT * row,
uchar* buf,
const uchar* record,
uint index
)
{
uchar* dest_buf = NULL;
uchar* fixed_field_ptr = NULL;
uchar* var_field_offset_ptr = NULL;
uchar* start_field_data_ptr = NULL;
@@ -1791,19 +1958,21 @@ int ha_tokudb::pack_row(
my_bitmap_map *old_map = dbug_tmp_use_all_columns(table, table->write_set);
if (table_share->blob_fields) {
if ((buf == NULL) && table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
r = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
}
dest_buf = (buf == NULL) ? rec_buff : buf;
/* Copy null bits */
memcpy(rec_buff, record, table_share->null_bytes);
fixed_field_ptr = rec_buff + table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->mcp_info[index].var_len_offset;
start_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
var_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
memcpy(dest_buf, record, table_share->null_bytes);
fixed_field_ptr = dest_buf + table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset;
start_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
//
// assert that when the hidden primary key exists, primary_key_offsets is NULL
@@ -1811,31 +1980,31 @@
for (uint i = 0; i < table_share->fields; i++) {
Field* field = table->field[i];
uint curr_field_offset = field_offset(field, table);
if (bitmap_is_set(&share->key_filters[index],i)) {
if (bitmap_is_set(&share->kc_info.key_filters[index],i)) {
continue;
}
if (share->field_lengths[i]) {
if (share->kc_info.field_lengths[i]) {
fixed_field_ptr = pack_fixed_field(
fixed_field_ptr,
record + curr_field_offset,
share->field_lengths[i]
share->kc_info.field_lengths[i]
);
}
else if (share->length_bytes[i]) {
else if (share->kc_info.length_bytes[i]) {
var_field_data_ptr = pack_var_field(
var_field_offset_ptr,
var_field_data_ptr,
start_field_data_ptr,
record + curr_field_offset,
share->length_bytes[i],
share->num_offset_bytes
share->kc_info.length_bytes[i],
share->kc_info.num_offset_bytes
);
var_field_offset_ptr += share->num_offset_bytes;
var_field_offset_ptr += share->kc_info.num_offset_bytes;
}
}
for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]];
for (uint i = 0; i < share->kc_info.num_blobs; i++) {
Field* field = table->field[share->kc_info.blob_fields[i]];
var_field_data_ptr = pack_toku_field_blob(
var_field_data_ptr,
record + field_offset(field, table),
@@ -1843,8 +2012,8 @@
);
}
row->data = rec_buff;
row->size = (size_t) (var_field_data_ptr - rec_buff);
row->data = dest_buf;
row->size = (size_t) (var_field_data_ptr - dest_buf);
r = 0;
cleanup:
@@ -1865,7 +2034,7 @@ int ha_tokudb::unpack_blobs(
//
// assert that num_bytes > 0 iff share->num_blobs > 0
//
assert( !((share->num_blobs == 0) && (num_bytes > 0)) );
assert( !((share->kc_info.num_blobs == 0) && (num_bytes > 0)) );
if (num_bytes > num_blob_bytes) {
ptr = (uchar *)my_realloc((void *)blob_buff, num_bytes, MYF(MY_ALLOW_ZERO_PTR));
if (ptr == NULL) {
@@ -1878,12 +2047,14 @@
memcpy(blob_buff, from_tokudb_blob, num_bytes);
buff= blob_buff;
for (uint i = 0; i < share->num_blobs; i++) {
Field* field = table->field[share->blob_fields[i]];
for (uint i = 0; i < share->kc_info.num_blobs; i++) {
Field* field = table->field[share->kc_info.blob_fields[i]];
u_int32_t len_bytes = field->row_pack_length();
buff = unpack_toku_field_blob(
record + field_offset(field, table),
buff,
field
len_bytes,
false
);
}
@@ -1892,7 +2063,6 @@ exit:
return error;
}
//
// take the row passed in as a DBT*, and convert it into a row in MySQL format in record
// Parameters:
@@ -1920,8 +2090,8 @@ int ha_tokudb::unpack_row(
memcpy(record, fixed_field_ptr, table_share->null_bytes);
fixed_field_ptr += table_share->null_bytes;
var_field_offset_ptr = fixed_field_ptr + share->mcp_info[index].var_len_offset;
var_field_data_ptr = var_field_offset_ptr + share->mcp_info[index].len_of_offsets;
var_field_offset_ptr = fixed_field_ptr + share->kc_info.mcp_info[index].var_len_offset;
var_field_data_ptr = var_field_offset_ptr + share->kc_info.mcp_info[index].len_of_offsets;
//
// unpack the key, if necessary
@@ -1942,23 +2112,23 @@
//
for (uint i = 0; i < table_share->fields; i++) {
Field* field = table->field[i];
if (bitmap_is_set(&share->key_filters[index],i)) {
if (bitmap_is_set(&share->kc_info.key_filters[index],i)) {
continue;
}
if (share->field_lengths[i]) {
if (share->kc_info.field_lengths[i]) {
fixed_field_ptr = unpack_fixed_field(
record + field_offset(field, table),
fixed_field_ptr,
share->field_lengths[i]
share->kc_info.field_lengths[i]
);
}
//
// here, we DO modify var_field_data_ptr or var_field_offset_ptr
// as we unpack variable sized fields
//
else if (share->length_bytes[i]) {
switch (share->num_offset_bytes) {
else if (share->kc_info.length_bytes[i]) {
switch (share->kc_info.num_offset_bytes) {
case (1):
data_end_offset = var_field_offset_ptr[0];
break;
@@ -1973,9 +2143,9 @@
record + field_offset(field, table),
var_field_data_ptr,
data_end_offset - last_offset,
share->length_bytes[i]
share->kc_info.length_bytes[i]
);
var_field_offset_ptr += share->num_offset_bytes;
var_field_offset_ptr += share->kc_info.num_offset_bytes;
var_field_data_ptr += data_end_offset - last_offset;
last_offset = data_end_offset;
}
@@ -2002,8 +2172,8 @@ int ha_tokudb::unpack_row(
Field* field = table->field[field_index];
unpack_fixed_field(
record + field_offset(field, table),
fixed_field_ptr + share->cp_info[index][field_index].col_pack_val,
share->field_lengths[field_index]
fixed_field_ptr + share->kc_info.cp_info[index][field_index].col_pack_val,
share->kc_info.field_lengths[field_index]
);
}
@@ -2014,42 +2184,23 @@
for (u_int32_t i = 0; i < num_var_cols_for_query; i++) {
uint field_index = var_cols_for_query[i];
Field* field = table->field[field_index];
u_int32_t var_field_index = share->cp_info[index][field_index].col_pack_val;
u_int32_t var_field_index = share->kc_info.cp_info[index][field_index].col_pack_val;
u_int32_t data_start_offset;
switch (share->num_offset_bytes) {
case (1):
data_end_offset = (var_field_offset_ptr + var_field_index)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_offset_ptr + 2*var_field_index);
break;
default:
assert(false);
break;
}
u_int32_t field_len;
if (var_field_index) {
switch (share->num_offset_bytes) {
case (1):
data_start_offset = (var_field_offset_ptr + var_field_index - 1)[0];
break;
case (2):
data_start_offset = uint2korr(var_field_offset_ptr + 2*(var_field_index-1));
break;
default:
assert(false);
break;
}
}
else {
data_start_offset = 0;
}
get_var_field_info(
&field_len,
&data_start_offset,
var_field_index,
var_field_offset_ptr,
share->kc_info.num_offset_bytes
);
unpack_var_field(
record + field_offset(field, table),
var_field_data_ptr + data_start_offset,
data_end_offset - data_start_offset,
share->length_bytes[field_index]
field_len,
share->kc_info.length_bytes[field_index]
);
}
@@ -2057,28 +2208,14 @@
//
// now the blobs
//
get_blob_field_info(
&data_end_offset,
share->kc_info.mcp_info[index].len_of_offsets,
var_field_data_ptr,
share->kc_info.num_offset_bytes
);
//
// need to set var_field_data_ptr to point to beginning of blobs, which
// is at the end of the var stuff (if they exist), if var stuff does not exist
// then the bottom variable will be 0, and var_field_data_ptr is already
// set correctly
//
if (share->mcp_info[index].len_of_offsets) {
switch (share->num_offset_bytes) {
case (1):
data_end_offset = (var_field_data_ptr - 1)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_data_ptr - 2);
break;
default:
assert(false);
break;
}
var_field_data_ptr += data_end_offset;
}
error = unpack_blobs(
record,
var_field_data_ptr,
@@ -2304,7 +2441,8 @@ DBT *ha_tokudb::create_dbt_key_from_table(
TOKUDB_DBUG_ENTER("ha_tokudb::create_dbt_key_from_table");
bzero((void *) key, sizeof(*key));
if (hidden_primary_key && keynr == primary_key) {
key->data = current_ident;
key->data = buff;
memcpy(buff, &current_ident, TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH);
key->size = TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH;
*has_null = false;
DBUG_RETURN(key);
@@ -2658,85 +2796,341 @@ void ha_tokudb::start_bulk_insert(ha_rows rows) {
share->try_table_lock = false;
pthread_mutex_unlock(&share->mutex);
}
}
}
//
// Method that is called at the end of many calls to insert rows
// (ha_tokudb::write_row). If start_bulk_insert is called, then
// this is guaranteed to be called.
//
int ha_tokudb::end_bulk_insert() {
int error = 0;
if (ai_metadata_update_required) {
pthread_mutex_lock(&share->mutex);
error = update_max_auto_inc(share->status_block, share->last_auto_increment);
pthread_mutex_unlock(&share->mutex);
}
delay_updating_ai_metadata = false;
ai_metadata_update_required = false;
return error;
}
int ha_tokudb::is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn) {
DBT key;
int error = 0;
bool has_null;
DBC* tmp_cursor = NULL;
struct index_read_info ir_info;
struct smart_dbt_info info;
bzero((void *)&key, sizeof(key));
info.ha = this;
info.buf = NULL;
info.keynr = dict_index;
ir_info.smart_dbt_info = info;
create_dbt_key_for_lookup(
&key,
key_info,
key_buff3,
record,
&has_null
);
ir_info.orig_key = &key;
if (has_null) {
error = 0;
*is_unique = true;
goto cleanup;
}
error = share->key_file[dict_index]->cursor(
share->key_file[dict_index],
txn,
&tmp_cursor,
0
);
if (error) { goto cleanup; }
error = tmp_cursor->c_getf_set_range(
tmp_cursor,
0,
&key,
smart_dbt_callback_lookup,
&ir_info
);
if (error == DB_NOTFOUND) {
*is_unique = true;
error = 0;
goto cleanup;
}
else if (error) {
goto cleanup;
}
if (ir_info.cmp) {
*is_unique = true;
}
else {
*is_unique = false;
}
error = 0;
cleanup:
if (tmp_cursor) {
int r = tmp_cursor->c_close(tmp_cursor);
assert(r==0);
tmp_cursor = NULL;
}
return error;
}
int ha_tokudb::do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd) {
int error;
//
// first do uniqueness checks
//
if (share->has_unique_keys && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool is_unique_key = table->key_info[keynr].flags & HA_NOSAME;
bool is_unique = false;
if (!is_unique_key) {
continue;
}
//
// if unique key, check uniqueness constraint
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
error = is_val_unique(&is_unique, record, &table->key_info[keynr], keynr, txn);
if (error) { goto cleanup; }
if (!is_unique) {
error = DB_KEYEXIST;
last_dup_key = keynr;
goto cleanup;
}
}
}
error = 0;
cleanup:
return error;
}
int ha_tokudb::test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val) {
int error;
DBT row, key;
//
// variables for testing key packing, only used in some debug modes
//
uchar* tmp_pk_key_data = NULL;
uchar* tmp_pk_val_data = NULL;
DBT tmp_pk_key;
DBT tmp_pk_val;
bool has_null;
bzero(&tmp_pk_key, sizeof(DBT));
bzero(&tmp_pk_val, sizeof(DBT));
//
// used for testing the packing of keys
//
tmp_pk_key_data = (uchar *)my_malloc(pk_key->size, MYF(MY_WME));
assert(tmp_pk_key_data);
tmp_pk_val_data = (uchar *)my_malloc(pk_val->size, MYF(MY_WME));
assert(tmp_pk_val_data);
memcpy(tmp_pk_key_data, pk_key->data, pk_key->size);
memcpy(tmp_pk_val_data, pk_val->data, pk_val->size);
tmp_pk_key.data = tmp_pk_key_data;
tmp_pk_key.size = pk_key->size;
tmp_pk_val.data = tmp_pk_val_data;
tmp_pk_val.size = pk_val->size;
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
u_int32_t tmp_num_bytes = 0;
int cmp;
uchar* row_desc = NULL;
u_int32_t desc_size = 0;
if (keynr == primary_key) {
continue;
}
create_dbt_key_from_table(&key, keynr, mult_key_buff[keynr], record, &has_null);
//
// TEST
//
row_desc = (uchar *)share->key_file[keynr]->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
tmp_num_bytes = pack_key_from_desc(
key_buff3,
row_desc,
desc_size,
&tmp_pk_key,
&tmp_pk_val
);
assert(tmp_num_bytes == key.size);
cmp = memcmp(key_buff3,mult_key_buff[keynr],tmp_num_bytes);
assert(cmp == 0);
//
// test key packing of clustering keys
//
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, mult_rec_buff[keynr], (const uchar *) record, keynr);
if (error) { goto cleanup; }
uchar* tmp_buff = NULL;
tmp_buff = (uchar *)my_malloc(alloced_rec_buff_length,MYF(MY_WME));
assert(tmp_buff);
row_desc = (uchar *)share->key_file[keynr]->descriptor->data;
row_desc += (*(u_int32_t *)row_desc);
row_desc += (*(u_int32_t *)row_desc);
desc_size = (*(u_int32_t *)row_desc) - 4;
row_desc += 4;
tmp_num_bytes = pack_clustering_val_from_desc(
tmp_buff,
row_desc,
desc_size,
&tmp_pk_val
);
assert(tmp_num_bytes == row.size);
cmp = memcmp(tmp_buff,mult_rec_buff[keynr],tmp_num_bytes);
assert(cmp == 0);
my_free(tmp_buff,MYF(MY_ALLOW_ZERO_PTR));
}
}
error = 0;
cleanup:
my_free(tmp_pk_key_data,MYF(MY_ALLOW_ZERO_PTR));
my_free(tmp_pk_val_data,MYF(MY_ALLOW_ZERO_PTR));
return error;
}
int ha_tokudb::insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn) {
int error;
DBT row, key;
u_int32_t put_flags;
THD *thd = ha_thd();
bool is_replace_into;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
//
// first the primary key (because it must be unique, has highest chance of failure)
//
put_flags = hidden_primary_key ? DB_YESOVERWRITE : DB_NOOVERWRITE;
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !is_replace_into) {
put_flags = DB_YESOVERWRITE;
}
//
// optimization for "REPLACE INTO..." command
// if the command is "REPLACE INTO" and the only table
// is the main table, then we can simply insert the element
// with DB_YESOVERWRITE. If the element does not exist,
// it will act as a normal insert, and if it does exist, it
// will act as a replace, which is exactly what REPLACE INTO is supposed
// to do. We cannot do this if curr_num_DBs > 1, because then we lose
// consistency between indexes
//
if (is_replace_into && (curr_num_DBs == 1)) {
put_flags = DB_YESOVERWRITE; // original put_flags can only be DB_YESOVERWRITE or DB_NOOVERWRITE
}
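//
// Net effect of the flag selection above, summarized:
//   hidden primary key                    -> DB_YESOVERWRITE (generated key cannot collide)
//   relaxed unique checks, not REPLACE    -> DB_YESOVERWRITE
//   REPLACE INTO with only the main DB    -> DB_YESOVERWRITE (put doubles as replace)
//   otherwise                             -> DB_NOOVERWRITE (put returns DB_KEYEXIST on a duplicate)
//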
lockretry {
error = share->file->put(
share->file,
txn,
pk_key,
pk_val,
put_flags
);
lockretry_wait;
}
if (error) {
last_dup_key = primary_key;
goto cleanup;
}
//
// now insertion for rest of indexes
//
for (uint keynr = 0; keynr < table_share->keys; keynr++) {
bool has_null;
if (keynr == primary_key) {
continue;
}
create_dbt_key_from_table(&key, keynr, mult_key_buff[keynr], record, &has_null);
put_flags = DB_YESOVERWRITE;
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, NULL, (const uchar *) record, keynr);
if (error) { goto cleanup; }
}
else {
bzero((void *) &row, sizeof(row));
}
lockretry {
error = share->key_file[keynr]->put(
share->key_file[keynr],
txn,
&key,
&row,
put_flags
);
lockretry_wait;
}
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
//
if (error) {
last_dup_key = keynr;
goto cleanup;
}
}
error = 0;
cleanup:
return error;
}
int ha_tokudb::insert_rows_to_dictionaries_mult(uchar* row_buff, u_int32_t row_buff_size, DB_TXN* txn, THD* thd) {
int error;
DBT row;
struct row_buffers row_buff_struct;
bool is_replace_into;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
bzero(&row, sizeof(row));
row.data = row_buff;
row.size = row_buff_size;
row_buff_struct.key_buff = mult_key_buff;
row_buff_struct.rec_buff = mult_rec_buff;
is_replace_into = (thd_sql_command(thd) == SQLCOM_REPLACE) ||
(thd_sql_command(thd) == SQLCOM_REPLACE_SELECT);
if (thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS) && !is_replace_into) {
share->mult_put_flags[primary_key] = DB_YESOVERWRITE;
}
else {
share->mult_put_flags[primary_key] = DB_NOOVERWRITE;
}
error = db_env->put_multiple(db_env, txn, &row, curr_num_DBs, share->key_file, share->mult_put_flags, &row_buff_struct);
//
// We break if we hit an error, unless it is a dup key error
// and MySQL told us to ignore duplicate key errors
//
if (error) {
last_dup_key = primary_key;
}
return error;
}
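//
// Layout of the single row buffer handed to put_multiple, as built in
// write_row() below and decoded at the top of generate_keys_vals_for_put():
//   [u_int32_t pk_key.size][pk key bytes][packed pk row (pk_val) bytes]
// e.g. in write_row():
//   create_dbt_key_from_table(&prim_key, primary_key, rec_buff + sizeof(u_int32_t), record, &has_null);
//   memcpy(rec_buff, &prim_key.size, sizeof(u_int32_t));
//   pack_row(&row, rec_buff + prim_key.size + sizeof(u_int32_t), record, primary_key);
//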
@@ -2751,20 +3145,15 @@ cleanup:
//
int ha_tokudb::write_row(uchar * record) {
TOKUDB_DBUG_ENTER("ha_tokudb::write_row");
DBT row, prim_key;
int error;
THD *thd = ha_thd();
bool has_null;
DB_TXN* sub_trans = NULL;
DB_TXN* txn = NULL;
tokudb_trx_data *trx = NULL;
uint curr_num_DBs = table->s->keys + test(hidden_primary_key);
//
// some crap that needs to be done because MySQL does not properly abstract
// this work away from us, namely filling in auto increment and setting auto timestamp
@@ -2802,118 +3191,59 @@ int ha_tokudb::write_row(uchar * record) {
pthread_mutex_unlock(&share->mutex);
}
if (hidden_primary_key) {
get_auto_primary_key(current_ident);
}
if (table_share->blob_fields) {
if (fix_rec_buff_for_blob(max_row_length(record))) {
error = HA_ERR_OUT_OF_MEM;
goto cleanup;
}
}
create_dbt_key_from_table(&prim_key, primary_key, rec_buff + sizeof(u_int32_t), record, &has_null);
//
// copy len of pk at beginning of rec_buff
//
memcpy(rec_buff, &prim_key.size, sizeof(u_int32_t));
if ((error = pack_row(&row, rec_buff + prim_key.size + sizeof(u_int32_t), (const uchar *) record, primary_key))){
goto cleanup;
}
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0);
if (error) {
goto cleanup;
}
}
txn = using_ignore ? sub_trans : transaction;
//
// make sure the buffers for the rows are big enough
//
fix_mult_rec_buff();
error = do_uniqueness_checks(record, txn, thd);
if (error) { goto cleanup; }
if (tokudb_debug & TOKUDB_DEBUG_CHECK_KEY) {
error = test_row_packing(record,&prim_key,&row);
if (error) { goto cleanup; }
}
if (curr_num_DBs == 1 || share->version <= 2) {
error = insert_rows_to_dictionaries(record,&prim_key, &row, txn);
if (error) { goto cleanup; }
}
else {
error = insert_rows_to_dictionaries_mult(rec_buff, sizeof(u_int32_t) + prim_key.size + row.size, txn, thd);
if (error) { goto cleanup; }
}
trx = (tokudb_trx_data *) thd_data_get(thd, tokudb_hton->slot);
if (!error) {
added_rows++;
trx->stmt_progress.inserted++;
@@ -2976,7 +3306,7 @@ int ha_tokudb::update_primary_key(DB_TXN * trans, bool primary_key_changed, cons
error = remove_key(trans, primary_key, old_row, old_key);
if (error) { goto cleanup; }
error = pack_row(&row, new_row, primary_key);
error = pack_row(&row, NULL, new_row, primary_key);
if (error) { goto cleanup; }
error = share->file->put(share->file, trans, new_key, &row, put_flags);
@@ -2987,7 +3317,7 @@
}
else {
// Primary key didn't change; just update the row data
error = pack_row(&row, new_row, primary_key);
error = pack_row(&row, NULL, new_row, primary_key);
if (error) { goto cleanup; }
//
@@ -3068,7 +3398,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
}
if (using_ignore) {
error = db_env->txn_begin(db_env, transaction, &sub_trans, DB_INHERIT_ISOLATION);
error = db_env->txn_begin(db_env, transaction, &sub_trans, 0 );
if (error) {
goto cleanup;
}
@@ -3112,7 +3442,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
if (is_unique_key && !has_null && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
if (is_unique_key && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
bool is_unique = false;
error = is_val_unique(&is_unique, new_row, &table->key_info[keynr], keynr, txn);
if (error) { goto cleanup; }
@@ -3126,7 +3456,7 @@ int ha_tokudb::update_row(const uchar * old_row, uchar * new_row) {
put_flags = DB_YESOVERWRITE;
if (table->key_info[keynr].flags & HA_CLUSTERING) {
error = pack_row(&row, (const uchar *) new_row, keynr);
error = pack_row(&row, NULL, (const uchar *) new_row, keynr);
if (error){ goto cleanup; }
}
else {
@@ -3303,14 +3633,14 @@ void ha_tokudb::set_query_columns(uint keynr) {
bitmap_is_set(table->write_set,i)
)
{
if (bitmap_is_set(&share->key_filters[key_index],i)) {
if (bitmap_is_set(&share->kc_info.key_filters[key_index],i)) {
read_key = true;
}
else {
//
// if fixed field length
//
if (share->field_lengths[i] != 0) {
if (share->kc_info.field_lengths[i] != 0) {
//
// save the offset into the list
//
@@ -3320,7 +3650,7 @@ void ha_tokudb::set_query_columns(uint keynr) {
//
// varchar or varbinary
//
else if (share->length_bytes[i] != 0) {
else if (share->kc_info.length_bytes[i] != 0) {
var_cols_for_query[curr_var_col_index] = i;
curr_var_col_index++;
}
@@ -3749,6 +4079,7 @@ int ha_tokudb::index_read(uchar * buf, const uchar * key, uint key_len, enum ha_
info.keynr = active_index;
ir_info.smart_dbt_info = info;
ir_info.cmp = 0;
flags = SET_READ_FLAG(0);
switch (find_flag) {
@@ -4868,19 +5199,28 @@ void ha_tokudb::trace_create_table_info(const char *name, TABLE * form) {
//
// creates dictionary for secondary index, with key description key_info, all using txn
//
int ha_tokudb::create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn) {
int ha_tokudb::create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, u_int32_t keynr) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
uchar* ptr = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
char dict_name[MAX_DICT_NAME_LEN];
u_int32_t max_row_desc_buff_size;
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
max_row_desc_buff_size = 2*(form->s->fields * 6)+10; // upper bound of key comparison descriptor
max_row_desc_buff_size += get_max_secondary_key_pack_desc_size(kc_info); // upper bound for sec. key part
max_row_desc_buff_size += get_max_clustering_val_pack_desc_size(form->s); // upper bound for clustering val part
row_desc_buff = (uchar *)my_malloc(max_row_desc_buff_size, MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
ptr = row_desc_buff;
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
@@ -4894,13 +5234,40 @@ int ha_tokudb::create_secondary_dictionary(const char* name, TABLE* form, KEY* k
// setup the row descriptor
//
row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor(
//
// save data necessary for key comparisons
//
ptr += create_toku_key_descriptor(
row_desc_buff,
false,
key_info,
hpk,
prim_key
);
ptr += create_toku_secondary_key_pack_descriptor(
ptr,
hpk,
primary_key,
form->s,
form,
kc_info,
key_info,
prim_key
);
ptr += create_toku_clustering_val_pack_descriptor(
ptr,
primary_key,
form->s,
kc_info,
keynr,
key_info->flags & HA_CLUSTERING
);
row_descriptor.size = ptr - row_desc_buff;
assert(row_descriptor.size <= max_row_desc_buff_size);
error = create_sub_table(newname, &row_descriptor, txn);
cleanup:
my_free(newname, MYF(MY_ALLOW_ZERO_PTR));
@@ -4912,18 +5279,25 @@ cleanup:
// create and close the main dictionary with name of "name" using table form, all within
// transaction txn.
//
int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn) {
int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info) {
int error;
DBT row_descriptor;
uchar* row_desc_buff = NULL;
uchar* ptr = NULL;
char* newname = NULL;
KEY* prim_key = NULL;
u_int32_t max_row_desc_buff_size;
uint hpk= (form->s->primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
bzero(&row_descriptor, sizeof(row_descriptor));
row_desc_buff = (uchar *)my_malloc(2*(form->s->fields * 6)+10 ,MYF(MY_WME));
max_row_desc_buff_size = 2*(form->s->fields * 6)+10; // upper bound of key comparison descriptor
max_row_desc_buff_size += get_max_secondary_key_pack_desc_size(kc_info); // upper bound for sec. key part
max_row_desc_buff_size += get_max_clustering_val_pack_desc_size(form->s); // upper bound for clustering val part
row_desc_buff = (uchar *)my_malloc(max_row_desc_buff_size, MYF(MY_WME));
if (row_desc_buff == NULL){ error = ENOMEM; goto cleanup;}
ptr = row_desc_buff;
newname = (char *)my_malloc(get_max_dict_name_path_length(name),MYF(MY_WME));
if (newname == NULL){ error = ENOMEM; goto cleanup;}
@@ -4936,7 +5310,10 @@ int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn
// setup the row descriptor
//
row_descriptor.data = row_desc_buff;
row_descriptor.size = create_toku_key_descriptor(
//
// save data necessary for key comparisons
//
ptr += create_toku_key_descriptor(
row_desc_buff,
hpk,
prim_key,
@@ -4944,6 +5321,23 @@ int ha_tokudb::create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn
NULL
);
ptr += create_toku_main_key_pack_descriptor(
ptr
);
ptr += create_toku_clustering_val_pack_descriptor(
ptr,
primary_key,
form->s,
kc_info,
primary_key,
false
);
row_descriptor.size = ptr - row_desc_buff;
assert(row_descriptor.size <= max_row_desc_buff_size);
/* Create the main table that will hold the real rows */
error = create_sub_table(newname, &row_descriptor, txn);
cleanup:
@@ -4970,6 +5364,8 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
uint capabilities;
DB_TXN* txn = NULL;
char* newname = NULL;
KEY_AND_COL_INFO kc_info;
bzero(&kc_info, sizeof(kc_info));
pthread_mutex_lock(&tokudb_meta_mutex);
@@ -4981,6 +5377,9 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
primary_key = form->s->primary_key;
hidden_primary_key = (primary_key >= MAX_KEY) ? TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH : 0;
if (hidden_primary_key) {
primary_key = form->s->keys;
}
/* do some tracing */
trace_create_table_info(name,form);
@@ -5006,8 +5405,19 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
error = write_auto_inc_create(status_block, create_info->auto_increment_value, txn);
if (error) { goto cleanup; }
error = allocate_key_and_col_info(form->s, &kc_info);
if (error) { goto cleanup; }
error = initialize_key_and_col_info(
form->s,
form,
&kc_info,
hidden_primary_key,
primary_key
);
if (error) { goto cleanup; }
error = create_main_dictionary(name, form, txn);
error = create_main_dictionary(name, form, txn, &kc_info);
if (error) {
goto cleanup;
}
@@ -5015,7 +5425,7 @@ int ha_tokudb::create(const char *name, TABLE * form, HA_CREATE_INFO * create_in
for (uint i = 0; i < form->s->keys; i++) {
if (i != primary_key) {
error = create_secondary_dictionary(name, form, &form->key_info[i], txn);
error = create_secondary_dictionary(name, form, &form->key_info[i], txn, &kc_info, i);
if (error) {
goto cleanup;
}
@@ -5066,7 +5476,7 @@ int ha_tokudb::discard_or_import_tablespace(my_bool discard) {
// is_key specifies if it is a secondary index (and hence a "key-" needs to be prepended) or
// if it is not a secondary index
//
int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* to_name, char* secondary_name, bool is_key, DB_TXN* txn, bool is_delete) {
int ha_tokudb::delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* secondary_name, bool is_key, DB_TXN* txn, bool is_delete) {
int error;
char dict_name[MAX_DICT_NAME_LEN];
char* new_from_name = NULL;
@@ -5635,15 +6045,6 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
}
//
// first create all the DB's files
//
for (uint i = 0; i < num_of_keys; i++) {
error = create_secondary_dictionary(share->table_name, table_arg, &key_info[i], txn);
if (error) { goto cleanup; }
}
//
// open all the DB files and set the appropriate variables in share
// they go to the end of share->key_file
@@ -5652,27 +6053,30 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
for (uint i = 0; i < num_of_keys; i++, curr_index++) {
if (key_info[i].flags & HA_CLUSTERING) {
set_key_filter(
&share->key_filters[curr_index],
&share->kc_info.key_filters[curr_index],
&key_info[i],
table_arg,
false
);
if (!hidden_primary_key) {
set_key_filter(
&share->key_filters[curr_index],
&share->kc_info.key_filters[curr_index],
&table_arg->key_info[primary_key],
table_arg,
false
);
}
error = initialize_col_pack_info(share,table_arg->s,curr_index);
error = initialize_col_pack_info(&share->kc_info,table_arg->s,curr_index);
if (error) {
goto cleanup;
}
}
error = create_secondary_dictionary(share->table_name, table_arg, &key_info[i], txn, &share->kc_info, curr_index);
if (error) { goto cleanup; }
error = open_secondary_dictionary(
&share->key_file[curr_index],
&key_info[i],
@@ -5751,7 +6155,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
// but, we do not need to check it if the key has a null
// and we do not need to check it if unique_checks is off
//
if (is_unique_key && !has_null && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
if (is_unique_key && !thd_test_options(thd, OPTION_RELAXED_UNIQUE_CHECKS)) {
bool is_unique = false;
error = is_val_unique(&is_unique, tmp_record, &key_info[i], curr_index, txn);
if (error) { goto cleanup; }
@@ -5764,7 +6168,7 @@ int ha_tokudb::add_index(TABLE *table_arg, KEY *key_info, uint num_of_keys) {
}
if (key_info[i].flags & HA_CLUSTERING) {
if ((error = pack_row(&row, (const uchar *) tmp_record, curr_index))){
if ((error = pack_row(&row, NULL, (const uchar *) tmp_record, curr_index))){
goto cleanup;
}
error = share->key_file[curr_index]->put(share->key_file[curr_index], txn, &secondary_key, &row, put_flags);
@@ -6109,14 +6513,16 @@ int ha_tokudb::truncate_dictionary( uint keynr, DB_TXN* txn ) {
}
if (is_pk) {
error = create_main_dictionary(share->table_name, table, txn);
error = create_main_dictionary(share->table_name, table, txn, &share->kc_info);
}
else {
error = create_secondary_dictionary(
share->table_name,
table,
&table_share->key_info[keynr],
txn
txn,
&share->kc_info,
keynr
);
}
if (error) { goto cleanup; }
@@ -5,14 +5,6 @@
#include <db.h>
#include "hatoku_cmp.h"
typedef struct st_col_pack_info {
u_int32_t col_pack_val; //offset if fixed, pack_index if var
} COL_PACK_INFO;
typedef struct st_multi_col_pack_info {
u_int32_t var_len_offset; //where the fixed length stuff ends and the offsets for var stuff begins
u_int32_t len_of_offsets; //length of the offset bytes in a packed row
} MULTI_COL_PACK_INFO;
//
// This object stores table information that is to be shared
@@ -49,6 +41,7 @@ typedef struct st_tokudb_share {
// key is hidden
//
DB *key_file[MAX_KEY +1];
u_int32_t mult_put_flags[MAX_KEY+1];
uint status, version, capabilities;
uint ref_length;
//
@@ -61,14 +54,8 @@ typedef struct st_tokudb_share {
uint ai_field_index;
bool ai_first_col;
MY_BITMAP key_filters[MAX_KEY+1];
uchar* field_lengths; //stores the field lengths of fixed size fields (255 max)
uchar* length_bytes; // stores the length of lengths of varchars and varbinaries
u_int32_t* blob_fields; // list of indexes of blob fields
u_int32_t num_blobs;
MULTI_COL_PACK_INFO mcp_info[MAX_KEY+1];
COL_PACK_INFO* cp_info[MAX_KEY+1];
u_int32_t num_offset_bytes; //number of bytes needed to encode the offset
KEY_AND_COL_INFO kc_info;
//
// we want the following optimization for bulk loads, if the table is empty,
// attempt to grab a table lock. emptiness check can be expensive,
@@ -76,9 +63,11 @@ typedef struct st_tokudb_share {
// to tell us to not try it again.
//
bool try_table_lock;
bool has_unique_keys;
} TOKUDB_SHARE;
#define HA_TOKU_VERSION 2
#define HA_TOKU_VERSION 3
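//
// note on the version bump (inference): dictionaries created before version 3
// lack the new key/val pack descriptors, so ha_tokudb::write_row() falls back
// to row-at-a-time insert_rows_to_dictionaries() when share->version <= 2
// instead of using put_multiple
//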
//
// no capabilities yet
//
@@ -109,6 +98,8 @@ typedef enum {
} TABLE_LOCK_TYPE;
int create_tokudb_trx_data_instance(tokudb_trx_data** out_trx);
int generate_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
int cleanup_keys_vals_for_put(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
class ha_tokudb : public handler {
@@ -134,6 +125,7 @@ private:
// number of bytes allocated in rec_buff
//
ulong alloced_rec_buff_length;
u_int32_t max_key_length;
//
// buffer used to temporarily store a "packed key"
// data pointer of a DBT will end up pointing to this
@@ -156,6 +148,13 @@ private:
//
uchar *primary_key_buff;
//
// individual key buffer for each index
//
uchar* mult_key_buff[MAX_KEY];
uchar* mult_rec_buff[MAX_KEY];
ulong alloced_mult_rec_buff_length;
//
// when unpacking blobs, we need to store it in a temporary
// buffer that will persist because MySQL just gets a pointer to the
......@@ -237,11 +236,13 @@ private:
char write_status_msg[200]; //buffer of 200 should be a good upper bound.
bool fix_rec_buff_for_blob(ulong length);
void fix_mult_rec_buff();
uchar current_ident[TOKUDB_HIDDEN_PRIMARY_KEY_LENGTH];
ulong max_row_length(const uchar * buf);
int pack_row(
DBT * row,
uchar* buf,
const uchar* record,
uint index
);
......@@ -283,12 +284,16 @@ private:
int create_txn(THD* thd, tokudb_trx_data* trx);
bool may_table_be_empty();
int delete_or_rename_table (const char* from_name, const char* to_name, bool is_delete);
int delete_or_rename_dictionary( const char* from_name, const char* to_name, char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
int delete_or_rename_dictionary( const char* from_name, const char* to_name, const char* index_name, bool is_key, DB_TXN* txn, bool is_delete);
int truncate_dictionary( uint keynr, DB_TXN* txn );
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn);
int create_secondary_dictionary(const char* name, TABLE* form, KEY* key_info, DB_TXN* txn, KEY_AND_COL_INFO* kc_info, u_int32_t keynr);
int create_main_dictionary(const char* name, TABLE* form, DB_TXN* txn, KEY_AND_COL_INFO* kc_info);
void trace_create_table_info(const char *name, TABLE * form);
int is_val_unique(bool* is_unique, uchar* record, KEY* key_info, uint dict_index, DB_TXN* txn);
int do_uniqueness_checks(uchar* record, DB_TXN* txn, THD* thd);
int insert_rows_to_dictionaries(uchar* record, DBT* pk_key, DBT* pk_val, DB_TXN* txn);
int insert_rows_to_dictionaries_mult(uchar* row, u_int32_t row_size, DB_TXN* txn, THD* thd);
int test_row_packing(uchar* record, DBT* pk_key, DBT* pk_val);
public:
......
......@@ -10,6 +10,85 @@ extern "C" {
#error "WORDS_BIGENDIAN not supported"
#endif
void get_var_field_info(
u_int32_t* field_len,
u_int32_t* start_offset,
u_int32_t var_field_index,
const uchar* var_field_offset_ptr,
u_int32_t num_offset_bytes
)
{
u_int32_t data_start_offset = 0;
u_int32_t data_end_offset = 0;
switch (num_offset_bytes) {
case (1):
data_end_offset = (var_field_offset_ptr + var_field_index)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_offset_ptr + 2*var_field_index);
break;
default:
assert(false);
break;
}
if (var_field_index) {
switch (num_offset_bytes) {
case (1):
data_start_offset = (var_field_offset_ptr + var_field_index - 1)[0];
break;
case (2):
data_start_offset = uint2korr(var_field_offset_ptr + 2*(var_field_index-1));
break;
default:
assert(false);
break;
}
}
else {
data_start_offset = 0;
}
*start_offset = data_start_offset;
*field_len = data_end_offset - data_start_offset;
}
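//
// illustrative example (not part of the original change): with 2-byte
// offsets and three var fields of lengths 3, 0, and 5, the offset array
// stores the cumulative end offsets {3, 3, 8}; calling get_var_field_info
// with var_field_index 2 then yields start_offset = 3 and field_len = 5
//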
void get_blob_field_info(
u_int32_t* start_offset,
u_int32_t len_of_offsets,
const uchar* var_field_data_ptr,
u_int32_t num_offset_bytes
)
{
u_int32_t data_end_offset;
//
// the blobs begin where the variable-length data ends. If any var
// fields exist, the last offset entry (stored just before the var
// data) tells us where that is; if no var fields exist,
// data_end_offset stays 0 and var_field_data_ptr already points
// at the blobs
//
if (len_of_offsets) {
switch (num_offset_bytes) {
case (1):
data_end_offset = (var_field_data_ptr - 1)[0];
break;
case (2):
data_end_offset = uint2korr(var_field_data_ptr - 2);
break;
default:
assert(false);
break;
}
}
else {
data_end_offset = 0;
}
*start_offset = data_end_offset;
}
TOKU_TYPE mysql_to_toku_type (Field* field) {
TOKU_TYPE ret_val = toku_type_unknown;
enum_field_types mysql_type = field->real_type();
......@@ -79,6 +158,23 @@ exit:
}
inline CHARSET_INFO* get_charset_from_num (u_int32_t charset_number) {
//
// pattern matched off of InnoDB, due to MySQL bug 42649
//
if (charset_number == default_charset_info->number) {
return default_charset_info;
}
else if (charset_number == my_charset_latin1.number) {
return &my_charset_latin1;
}
else {
return get_charset(charset_number, MYF(MY_WME));
}
}
//
// used to read the length of a variable sized field in a tokudb key (buf).
//
......@@ -360,15 +456,43 @@ exit:
return ret_val;
}
//
// packing logic shared with pack_toku_varbinary below, but driven by a
// row descriptor instead of a Field
//
uchar* pack_toku_varbinary_from_desc(
uchar* to_tokudb,
const uchar* from_desc,
u_int32_t key_part_length, //max length of the key part; the number of length bytes in to_tokudb is derived from this
u_int32_t field_length //length of the field data in from_desc
)
{
u_int32_t length_bytes_in_tokudb = get_length_bytes_from_max(key_part_length);
u_int32_t length = field_length;
set_if_smaller(length, key_part_length);
//
// copy the length bytes, assuming both are in little endian
//
to_tokudb[0] = (uchar)(length & 255);
if (length_bytes_in_tokudb > 1) {
to_tokudb[1] = (uchar) (length >> 8);
}
//
// copy the string
//
memcpy(to_tokudb + length_bytes_in_tokudb, from_desc, length);
return to_tokudb + length + length_bytes_in_tokudb;
}
inline uchar* pack_toku_varbinary(
uchar* to_tokudb,
uchar* from_mysql,
u_int32_t length_bytes_in_tokudb, //number of bytes to use to encode the length in to_tokudb
u_int32_t length_bytes_in_mysql, //number of bytes used to encode the length in from_mysql
u_int32_t max_num_bytes
)
{
u_int32_t length = 0;
u_int32_t length_bytes_in_tokudb;
switch (length_bytes_in_mysql) {
case (0):
length = max_num_bytes;
......@@ -386,8 +510,13 @@ inline uchar* pack_toku_varbinary(
length = uint4korr(from_mysql);
break;
}
//
// from this point on, functionality equivalent to pack_toku_varbinary_from_desc
//
set_if_smaller(length,max_num_bytes);
length_bytes_in_tokudb = get_length_bytes_from_max(max_num_bytes);
//
// copy the length bytes, assuming both are in little endian
//
......@@ -563,6 +692,56 @@ inline uchar* unpack_toku_blob(
}
//
// packing logic shared with pack_toku_varstring below, but driven by a
// row descriptor instead of a Field
//
uchar* pack_toku_varstring_from_desc(
uchar* to_tokudb,
const uchar* from_desc,
u_int32_t key_part_length, //max length of the key part; the number of length bytes in to_tokudb is derived from this
u_int32_t field_length, //length of the field data in from_desc
u_int32_t charset_num //charset number of the field
)
{
CHARSET_INFO* charset = NULL;
u_int32_t length_bytes_in_tokudb = get_length_bytes_from_max(key_part_length);
u_int32_t length = field_length;
u_int32_t local_char_length = 0;
set_if_smaller(length, key_part_length);
charset = get_charset_from_num(charset_num);
//
// truncate the length to a whole number of characters if needed
//
local_char_length= ((charset->mbmaxlen > 1) ?
key_part_length/charset->mbmaxlen : key_part_length);
if (length > local_char_length)
{
local_char_length= my_charpos(
charset,
from_desc,
from_desc+length,
local_char_length
);
set_if_smaller(length, local_char_length);
}
//
// copy the length bytes, assuming both are in little endian
//
to_tokudb[0] = (uchar)(length & 255);
if (length_bytes_in_tokudb > 1) {
to_tokudb[1] = (uchar) (length >> 8);
}
//
// copy the string
//
memcpy(to_tokudb + length_bytes_in_tokudb, from_desc, length);
return to_tokudb + length + length_bytes_in_tokudb;
}
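//
// note: unlike pack_toku_varbinary_from_desc, the truncation here is
// done on a character boundary, using my_charpos to account for
// multi-byte charsets
//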
inline uchar* pack_toku_varstring(
uchar* to_tokudb,
uchar* from_mysql,
......@@ -633,18 +812,7 @@ inline int cmp_toku_string(
int ret_val = 0;
CHARSET_INFO* charset = NULL;
//
// patternmatched off of InnoDB, due to MySQL bug 42649
//
if (charset_number == default_charset_info->number) {
charset = default_charset_info;
}
else if (charset_number == my_charset_latin1.number) {
charset = &my_charset_latin1;
}
else {
charset = get_charset(charset_number, MYF(MY_WME));
}
charset = get_charset_from_num(charset_number);
ret_val = charset->coll->strnncollsp(
charset,
......@@ -1071,7 +1239,6 @@ uchar* pack_toku_key_field(
new_pos = pack_toku_varbinary(
to_tokudb,
from_mysql,
get_length_bytes_from_max(key_part_length),
((Field_varstring *)field)->length_bytes,
key_part_length
);
......@@ -1132,7 +1299,6 @@ uchar* pack_key_toku_key_field(
new_pos = pack_toku_varbinary(
to_tokudb,
from_mysql,
get_length_bytes_from_max(key_part_length),
2, // MySQL key buffers always use 2 length bytes here, regardless of the field's length
key_part_length
);
......@@ -1394,3 +1560,1299 @@ int tokudb_prefix_cmp_dbt_key(DB *file, const DBT *keya, const DBT *keyb) {
}
u_int32_t create_toku_main_key_pack_descriptor (
uchar* buf
)
{
//
// The first four bytes always contain the offset of where the first key
// ends.
//
uchar* pos = buf + 4;
u_int32_t offset = 0;
//
// one byte states if this is the main dictionary
//
pos[0] = 1;
pos++;
goto exit;
exit:
offset = pos - buf;
buf[0] = (uchar)(offset & 255);
buf[1] = (uchar)((offset >> 8) & 255);
buf[2] = (uchar)((offset >> 16) & 255);
buf[3] = (uchar)((offset >> 24) & 255);
return pos - buf;
}
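//
// illustrative note: the resulting descriptor is five bytes,
// {0x05, 0x00, 0x00, 0x00, 0x01} - a little-endian size of 5 followed
// by the byte stating this is the main dictionary
//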
#define COL_FIX_FIELD 0x11
#define COL_VAR_FIELD 0x22
#define COL_BLOB_FIELD 0x33
#define COL_HAS_NO_CHARSET 0x44
#define COL_HAS_CHARSET 0x55
#define COL_FIX_PK_OFFSET 0x66
#define COL_VAR_PK_OFFSET 0x77
#define CK_FIX_RANGE 0x88
#define CK_VAR_RANGE 0x99
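//
// tag bytes used in the pack descriptors below:
// COL_FIX_FIELD/COL_VAR_FIELD/COL_BLOB_FIELD state where a column lives
// in the pk val; COL_HAS_CHARSET/COL_HAS_NO_CHARSET state whether a
// charset number follows; COL_FIX_PK_OFFSET/COL_VAR_PK_OFFSET state how
// to locate a column inside the pk key; CK_FIX_RANGE/CK_VAR_RANGE mark
// byte ranges to be copied into a clustering val
//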
#define COPY_OFFSET_TO_BUF memcpy ( \
pos, \
&kc_info->cp_info[pk_index][field_index].col_pack_val, \
sizeof(u_int32_t) \
); \
pos += sizeof(u_int32_t);
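//
// stores two bytes for one pk key part: a tag stating fixed or var,
// followed by either the fixed length or the number of length bytes
//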
u_int32_t pack_desc_pk_info(uchar* buf, KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, KEY_PART_INFO* key_part) {
uchar* pos = buf;
uint16 field_index = key_part->field->field_index;
Field* field = table_share->field[field_index];
TOKU_TYPE toku_type = mysql_to_toku_type(field);
u_int32_t key_part_length = key_part->length;
u_int32_t field_length;
uchar len_bytes = 0;
switch(toku_type) {
case (toku_type_int):
case (toku_type_double):
case (toku_type_float):
pos[0] = COL_FIX_FIELD;
pos++;
pos[0] = kc_info->field_lengths[field_index];
pos++;
break;
case (toku_type_fixbinary):
pos[0] = COL_FIX_FIELD;
pos++;
field_length = field->pack_length();
set_if_smaller(key_part_length, field_length);
assert(key_part_length < 256);
pos[0] = (uchar)key_part_length;
pos++;
break;
case (toku_type_fixstring):
pos[0] = COL_VAR_FIELD;
pos++;
// fixstrings are packed as var fields with a single length byte (key_part_length <= 255)
assert(key_part_length <= 255);
pos[0] = 1;
pos++;
break;
case (toku_type_varbinary):
case (toku_type_varstring):
case (toku_type_blob):
pos[0] = COL_VAR_FIELD;
pos++;
len_bytes = (key_part_length > 255) ? 2 : 1;
pos[0] = len_bytes;
pos++;
break;
default:
assert(false);
}
return pos - buf;
}
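//
// for a secondary-key column that also appears in the pk: if the column
// and every pk part before it have fixed lengths, store a constant byte
// offset into the pk key (COL_FIX_PK_OFFSET); otherwise store the
// column's index within the pk (COL_VAR_PK_OFFSET) so it can be walked
// at runtime
//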
u_int32_t pack_desc_pk_offset_info(
uchar* buf,
KEY_AND_COL_INFO* kc_info,
TABLE_SHARE* table_share,
KEY_PART_INFO* key_part,
KEY* prim_key,
uchar* pk_info
)
{
uchar* pos = buf;
uint16 field_index = key_part->field->field_index;
bool found_col_in_pk = false;
u_int32_t index_in_pk;
bool is_constant_offset = true;
u_int32_t offset = 0;
for (uint i = 0; i < prim_key->key_parts; i++) {
KEY_PART_INFO curr = prim_key->key_part[i];
uint16 curr_field_index = curr.field->field_index;
if (pk_info[2*i] == COL_VAR_FIELD) {
is_constant_offset = false;
}
if (curr_field_index == field_index) {
found_col_in_pk = true;
index_in_pk = i;
break;
}
offset += pk_info[2*i + 1];
}
assert(found_col_in_pk);
if (is_constant_offset) {
pos[0] = COL_FIX_PK_OFFSET;
pos++;
memcpy (pos, &offset, sizeof(offset));
pos += sizeof(offset);
}
else {
pos[0] = COL_VAR_PK_OFFSET;
pos++;
memcpy(pos, &index_in_pk, sizeof(index_in_pk));
pos += sizeof(index_in_pk);
}
return pos - buf;
}
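//
// for a secondary-key column that lives in the pk val: store a tag for
// fixed/var/blob, followed by the fixed offset, the var pack index, or
// the index into the blob list, respectively
//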
u_int32_t pack_desc_offset_info(uchar* buf, KEY_AND_COL_INFO* kc_info, uint pk_index, TABLE_SHARE* table_share, KEY_PART_INFO* key_part) {
uchar* pos = buf;
uint16 field_index = key_part->field->field_index;
Field* field = table_share->field[field_index];
TOKU_TYPE toku_type = mysql_to_toku_type(field);
bool found_index = false;
switch(toku_type) {
case (toku_type_int):
case (toku_type_double):
case (toku_type_float):
case (toku_type_fixbinary):
case (toku_type_fixstring):
pos[0] = COL_FIX_FIELD;
pos++;
// copy the offset
COPY_OFFSET_TO_BUF;
break;
case (toku_type_varbinary):
case (toku_type_varstring):
pos[0] = COL_VAR_FIELD;
pos++;
// copy the offset
COPY_OFFSET_TO_BUF;
break;
case (toku_type_blob):
pos[0] = COL_BLOB_FIELD;
pos++;
for (u_int32_t i = 0; i < kc_info->num_blobs; i++) {
u_int32_t blob_index = kc_info->blob_fields[i];
if (blob_index == field_index) {
u_int32_t val = i;
memcpy(pos, &val, sizeof(u_int32_t));
pos += sizeof(u_int32_t);
found_index = true;
break;
}
}
assert(found_index);
break;
default:
assert(false);
}
return pos - buf;
}
u_int32_t pack_desc_key_length_info(uchar* buf, KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, KEY_PART_INFO* key_part) {
uchar* pos = buf;
uint16 field_index = key_part->field->field_index;
Field* field = table_share->field[field_index];
TOKU_TYPE toku_type = mysql_to_toku_type(field);
u_int32_t key_part_length = key_part->length;
u_int32_t field_length;
switch(toku_type) {
case (toku_type_int):
case (toku_type_double):
case (toku_type_float):
// copy the field length
field_length = kc_info->field_lengths[field_index];
memcpy(pos, &field_length, sizeof(field_length));
pos += sizeof(field_length);
break;
case (toku_type_fixbinary):
case (toku_type_fixstring):
field_length = field->pack_length();
set_if_smaller(key_part_length, field_length);
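// fall through: the clamped key_part_length is stored below, just as for var fields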
case (toku_type_varbinary):
case (toku_type_varstring):
case (toku_type_blob):
// copy the key_part length
memcpy(pos, &key_part_length, sizeof(key_part_length));
pos += sizeof(key_part_length);
break;
default:
assert(false);
}
return pos - buf;
}
u_int32_t pack_desc_char_info(uchar* buf, KEY_AND_COL_INFO* kc_info, TABLE_SHARE* table_share, KEY_PART_INFO* key_part) {
uchar* pos = buf;
uint16 field_index = key_part->field->field_index;
Field* field = table_share->field[field_index];
TOKU_TYPE toku_type = mysql_to_toku_type(field);
u_int32_t charset_num = 0;
switch(toku_type) {
case (toku_type_int):
case (toku_type_double):
case (toku_type_float):
case (toku_type_fixbinary):
case (toku_type_varbinary):
pos[0] = COL_HAS_NO_CHARSET;
pos++;
break;
case (toku_type_fixstring):
case (toku_type_varstring):
case (toku_type_blob):
pos[0] = COL_HAS_CHARSET;
pos++;
// copy the charset
charset_num = field->charset()->number;
pos[0] = (uchar)(charset_num & 255);
pos[1] = (uchar)((charset_num >> 8) & 255);
pos[2] = (uchar)((charset_num >> 16) & 255);
pos[3] = (uchar)((charset_num >> 24) & 255);
pos += 4;
break;
default:
assert(false);
}
return pos - buf;
}
u_int32_t pack_some_row_info (
uchar* buf,
uint pk_index,
TABLE_SHARE* table_share,
KEY_AND_COL_INFO* kc_info
)
{
uchar* pos = buf;
u_int32_t num_null_bytes = 0;
//
// four bytes stating number of null bytes
//
num_null_bytes = table_share->null_bytes;
memcpy(pos, &num_null_bytes, sizeof(num_null_bytes));
pos += sizeof(num_null_bytes);
//
// eight bytes stating mcp_info
//
memcpy(pos, &kc_info->mcp_info[pk_index], sizeof(MULTI_COL_PACK_INFO));
pos += sizeof(MULTI_COL_PACK_INFO);
//
// one byte for the number of offset bytes
//
pos[0] = (uchar)kc_info->num_offset_bytes;
pos++;
return pos - buf;
}
u_int32_t get_max_clustering_val_pack_desc_size(
TABLE_SHARE* table_share
)
{
u_int32_t ret_val = 0;
//
// the fixed stuff:
// first the things in pack_some_row_info
// second another mcp_info
// third a byte that states if blobs exist
ret_val += sizeof(u_int32_t) + sizeof(MULTI_COL_PACK_INFO) + 1;
ret_val += sizeof(MULTI_COL_PACK_INFO);
ret_val++;
//
// now the variable stuff
// an upper bound: for each field, one byte stating fixed or var, followed
// by 8 bytes for the range endpoints
//
ret_val += (table_share->fields)*(1 + 2*sizeof(u_int32_t));
//
// four bytes storing the length of this portion
//
ret_val += 4;
return ret_val;
}
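//
// clustering val pack descriptor layout:
// [4-byte total size][pack_some_row_info data][mcp_info of the
// clustering key][byte stating if blobs exist][CK_FIX_RANGE and
// CK_VAR_RANGE entries, each a tag followed by 4-byte start and end
// values (byte offsets for fixed ranges, var field indices for var
// ranges)]. If the key is not clustering, only the size bytes are written
//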
u_int32_t create_toku_clustering_val_pack_descriptor (
uchar* buf,
uint pk_index,
TABLE_SHARE* table_share,
KEY_AND_COL_INFO* kc_info,
u_int32_t keynr,
bool is_clustering
)
{
uchar* pos = buf + 4;
u_int32_t offset = 0;
bool start_range_set = false;
u_int32_t last_col = 0;
//
// do not need to write anything if the key is not clustering
//
if (!is_clustering) {
goto exit;
}
pos += pack_some_row_info(
pos,
pk_index,
table_share,
kc_info
);
//
// eight bytes stating mcp_info of clustering key
//
memcpy(pos, &kc_info->mcp_info[keynr], sizeof(MULTI_COL_PACK_INFO));
pos += sizeof(MULTI_COL_PACK_INFO);
//
// store byte that states if blobs exist
//
pos[0] = (kc_info->num_blobs) ? 1 : 0;
pos++;
//
// the descriptor assumes that all fields filtered from the pk are
// also filtered from the clustering key val. We check here to
// make sure nothing unexpected happens
//
for (uint i = 0; i < table_share->fields; i++) {
bool col_filtered = bitmap_is_set(&kc_info->key_filters[keynr],i);
bool col_filtered_in_pk = bitmap_is_set(&kc_info->key_filters[pk_index],i);
if (col_filtered_in_pk) {
assert(col_filtered);
}
}
//
// first handle the fixed fields
//
start_range_set = false;
last_col = 0;
for (uint i = 0; i < table_share->fields; i++) {
bool col_filtered = bitmap_is_set(&kc_info->key_filters[keynr],i);
if (kc_info->field_lengths[i] == 0) {
//
// not a fixed field, continue
//
continue;
}
if (col_filtered && start_range_set) {
//
// need to set the end range
//
start_range_set = false;
u_int32_t end_offset = kc_info->cp_info[pk_index][last_col].col_pack_val + kc_info->field_lengths[last_col];
memcpy(pos, &end_offset, sizeof(end_offset));
pos += sizeof(end_offset);
}
else if (!col_filtered) {
if (!start_range_set) {
pos[0] = CK_FIX_RANGE;
pos++;
start_range_set = true;
u_int32_t start_offset = kc_info->cp_info[pk_index][i].col_pack_val;
memcpy(pos, &start_offset , sizeof(start_offset));
pos += sizeof(start_offset);
}
last_col = i;
}
else {
continue;
}
}
if (start_range_set) {
//
// need to set the end range
//
start_range_set = false;
u_int32_t end_offset = kc_info->cp_info[pk_index][last_col].col_pack_val+ kc_info->field_lengths[last_col];
memcpy(pos, &end_offset, sizeof(end_offset));
pos += sizeof(end_offset);
}
//
// now handle the var fields
//
start_range_set = false;
last_col = 0;
for (uint i = 0; i < table_share->fields; i++) {
bool col_filtered = bitmap_is_set(&kc_info->key_filters[keynr],i);
if (kc_info->length_bytes[i] == 0) {
//
// not a var field, continue
//
continue;
}
if (col_filtered && start_range_set) {
//
// need to set the end range
//
start_range_set = false;
u_int32_t end_offset = kc_info->cp_info[pk_index][last_col].col_pack_val;
memcpy(pos, &end_offset, sizeof(end_offset));
pos += sizeof(end_offset);
}
else if (!col_filtered) {
if (!start_range_set) {
pos[0] = CK_VAR_RANGE;
pos++;
start_range_set = true;
u_int32_t start_offset = kc_info->cp_info[pk_index][i].col_pack_val;
memcpy(pos, &start_offset , sizeof(start_offset));
pos += sizeof(start_offset);
}
last_col = i;
}
else {
continue;
}
}
if (start_range_set) {
start_range_set = false;
u_int32_t end_offset = kc_info->cp_info[pk_index][last_col].col_pack_val;
memcpy(pos, &end_offset, sizeof(end_offset));
pos += sizeof(end_offset);
}
exit:
offset = pos - buf;
buf[0] = (uchar)(offset & 255);
buf[1] = (uchar)((offset >> 8) & 255);
buf[2] = (uchar)((offset >> 16) & 255);
buf[3] = (uchar)((offset >> 24) & 255);
return pos - buf;
}
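//
// builds a clustering row directly from the pk val using the descriptor
// above: copies the null bytes, then each fixed and var range, rebasing
// the var offsets as the data is compacted, and finally appends the blobs
//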
u_int32_t pack_clustering_val_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_val
)
{
uchar* null_bytes_src_ptr = NULL;
uchar* fixed_src_ptr = NULL;
uchar* var_src_offset_ptr = NULL;
uchar* var_src_data_ptr = NULL;
uchar* fixed_dest_ptr = NULL;
uchar* var_dest_offset_ptr = NULL;
uchar* var_dest_data_ptr = NULL;
uchar* orig_var_dest_data_ptr = NULL;
uchar* desc_pos = (uchar *)row_desc;
u_int32_t num_null_bytes = 0;
u_int32_t num_offset_bytes;
MULTI_COL_PACK_INFO src_mcp_info, dest_mcp_info;
uchar has_blobs;
memcpy(&num_null_bytes, desc_pos, sizeof(num_null_bytes));
desc_pos += sizeof(num_null_bytes);
memcpy(&src_mcp_info, desc_pos, sizeof(src_mcp_info));
desc_pos += sizeof(src_mcp_info);
num_offset_bytes = desc_pos[0];
desc_pos++;
memcpy(&dest_mcp_info, desc_pos, sizeof(dest_mcp_info));
desc_pos += sizeof(dest_mcp_info);
has_blobs = desc_pos[0];
desc_pos++;
//
//set the variables
//
null_bytes_src_ptr = (uchar *)pk_val->data;
fixed_src_ptr = null_bytes_src_ptr + num_null_bytes;
var_src_offset_ptr = fixed_src_ptr + src_mcp_info.var_len_offset;
var_src_data_ptr = var_src_offset_ptr + src_mcp_info.len_of_offsets;
fixed_dest_ptr = buf + num_null_bytes;
var_dest_offset_ptr = fixed_dest_ptr + dest_mcp_info.var_len_offset;
var_dest_data_ptr = var_dest_offset_ptr + dest_mcp_info.len_of_offsets;
orig_var_dest_data_ptr = var_dest_data_ptr;
//
// copy the null bytes
//
memcpy(buf, null_bytes_src_ptr, num_null_bytes);
while ( (u_int32_t)(desc_pos - (uchar *)row_desc) < row_desc_size) {
u_int32_t start, end, length;
uchar curr = desc_pos[0];
desc_pos++;
memcpy(&start, desc_pos, sizeof(start));
desc_pos += sizeof(start);
memcpy(&end, desc_pos, sizeof(end));
desc_pos += sizeof(end);
assert (start <= end);
if (curr == CK_FIX_RANGE) {
length = end - start;
memcpy(fixed_dest_ptr, fixed_src_ptr + start, length);
fixed_dest_ptr += length;
}
else if (curr == CK_VAR_RANGE) {
u_int32_t start_data_size;
u_int32_t start_data_offset;
u_int32_t end_data_size;
u_int32_t end_data_offset;
u_int32_t offset_diffs;
get_var_field_info(
&start_data_size,
&start_data_offset,
start,
var_src_offset_ptr,
num_offset_bytes
);
get_var_field_info(
&end_data_size,
&end_data_offset,
end,
var_src_offset_ptr,
num_offset_bytes
);
length = end_data_offset + end_data_size - start_data_offset;
//
// copy the data
//
memcpy(
var_dest_data_ptr,
var_src_data_ptr + start_data_offset,
length
);
var_dest_data_ptr += length;
//
// put in offset info
//
offset_diffs = (end_data_offset + end_data_size) - (u_int32_t)(var_dest_data_ptr - orig_var_dest_data_ptr);
for (u_int32_t i = start; i <= end; i++) {
if ( num_offset_bytes == 1 ) {
assert(offset_diffs < 256);
var_dest_offset_ptr[0] = var_src_offset_ptr[i] - (uchar)offset_diffs;
var_dest_offset_ptr++;
}
else if ( num_offset_bytes == 2 ) {
u_int32_t tmp = uint2korr(var_src_offset_ptr + 2*i);
u_int32_t new_offset = tmp - offset_diffs;
assert(new_offset < 1<<16);
int2store(var_dest_offset_ptr,new_offset);
var_dest_offset_ptr += 2;
}
else {
assert(false);
}
}
}
else {
assert(false);
}
}
//
// copy blobs
// at this point, var_dest_data_ptr is pointing to the end, where blobs should be located
// so, we put the blobs at var_dest_data_ptr
//
if (has_blobs) {
u_int32_t num_blob_bytes;
u_int32_t start_offset;
uchar* src_blob_ptr = NULL;
get_blob_field_info(
&start_offset,
src_mcp_info.len_of_offsets,
var_src_data_ptr,
num_offset_bytes
);
src_blob_ptr = var_src_data_ptr + start_offset;
num_blob_bytes = pk_val->size - (start_offset + (var_src_data_ptr - null_bytes_src_ptr));
memcpy(var_dest_data_ptr, src_blob_ptr, num_blob_bytes);
var_dest_data_ptr += num_blob_bytes;
}
return var_dest_data_ptr - buf;
}
u_int32_t get_max_secondary_key_pack_desc_size(
KEY_AND_COL_INFO* kc_info
)
{
u_int32_t ret_val = 0;
//
// the fixed stuff:
// byte that states if main dictionary
// byte that states if hpk
// the things in pack_some_row_info
ret_val++;
ret_val++;
ret_val += sizeof(u_int32_t) + sizeof(MULTI_COL_PACK_INFO) + 1;
//
// now variable sized stuff
//
// first the blobs
ret_val += sizeof(kc_info->num_blobs);
ret_val+= kc_info->num_blobs;
// then the pk
// one byte for num key parts
// two bytes for each key part
ret_val++;
ret_val += MAX_REF_PARTS*2;
// then the key
// one byte for the null bit, four bytes for the null offset,
// then 1 byte stating what it is, 4 for offset, 4 for key length,
// 1 for whether a charset exists, and 4 for the charset
ret_val += MAX_REF_PARTS*(1 + sizeof(u_int32_t) + 1 + 3*sizeof(u_int32_t) + 1);
//
// four bytes storing the length of this portion
//
ret_val += 4;
return ret_val;
}
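//
// secondary key pack descriptor layout:
// [4-byte total size][byte stating NOT main dictionary][hpk byte]
// [pack_some_row_info data][num_blobs][one length-bytes byte per blob]
// [byte holding 2*(num pk parts)][two bytes per pk part]
// then, per key part: null bit, null offset (if nullable), a tag plus
// offset/index, the key part length, and the charset info
//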
u_int32_t create_toku_secondary_key_pack_descriptor (
uchar* buf,
bool has_hpk,
uint pk_index,
TABLE_SHARE* table_share,
TABLE* table,
KEY_AND_COL_INFO* kc_info,
KEY* key_info,
KEY* prim_key
)
{
//
// The first four bytes always contain the offset of where the first key
// ends.
//
uchar* pk_info = NULL;
uchar* pos = buf + 4;
u_int32_t offset = 0;
//
// first byte states that it is NOT main dictionary
//
pos[0] = 0;
pos++;
//
// one byte states if main dictionary has an hpk or not
//
if (has_hpk) {
pos[0] = 1;
}
else {
pos[0] = 0;
}
pos++;
pos += pack_some_row_info(
pos,
pk_index,
table_share,
kc_info
);
//
// store blob information
//
memcpy(pos, &kc_info->num_blobs, sizeof(kc_info->num_blobs));
pos += sizeof(u_int32_t);
for (u_int32_t i = 0; i < kc_info->num_blobs; i++) {
//
// store length bytes for each blob
//
Field* field = table_share->field[kc_info->blob_fields[i]];
pos[0] = (uchar)field->row_pack_length();
pos++;
}
//
// store the pk information
//
if (has_hpk) {
pos[0] = 0;
pos++;
}
else {
//
// store number of parts
//
assert(prim_key->key_parts < 128);
pos[0] = 2*prim_key->key_parts;
pos++;
//
// for each part, store if it is a fixed field or var field
// if fixed, store number of bytes, if var, store
// number of length bytes
// total should be two bytes per key part stored
//
pk_info = pos;
uchar* tmp = pos;
for (uint i = 0; i < prim_key->key_parts; i++) {
tmp += pack_desc_pk_info(
tmp,
kc_info,
table_share,
&prim_key->key_part[i]
);
}
//
// asserting that we moved forward as much as we think we have
//
assert(tmp - pos == (2*prim_key->key_parts));
pos = tmp;
}
for (uint i = 0; i < key_info->key_parts; i++) {
KEY_PART_INFO curr_kpi = key_info->key_part[i];
uint16 field_index = curr_kpi.field->field_index;
Field* field = table_share->field[field_index];
bool is_col_in_pk = false;
if (bitmap_is_set(&kc_info->key_filters[pk_index],field_index)) {
assert(!has_hpk && prim_key != NULL);
is_col_in_pk = true;
}
else {
is_col_in_pk = false;
}
pos[0] = field->null_bit;
pos++;
if (is_col_in_pk) {
//
// assert that columns in pk do not have a null bit
// because in MySQL, pk columns cannot be null
//
assert(!field->null_bit);
}
if (field->null_bit) {
u_int32_t null_offset = get_null_offset(table,table->field[field_index]);
memcpy(pos, &null_offset, sizeof(u_int32_t));
pos += sizeof(u_int32_t);
}
if (is_col_in_pk) {
pos += pack_desc_pk_offset_info(
pos,
kc_info,
table_share,
&curr_kpi,
prim_key,
pk_info
);
}
else {
pos += pack_desc_offset_info(
pos,
kc_info,
pk_index,
table_share,
&curr_kpi
);
}
pos += pack_desc_key_length_info(
pos,
kc_info,
table_share,
&curr_kpi
);
pos += pack_desc_char_info(
pos,
kc_info,
table_share,
&curr_kpi
);
}
offset = pos - buf;
buf[0] = (uchar)(offset & 255);
buf[1] = (uchar)((offset >> 8) & 255);
buf[2] = (uchar)((offset >> 16) & 255);
buf[3] = (uchar)((offset >> 24) & 255);
return pos - buf;
}
u_int32_t skip_key_in_desc(
uchar* row_desc
)
{
uchar* pos = row_desc;
uchar col_bin_or_char;
//
// skip the byte that states if it is a fix field or var field, we do not care
//
pos++;
//
// skip the offset information
//
pos += sizeof(u_int32_t);
//
// skip the key_part_length info
//
pos += sizeof(u_int32_t);
col_bin_or_char = pos[0];
pos++;
if (col_bin_or_char == COL_HAS_NO_CHARSET) {
goto exit;
}
//
// skip the charset info
//
pos += 4;
exit:
return (u_int32_t)(pos-row_desc);
}
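//
// walks the descriptor to compute an upper bound on the size of a key
// packed by pack_key_from_desc, without looking at any row data
//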
u_int32_t max_key_size_from_desc(
void* row_desc,
u_int32_t row_desc_size
)
{
uchar* desc_pos = (uchar *)row_desc;
u_int32_t num_blobs;
u_int32_t num_pk_columns;
u_int32_t max_size = 0;
// skip byte that states if main dictionary
bool is_main_dictionary = desc_pos[0];
desc_pos++;
assert(!is_main_dictionary);
// skip hpk byte
desc_pos++;
// skip num_null_bytes
desc_pos += sizeof(u_int32_t);
// skip mcp_info
desc_pos += sizeof(MULTI_COL_PACK_INFO);
// skip offset_bytes
desc_pos++;
// skip over blobs
memcpy(&num_blobs, desc_pos, sizeof(num_blobs));
desc_pos += sizeof(num_blobs);
desc_pos += num_blobs;
// skip over pk info
num_pk_columns = desc_pos[0]/2;
desc_pos++;
desc_pos += 2*num_pk_columns;
while ( (u_int32_t)(desc_pos - (uchar *)row_desc) < row_desc_size) {
uchar has_charset;
u_int32_t key_length = 0;
uchar null_bit = desc_pos[0];
desc_pos++;
if (null_bit) {
//
// column is NULLable, skip null_offset, and add a null byte
//
max_size++;
desc_pos += sizeof(u_int32_t);
}
//
// skip over byte that states if fix or var
//
desc_pos++;
// skip over offset
desc_pos += sizeof(u_int32_t);
//
// get the key length and add it to return value
//
memcpy(&key_length, desc_pos, sizeof(key_length));
desc_pos += sizeof(key_length);
max_size += key_length;
max_size += 2; // up to 2 length bytes; this is an upper bound, so it does not need to be tight
has_charset = desc_pos[0];
desc_pos++;
u_int32_t charset_num;
if (has_charset == COL_HAS_CHARSET) {
// skip over charset num
desc_pos += sizeof(charset_num);
}
else {
assert(has_charset == COL_HAS_NO_CHARSET);
}
}
return max_size;
}
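//
// packs a secondary key directly from a (pk_key, pk_val) pair using the
// descriptor, without first unpacking the row into MySQL format; the pk
// key is appended at the end, as secondary keys require
//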
u_int32_t pack_key_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_key,
DBT* pk_val
)
{
MULTI_COL_PACK_INFO mcp_info;
u_int32_t num_null_bytes;
u_int32_t num_blobs;
u_int32_t num_pk_columns;
uchar* blob_lengths = NULL;
uchar* pk_info = NULL;
uchar* pk_data_ptr = NULL;
uchar* null_bytes_ptr = NULL;
uchar* fixed_field_ptr = NULL;
uchar* var_field_offset_ptr = NULL;
const uchar* var_field_data_ptr = NULL;
u_int32_t num_offset_bytes;
uchar* packed_key_pos = buf;
uchar* desc_pos = (uchar *)row_desc;
bool is_main_dictionary = desc_pos[0];
desc_pos++;
assert(!is_main_dictionary);
//
// get the constant info out of descriptor
//
bool hpk = desc_pos[0];
desc_pos++;
memcpy(&num_null_bytes, desc_pos, sizeof(num_null_bytes));
desc_pos += sizeof(num_null_bytes);
memcpy(&mcp_info, desc_pos, sizeof(mcp_info));
desc_pos += sizeof(mcp_info);
num_offset_bytes = desc_pos[0];
desc_pos++;
memcpy(&num_blobs, desc_pos, sizeof(num_blobs));
desc_pos += sizeof(num_blobs);
blob_lengths = desc_pos;
desc_pos += num_blobs;
num_pk_columns = desc_pos[0]/2;
desc_pos++;
pk_info = desc_pos;
desc_pos += 2*num_pk_columns;
//
// now start packing the key
//
//
// pack the infinity byte
//
packed_key_pos[0] = COL_ZERO;
packed_key_pos++;
//
// now start packing each column of the key, as described in descriptor
//
if (!hpk) {
// +1 for the infinity byte
pk_data_ptr = (uchar *)pk_key->data + 1;
}
null_bytes_ptr = (uchar *)pk_val->data;
fixed_field_ptr = null_bytes_ptr + num_null_bytes;
var_field_offset_ptr = fixed_field_ptr + mcp_info.var_len_offset;
var_field_data_ptr = var_field_offset_ptr + mcp_info.len_of_offsets;
while ( (u_int32_t)(desc_pos - (uchar *)row_desc) < row_desc_size) {
uchar col_fix_val;
uchar has_charset;
u_int32_t col_pack_val = 0;
u_int32_t key_length = 0;
uchar null_bit = desc_pos[0];
desc_pos++;
if (null_bit) {
//
// column is NULLable, need to check the null bytes to see if it is NULL
//
u_int32_t null_offset = 0;
bool is_field_null;
memcpy(&null_offset, desc_pos, sizeof(null_offset));
desc_pos += sizeof(null_offset);
is_field_null = (null_bytes_ptr[null_offset] & null_bit) ? true: false;
if (is_field_null) {
packed_key_pos[0] = NULL_COL_VAL;
packed_key_pos++;
desc_pos += skip_key_in_desc(desc_pos);
continue;
}
else {
packed_key_pos[0] = NONNULL_COL_VAL;
packed_key_pos++;
}
}
//
// now pack the column (unless it was NULL, and we continued)
//
col_fix_val = desc_pos[0];
desc_pos++;
memcpy(&col_pack_val, desc_pos, sizeof(col_pack_val));
desc_pos += sizeof(col_pack_val);
memcpy(&key_length, desc_pos, sizeof(key_length));
desc_pos += sizeof(key_length);
has_charset = desc_pos[0];
desc_pos++;
u_int32_t charset_num;
if (has_charset == COL_HAS_CHARSET) {
memcpy(&charset_num, desc_pos, sizeof(charset_num));
desc_pos += sizeof(charset_num);
}
else {
assert(has_charset == COL_HAS_NO_CHARSET);
}
//
// case where column is in pk val
//
if (col_fix_val == COL_FIX_FIELD || col_fix_val == COL_VAR_FIELD || col_fix_val == COL_BLOB_FIELD) {
if (col_fix_val == COL_FIX_FIELD && has_charset == COL_HAS_NO_CHARSET) {
memcpy(packed_key_pos, &fixed_field_ptr[col_pack_val], key_length);
packed_key_pos += key_length;
}
else if (col_fix_val == COL_VAR_FIELD && has_charset == COL_HAS_NO_CHARSET) {
u_int32_t data_start_offset = 0;
u_int32_t data_size = 0;
get_var_field_info(
&data_size,
&data_start_offset,
col_pack_val,
var_field_offset_ptr,
num_offset_bytes
);
//
// length of this field in this row is data_size
// data is located beginning at var_field_data_ptr + data_start_offset
//
packed_key_pos = pack_toku_varbinary_from_desc(
packed_key_pos,
var_field_data_ptr + data_start_offset,
key_length, //number of bytes to use to encode the length in to_tokudb
data_size //length of field
);
}
else {
const uchar* data_start = NULL;
u_int32_t data_start_offset = 0;
u_int32_t data_size = 0;
if (col_fix_val == COL_FIX_FIELD) {
data_start_offset = col_pack_val;
data_size = key_length;
data_start = fixed_field_ptr + data_start_offset;
}
else if (col_fix_val == COL_VAR_FIELD){
get_var_field_info(
&data_size,
&data_start_offset,
col_pack_val,
var_field_offset_ptr,
num_offset_bytes
);
data_start = var_field_data_ptr + data_start_offset;
}
else if (col_fix_val == COL_BLOB_FIELD) {
u_int32_t blob_index = col_pack_val;
u_int32_t blob_offset;
const uchar* blob_ptr = NULL;
u_int32_t field_len;
u_int32_t field_len_bytes = blob_lengths[blob_index];
get_blob_field_info(
&blob_offset,
mcp_info.len_of_offsets,
var_field_data_ptr,
num_offset_bytes
);
blob_ptr = var_field_data_ptr + blob_offset;
assert(num_blobs > 0);
//
// skip over other blobs to get to the one we want to make a key out of
//
for (u_int32_t i = 0; i < blob_index; i++) {
blob_ptr = unpack_toku_field_blob(
NULL,
blob_ptr,
blob_lengths[i],
true
);
}
//
// at this point, blob_ptr is pointing to the blob we want to make a key from
//
field_len = get_blob_field_len(blob_ptr, field_len_bytes);
//
// now we set the variables to make the key
//
data_start = blob_ptr + field_len_bytes;
data_size = field_len;
}
else {
assert(false);
}
packed_key_pos = pack_toku_varstring_from_desc(
packed_key_pos,
data_start,
key_length,
data_size,
charset_num
);
}
}
//
// case where column is in pk key
//
else {
if (col_fix_val == COL_FIX_PK_OFFSET) {
memcpy(packed_key_pos, &pk_data_ptr[col_pack_val], key_length);
packed_key_pos += key_length;
}
else if (col_fix_val == COL_VAR_PK_OFFSET) {
uchar* tmp_pk_data_ptr = pk_data_ptr;
u_int32_t index_in_pk = col_pack_val;
//
// skip along in pk to the right column
//
for (u_int32_t i = 0; i < index_in_pk; i++) {
if (pk_info[2*i] == COL_FIX_FIELD) {
tmp_pk_data_ptr += pk_info[2*i + 1];
}
else if (pk_info[2*i] == COL_VAR_FIELD) {
u_int32_t len_bytes = pk_info[2*i + 1];
u_int32_t len;
if (len_bytes == 1) {
len = tmp_pk_data_ptr[0];
tmp_pk_data_ptr++;
}
else if (len_bytes == 2) {
len = uint2korr(tmp_pk_data_ptr);
tmp_pk_data_ptr += 2;
}
else {
assert(false);
}
tmp_pk_data_ptr += len;
}
else {
assert(false);
}
}
//
// at this point, tmp_pk_data_ptr is pointing at the column
//
u_int32_t is_fix_field = pk_info[2*index_in_pk];
if (is_fix_field == COL_FIX_FIELD) {
memcpy(packed_key_pos, tmp_pk_data_ptr, key_length);
packed_key_pos += key_length;
}
else if (is_fix_field == COL_VAR_FIELD) {
const uchar* data_start = NULL;
u_int32_t data_size = 0;
u_int32_t len_bytes = pk_info[2*index_in_pk + 1];
if (len_bytes == 1) {
data_size = tmp_pk_data_ptr[0];
tmp_pk_data_ptr++;
}
else if (len_bytes == 2) {
data_size = uint2korr(tmp_pk_data_ptr);
tmp_pk_data_ptr += 2;
}
else {
assert(false);
}
data_start = tmp_pk_data_ptr;
if (has_charset == COL_HAS_CHARSET) {
packed_key_pos = pack_toku_varstring_from_desc(
packed_key_pos,
data_start,
key_length,
data_size,
charset_num
);
}
else if (has_charset == COL_HAS_NO_CHARSET) {
packed_key_pos = pack_toku_varbinary_from_desc(
packed_key_pos,
data_start,
key_length,
data_size //length of field
);
}
else {
assert(false);
}
}
else {
assert(false);
}
}
else {
assert(false);
}
}
}
assert( (u_int32_t)(desc_pos - (uchar *)row_desc) == row_desc_size);
//
// now append the primary key to the end of the key
//
if (hpk) {
memcpy(packed_key_pos, pk_key->data, pk_key->size);
packed_key_pos += pk_key->size;
}
else {
memcpy(packed_key_pos, (uchar *)pk_key->data + 1, pk_key->size - 1);
packed_key_pos += (pk_key->size - 1);
}
return (u_int32_t)(packed_key_pos - buf);
}
......@@ -10,7 +10,92 @@ extern "C" {
#include <db.h>
typedef struct st_col_pack_info {
u_int32_t col_pack_val; //offset if fixed, pack_index if var
} COL_PACK_INFO;
typedef struct st_multi_col_pack_info {
u_int32_t var_len_offset; //where the fixed length stuff ends and the offsets for var stuff begin
u_int32_t len_of_offsets; //length of the offset bytes in a packed row
} MULTI_COL_PACK_INFO;
typedef struct st_key_and_col_info {
MY_BITMAP key_filters[MAX_KEY+1];
uchar* field_lengths; //stores the field lengths of fixed size fields (255 max)
uchar* length_bytes; // stores the number of length bytes for varchars and varbinaries
u_int32_t* blob_fields; // list of indexes of blob fields
u_int32_t num_blobs;
MULTI_COL_PACK_INFO mcp_info[MAX_KEY+1];
COL_PACK_INFO* cp_info[MAX_KEY+1];
u_int32_t num_offset_bytes; //number of bytes needed to encode the offset
} KEY_AND_COL_INFO;
void get_var_field_info(
u_int32_t* field_len,
u_int32_t* start_offset,
u_int32_t var_field_index,
const uchar* var_field_offset_ptr,
u_int32_t num_offset_bytes
);
void get_blob_field_info(
u_int32_t* start_offset,
u_int32_t len_of_offsets,
const uchar* var_field_data_ptr,
u_int32_t num_offset_bytes
);
inline u_int32_t get_blob_field_len(
const uchar* from_tokudb,
u_int32_t len_bytes
)
{
u_int32_t length = 0;
switch (len_bytes) {
case (1):
length = (u_int32_t)(*from_tokudb);
break;
case (2):
length = uint2korr(from_tokudb);
break;
case (3):
length = uint3korr(from_tokudb);
break;
case (4):
length = uint4korr(from_tokudb);
break;
default:
assert(false);
}
return length;
}
inline const uchar* unpack_toku_field_blob(
uchar *to_mysql,
const uchar* from_tokudb,
u_int32_t len_bytes,
bool skip
)
{
u_int32_t length = 0;
const uchar* data_ptr = NULL;
if (!skip) {
memcpy(to_mysql, from_tokudb, len_bytes);
}
length = get_blob_field_len(from_tokudb,len_bytes);
data_ptr = from_tokudb + len_bytes;
if (!skip) {
memcpy(to_mysql + len_bytes, (uchar *)(&data_ptr), sizeof(uchar *));
}
return (from_tokudb + len_bytes + length);
}
inline uint get_null_offset(TABLE* table, Field* field) {
return (uint) ((uchar*) field->null_ptr - (uchar*) table->record[0]);
}
typedef enum {
......@@ -29,6 +114,21 @@ typedef enum {
TOKU_TYPE mysql_to_toku_type (Field* field);
uchar* pack_toku_varbinary_from_desc(
uchar* to_tokudb,
const uchar* from_desc,
u_int32_t key_part_length, //max length of the key part; the number of length bytes in to_tokudb is derived from this
u_int32_t field_length //length of the field data in from_desc
);
uchar* pack_toku_varstring_from_desc(
uchar* to_tokudb,
const uchar* from_desc,
u_int32_t key_part_length, //max length of the key part; the number of length bytes in to_tokudb is derived from this
u_int32_t field_length, //length of the field data in from_desc
u_int32_t charset_num //charset number of the field
);
uchar* pack_toku_key_field(
uchar* to_tokudb,
......@@ -107,5 +207,78 @@ int create_toku_key_descriptor(
bool is_second_hpk,
KEY* second_key
);
u_int32_t create_toku_main_key_pack_descriptor (
uchar* buf
);
u_int32_t get_max_clustering_val_pack_desc_size(
TABLE_SHARE* table_share
);
u_int32_t create_toku_clustering_val_pack_descriptor (
uchar* buf,
uint pk_index,
TABLE_SHARE* table_share,
KEY_AND_COL_INFO* kc_info,
u_int32_t keynr,
bool is_clustering
);
inline bool is_key_clustering(
void* row_desc,
u_int32_t row_desc_size
)
{
return (row_desc_size > 0);
}
u_int32_t pack_clustering_val_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_val
);
u_int32_t get_max_secondary_key_pack_desc_size(
KEY_AND_COL_INFO* kc_info
);
u_int32_t create_toku_secondary_key_pack_descriptor (
uchar* buf,
bool has_hpk,
uint pk_index,
TABLE_SHARE* table_share,
TABLE* table,
KEY_AND_COL_INFO* kc_info,
KEY* key_info,
KEY* prim_key
);
inline bool is_key_pk(
void* row_desc,
u_int32_t row_desc_size
)
{
uchar* buf = (uchar *)row_desc;
return buf[0];
}
u_int32_t max_key_size_from_desc(
void* row_desc,
u_int32_t row_desc_size
);
u_int32_t pack_key_from_desc(
uchar* buf,
void* row_desc,
u_int32_t row_desc_size,
DBT* pk_key,
DBT* pk_val
);
#endif
......@@ -35,6 +35,7 @@ extern ulong tokudb_debug;
#define TOKUDB_DEBUG_AUTO_INCREMENT 64
#define TOKUDB_DEBUG_LOCK 256
#define TOKUDB_DEBUG_LOCKRETRY 512
#define TOKUDB_DEBUG_CHECK_KEY 1024
#define TOKUDB_TRACE(f, ...) \
printf("%d:%s:%d:" f, my_tid(), __FILE__, __LINE__, ##__VA_ARGS__);
......
......@@ -106,7 +106,7 @@ u_int32_t tokudb_write_status_frequency;
u_int32_t tokudb_read_status_frequency;
my_bool tokudb_prelock_empty;
#ifdef TOKUDB_VERSION
char *tokudb_version = TOKUDB_VERSION;
char *tokudb_version = (char *)TOKUDB_VERSION;
#else
char *tokudb_version;
#endif
......@@ -273,6 +273,9 @@ static int tokudb_init_func(void *p) {
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env open:flags=%x\n", __FUNCTION__, tokudb_init_flags);
r = db_env->set_multiple_callbacks(db_env, generate_keys_vals_for_put, cleanup_keys_vals_for_put, NULL, NULL);
assert(!r);
r = db_env->open(db_env, tokudb_home, tokudb_init_flags, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH|S_IWOTH);
if (tokudb_debug & TOKUDB_DEBUG_INIT) TOKUDB_TRACE("%s:env opened:return=%d\n", __FUNCTION__, r);
......@@ -285,7 +288,6 @@ static int tokudb_init_func(void *p) {
r = db_env->checkpointing_set_period(db_env, tokudb_checkpointing_period);
assert(!r);
r = db_create(&metadata_db, db_env, 0);
if (r) {
DBUG_PRINT("info", ("failed to create metadata db %d\n", r));
......