Commit 17b4ad3a authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

refs #5334 support inplace expansion of int fields in 5.5

git-svn-id: file:///svn/mysql/tokudb-engine/tokudb-engine@47574 c7de825b-a66e-492c-adef-691d508d4ae1
parent a2b7b014
...@@ -530,7 +530,9 @@ public: ...@@ -530,7 +530,9 @@ public:
int alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info); int alter_table_add_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info); int alter_table_drop_index(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info); int alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int alter_table_change_varchar_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info); int alter_table_expand_varchar_columns(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int alter_table_expand_all_fixed_columns(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int alter_table_expand_fixed_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info, int expand_field_num);
void print_alter_info(TABLE *altered_table, Alter_inplace_info *ha_alter_info); void print_alter_info(TABLE *altered_table, Alter_inplace_info *ha_alter_info);
int setup_kc_info(TABLE *altered_table, KEY_AND_COL_INFO *kc_info); int setup_kc_info(TABLE *altered_table, KEY_AND_COL_INFO *kc_info);
public: public:
......
...@@ -24,7 +24,8 @@ public: ...@@ -24,7 +24,8 @@ public:
compression_changed = false; compression_changed = false;
table_kc_info = NULL; table_kc_info = NULL;
altered_table_kc_info = NULL; altered_table_kc_info = NULL;
change_column_update_needed = false; expand_varchar_update_needed = false;
expand_fixed_update_needed = false;
} }
~tokudb_alter_ctx() { ~tokudb_alter_ctx() {
if (altered_table_kc_info) if (altered_table_kc_info)
...@@ -38,7 +39,8 @@ public: ...@@ -38,7 +39,8 @@ public:
bool drop_index_changed; bool drop_index_changed;
bool compression_changed; bool compression_changed;
enum toku_compression_method orig_compression_method; enum toku_compression_method orig_compression_method;
bool change_column_update_needed; bool expand_varchar_update_needed;
bool expand_fixed_update_needed;
Dynamic_array<uint> changed_fields; Dynamic_array<uint> changed_fields;
KEY_AND_COL_INFO *table_kc_info; KEY_AND_COL_INFO *table_kc_info;
KEY_AND_COL_INFO *altered_table_kc_info; KEY_AND_COL_INFO *altered_table_kc_info;
...@@ -139,7 +141,9 @@ fix_handler_flags(TABLE *table, TABLE *altered_table, Alter_inplace_info *ha_alt ...@@ -139,7 +141,9 @@ fix_handler_flags(TABLE *table, TABLE *altered_table, Alter_inplace_info *ha_alt
} }
} }
// try to change ALTER_COLUMN_TYPE to ALTER_COLUMN_EQUAL_PACK_LENGTH // Mysql thinks that varchar(100) is a different type than varchar(300) due to the size of the length field.
// Therefore, it encodes an alter varchar expansion as a ALTER_COLUMN_TYPE rather than an ALTER_COLUMN_EQUAL_PACK_LENGTH.
// Try to change ALTER_COLUMN_TYPE to ALTER_COLUMN_EQUAL_PACK_LENGTH
if (handler_flags & Alter_inplace_info::ALTER_COLUMN_TYPE) { if (handler_flags & Alter_inplace_info::ALTER_COLUMN_TYPE) {
bool change_it = false; bool change_it = false;
Dynamic_array<uint> changed_fields; Dynamic_array<uint> changed_fields;
...@@ -388,11 +392,11 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte ...@@ -388,11 +392,11 @@ ha_tokudb::inplace_alter_table(TABLE *altered_table, Alter_inplace_info *ha_alte
ctx->compression_changed = true; ctx->compression_changed = true;
} }
} }
if (error == 0 && (ctx->handler_flags & Alter_inplace_info::ALTER_COLUMN_EQUAL_PACK_LENGTH)) { if (error == 0 && ctx->expand_varchar_update_needed)
if (ctx->change_column_update_needed) { error = alter_table_expand_varchar_columns(altered_table, ha_alter_info);
error = alter_table_change_varchar_column(altered_table, ha_alter_info);
} if (error == 0 && ctx->expand_fixed_update_needed)
} error = alter_table_expand_all_fixed_columns(altered_table, ha_alter_info);
bool result = false; // success bool result = false; // success
if (error) { if (error) {
...@@ -598,47 +602,6 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in ...@@ -598,47 +602,6 @@ ha_tokudb::alter_table_add_or_drop_column(TABLE *altered_table, Alter_inplace_in
return error; return error;
} }
// Handle change column varchar expansion. For all clustered keys, broadcast an update message to readjust the varchar offsets.
int
ha_tokudb::alter_table_change_varchar_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint32_t i = 0; i < curr_num_DBs; i++) {
if (i == primary_key || table_share->key_info[i].flags & HA_CLUSTERING) {
uint32_t offset_start = table_share->null_bytes + share->kc_info.mcp_info[i].fixed_field_size;
uint32_t offset_end = offset_start + share->kc_info.mcp_info[i].len_of_offsets;
// make the expand varchar offsets message
DBT expand = {};
expand.size = sizeof (uchar) + sizeof offset_start + sizeof offset_end;
expand.data = my_malloc(expand.size, MYF(MY_WME));
if (!expand.data) {
error = ENOMEM;
break;
}
uchar *expand_ptr = (uchar *)expand.data;
expand_ptr[0] = EXPAND_VARCHAR_OFFSETS;
expand_ptr += sizeof (uchar);
memcpy(expand_ptr, &offset_start, sizeof offset_start);
expand_ptr += sizeof offset_start;
memcpy(expand_ptr, &offset_end, sizeof offset_end);
expand_ptr += sizeof offset_end;
// and broadcast it into the tree
error = share->key_file[i]->update_broadcast(share->key_file[i], ctx->alter_txn, &expand, DB_IS_RESETTING_OP);
my_free(expand.data);
if (error)
break;
}
}
return error;
}
// Commit or abort the alter operations. // Commit or abort the alter operations.
// If commit then write the new frm data to the status using the alter transaction. // If commit then write the new frm data to the status using the alter transaction.
// If abort then abort the alter transaction and try to rollback the non-transactional changes. // If abort then abort the alter transaction and try to rollback the non-transactional changes.
...@@ -707,17 +670,49 @@ ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_info * ...@@ -707,17 +670,49 @@ ha_tokudb::commit_inplace_alter_table(TABLE *altered_table, Alter_inplace_info *
int int
ha_tokudb::setup_kc_info(TABLE *altered_table, KEY_AND_COL_INFO *altered_kc_info) { ha_tokudb::setup_kc_info(TABLE *altered_table, KEY_AND_COL_INFO *altered_kc_info) {
int error = allocate_key_and_col_info(altered_table->s, altered_kc_info); int error = allocate_key_and_col_info(altered_table->s, altered_kc_info);
if (error) { if (error == 0)
error = initialize_key_and_col_info(altered_table->s, altered_table, altered_kc_info, hidden_primary_key, primary_key);
return error; return error;
}
// Handle change column varchar expansion. For all clustered keys, broadcast an update message to readjust the varchar offsets.
int
ha_tokudb::alter_table_expand_varchar_columns(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint32_t i = 0; i < curr_num_DBs; i++) {
if (i == primary_key || table_share->key_info[i].flags & HA_CLUSTERING) {
uint32_t offset_start = table_share->null_bytes + share->kc_info.mcp_info[i].fixed_field_size;
uint32_t offset_end = offset_start + share->kc_info.mcp_info[i].len_of_offsets;
// make the expand varchar offsets message
DBT expand = {};
expand.size = sizeof (uchar) + sizeof offset_start + sizeof offset_end;
expand.data = my_malloc(expand.size, MYF(MY_WME));
if (!expand.data) {
error = ENOMEM;
break;
}
uchar *expand_ptr = (uchar *)expand.data;
expand_ptr[0] = TOKU_OP_EXPAND_VARCHAR_OFFSETS;
expand_ptr += sizeof (uchar);
memcpy(expand_ptr, &offset_start, sizeof offset_start);
expand_ptr += sizeof offset_start;
memcpy(expand_ptr, &offset_end, sizeof offset_end);
expand_ptr += sizeof offset_end;
// and broadcast it into the tree
error = share->key_file[i]->update_broadcast(share->key_file[i], ctx->alter_txn, &expand, DB_IS_RESETTING_OP);
my_free(expand.data);
if (error)
break;
}
} }
error = initialize_key_and_col_info(
altered_table->s,
altered_table,
altered_kc_info,
hidden_primary_key,
primary_key
);
return error; return error;
} }
...@@ -760,7 +755,7 @@ change_varchar_length_is_supported(TABLE *table, TABLE *altered_table, Alter_inp ...@@ -760,7 +755,7 @@ change_varchar_length_is_supported(TABLE *table, TABLE *altered_table, Alter_inp
if (ctx->table_kc_info->num_offset_bytes > ctx->altered_table_kc_info->num_offset_bytes) if (ctx->table_kc_info->num_offset_bytes > ctx->altered_table_kc_info->num_offset_bytes)
return false; // something is wrong return false; // something is wrong
if (ctx->table_kc_info->num_offset_bytes < ctx->altered_table_kc_info->num_offset_bytes) if (ctx->table_kc_info->num_offset_bytes < ctx->altered_table_kc_info->num_offset_bytes)
ctx->change_column_update_needed = true; // sum of varchar lengths changed from 1 to 2 ctx->expand_varchar_update_needed = true; // sum of varchar lengths changed from 1 to 2
} }
return true; return true;
} }
...@@ -788,6 +783,134 @@ change_length_is_supported(TABLE *table, TABLE *altered_table, Alter_inplace_inf ...@@ -788,6 +783,134 @@ change_length_is_supported(TABLE *table, TABLE *altered_table, Alter_inplace_inf
return false; return false;
} }
int
ha_tokudb::alter_table_expand_all_fixed_columns(TABLE *altered_table, Alter_inplace_info *ha_alter_info) {
int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
assert(ctx->changed_fields.elements() <= 1);
for (int ai = 0; error == 0 && ai < ctx->changed_fields.elements(); ai++) {
uint expand_field_num = ctx->changed_fields.at(ai);
error = alter_table_expand_fixed_column(altered_table, ha_alter_info, expand_field_num);
}
return error;
}
static uint32_t
field_offset(uint32_t null_bytes, KEY_AND_COL_INFO *kc_info, int expand_field_num) {
uint32_t offset = null_bytes;
for (int i = 0; i < expand_field_num; i++)
offset += kc_info->field_lengths[i];
return offset;
}
static bool
is_unsigned(Field *f) {
switch (f->key_type()) {
case HA_KEYTYPE_BINARY:
case HA_KEYTYPE_USHORT_INT:
case HA_KEYTYPE_UINT24:
case HA_KEYTYPE_ULONG_INT:
case HA_KEYTYPE_ULONGLONG:
return true;
case HA_KEYTYPE_INT8:
case HA_KEYTYPE_SHORT_INT:
case HA_KEYTYPE_INT24:
case HA_KEYTYPE_LONG_INT:
case HA_KEYTYPE_LONGLONG:
return false;
default:
assert(0);
}
}
// Send an expand message into all clustered indexes including the primary
int
ha_tokudb::alter_table_expand_fixed_column(TABLE *altered_table, Alter_inplace_info *ha_alter_info, int expand_field_num) {
int error = 0;
tokudb_alter_ctx *ctx = static_cast<tokudb_alter_ctx *>(ha_alter_info->handler_ctx);
Field *old_field = table->field[expand_field_num];
TOKU_TYPE old_field_type = mysql_to_toku_type(old_field);
Field *new_field = altered_table->field[expand_field_num];
TOKU_TYPE new_field_type = mysql_to_toku_type(new_field);
assert(old_field_type == new_field_type);
uint32_t curr_num_DBs = table->s->keys + test(hidden_primary_key);
for (uint32_t i = 0; i < curr_num_DBs; i++) {
if (i == primary_key || table_share->key_info[i].flags & HA_CLUSTERING) {
uchar operation;
// make the expand int field message
DBT expand = {};
expand.size = 1+4+4+4+4;
switch (old_field_type) {
case toku_type_int:
operation = TOKU_OP_EXPAND_INT_TYPE;
expand.size += 1;
break;
case toku_type_fixstring:
operation = TOKU_OP_EXPAND_CHAR_TYPE;
break;
case toku_type_fixbinary:
operation = TOKU_OP_EXPAND_BINARY_TYPE;
break;
default:
assert(0);
}
expand.data = my_malloc(expand.size, MYF(MY_WME));
if (!expand.data) {
error = ENOMEM;
break;
}
uchar *expand_ptr = (uchar *)expand.data;
expand_ptr[0] = operation;
expand_ptr += sizeof (uchar);
uint32_t old_offset = field_offset(table_share->null_bytes, ctx->table_kc_info, expand_field_num);
memcpy(expand_ptr, &old_offset, sizeof old_offset);
expand_ptr += sizeof old_offset;
uint32_t old_length = ctx->table_kc_info->field_lengths[expand_field_num];
assert(old_length == old_field->pack_length());
memcpy(expand_ptr, &old_length, sizeof old_length);
expand_ptr += sizeof old_length;
uint32_t new_offset = field_offset(table_share->null_bytes, ctx->altered_table_kc_info, expand_field_num);
memcpy(expand_ptr, &new_offset, sizeof new_offset);
expand_ptr += sizeof new_offset;
uint32_t new_length = ctx->altered_table_kc_info->field_lengths[expand_field_num];
assert(new_length == new_field->pack_length());
memcpy(expand_ptr, &new_length, sizeof new_length);
expand_ptr += sizeof new_length;
switch (operation) {
case TOKU_OP_EXPAND_INT_TYPE:
assert(is_unsigned(old_field) == is_unsigned(new_field));
expand_ptr[0] = is_unsigned(old_field);
expand_ptr += sizeof (uchar);
break;
case TOKU_OP_EXPAND_CHAR_TYPE:
case TOKU_OP_EXPAND_BINARY_TYPE:
break;
default:
assert(0);
}
assert(expand_ptr == (uchar *)expand.data + expand.size);
// and broadcast it into the tree
error = share->key_file[i]->update_broadcast(share->key_file[i], ctx->alter_txn, &expand, DB_IS_RESETTING_OP);
my_free(expand.data);
if (error)
break;
}
}
return error;
}
// Return true if the MySQL type is an int type // Return true if the MySQL type is an int type
static bool static bool
is_int_type(enum_field_types t) { is_int_type(enum_field_types t) {
...@@ -803,9 +926,9 @@ is_int_type(enum_field_types t) { ...@@ -803,9 +926,9 @@ is_int_type(enum_field_types t) {
} }
} }
// Return true if two int type fields are be changed inplace // Return true if two fixed length fields can be changed inplace
static bool static bool
change_type_int_is_supported(TABLE *table, TABLE *altered_table, Field *old_field, Field *new_field, bool &update_needed) { change_fixed_length_is_supported(TABLE *table, TABLE *altered_table, Field *old_field, Field *new_field, tokudb_alter_ctx *ctx) {
// no change in size is supported // no change in size is supported
if (old_field->pack_length() == new_field->pack_length()) if (old_field->pack_length() == new_field->pack_length())
return true; return true;
...@@ -814,19 +937,27 @@ change_type_int_is_supported(TABLE *table, TABLE *altered_table, Field *old_fiel ...@@ -814,19 +937,27 @@ change_type_int_is_supported(TABLE *table, TABLE *altered_table, Field *old_fiel
return false; return false;
if (field_in_key(table, old_field)) if (field_in_key(table, old_field))
return false; return false;
update_needed = true; ctx->expand_fixed_update_needed = true;
return true; return true;
} }
// Return true if a field type can be changed inplace // Return true if two field types can be changed inplace
static bool static bool
change_type_is_supported(TABLE *table, TABLE *altered_table, Field *old_field, Field *new_field, bool &update_needed) { change_type_is_supported(TABLE *table, TABLE *altered_table, Field *old_field, Field *new_field, tokudb_alter_ctx *ctx) {
enum_field_types old_type = old_field->real_type(); enum_field_types old_type = old_field->real_type();
enum_field_types new_type = new_field->real_type(); enum_field_types new_type = new_field->real_type();
if (is_int_type(old_type) && is_int_type(new_type)) if (is_int_type(old_type)) {
return change_type_int_is_supported(table, altered_table, old_field, new_field, update_needed); if (is_int_type(new_type) && is_unsigned(old_field) == is_unsigned(new_field))
return change_fixed_length_is_supported(table, altered_table, old_field, new_field, ctx);
else else
return false; return false;
} else if (old_type == MYSQL_TYPE_STRING) {
if (new_type == MYSQL_TYPE_STRING && old_field->binary() == new_field->binary())
return change_fixed_length_is_supported(table, altered_table, old_field, new_field, ctx);
else
return false;
} else
return false;
} }
// Return true if all changed field types can be changed inplace // Return true if all changed field types can be changed inplace
...@@ -838,11 +969,8 @@ change_type_is_supported(TABLE *table, TABLE *altered_table, Alter_inplace_info ...@@ -838,11 +969,8 @@ change_type_is_supported(TABLE *table, TABLE *altered_table, Alter_inplace_info
uint i = ctx->changed_fields.at(ai); uint i = ctx->changed_fields.at(ai);
Field *old_field = table->field[i]; Field *old_field = table->field[i];
Field *new_field = altered_table->field[i]; Field *new_field = altered_table->field[i];
bool update_needed = false; if (!change_type_is_supported(table, altered_table, old_field, new_field, ctx))
if (!change_type_is_supported(table, altered_table, old_field, new_field, update_needed))
return false; return false;
if (update_needed)
return false; // updates not yet supported
} }
return true; return true;
} }
......
// update operation codes. these codes get stuffed into update messages, so they can not change.
// update functions enum {
#define UP_COL_ADD_OR_DROP 0 TOKU_OP_COL_ADD_OR_DROP = 0,
#define EXPAND_VARCHAR_OFFSETS 1 TOKU_OP_EXPAND_VARCHAR_OFFSETS = 1,
TOKU_OP_EXPAND_INT_TYPE = 2,
// add or drop column operations TOKU_OP_EXPAND_CHAR_TYPE = 3,
TOKU_OP_EXPAND_BINARY_TYPE = 4,
TOKU_OP_ADD_INT = 5,
TOKU_OP_SUB_INT = 6,
};
#define UP_COL_ADD_OR_DROP TOKU_OP_COL_ADD_OR_DROP
// add or drop column sub-operations
#define COL_DROP 0xaa #define COL_DROP 0xaa
#define COL_ADD 0xbb #define COL_ADD 0xbb
...@@ -14,43 +22,59 @@ ...@@ -14,43 +22,59 @@
#define STATIC_ROW_MUTATOR_SIZE 1+8+2+8+8+8 #define STATIC_ROW_MUTATOR_SIZE 1+8+2+8+8+8
/* // how much space do I need for the mutators?
how much space do I need for the mutators? // static stuff first:
static stuff first: // operation 1 == UP_COL_ADD_OR_DROP
1 - UP_COL_ADD_OR_DROP // 8 - old null, new null
8 - old null, new null // 2 - old num_offset, new num_offset
2 - old num_offset, new num_offset // 8 - old fixed_field size, new fixed_field_size
8 - old fixed_field size, new fixed_field_size // 8 - old and new length of offsets
8 - old and new length of offsets // 8 - old and new starting null bit position
8 - old and new starting null bit position // TOTAL: 27
TOTAL: 27
// dynamic stuff:
dynamic stuff: // 4 - number of columns
4 - number of columns // for each column:
for each column: // 1 - add or drop
1 - add or drop // 1 - is nullable
1 - is nullable // 4 - if nullable, position
4 - if nullable, position // 1 - if add, whether default is null or not
1 - if add, whether default is null or not // 1 - if fixed, var, or not
1 - if fixed, var, or not // for fixed, entire default
for fixed, entire default // for var, 4 bytes length, then entire default
for var, 4 bytes length, then entire default // for blob, nothing
for blob, nothing // So, an upperbound is 4 + num_fields(12) + all default stuff
So, an upperbound is 4 + num_fields(12) + all default stuff
// static blob stuff:
static blob stuff: // 4 - num blobs
4 - num blobs // 1 byte for each num blobs in old table
1 byte for each num blobs in old table // So, an upperbound is 4 + kc_info->num_blobs
So, an upperbound is 4 + kc_info->num_blobs
// dynamic blob stuff:
dynamic blob stuff: // for each blob added:
for each blob added: // 1 - state if we are adding or dropping
1 - state if we are adding or dropping // 4 - blob index
4 - blob index // if add, 1 len bytes
if add, 1 len bytes // at most, 4 0's
at most, 4 0's // So, upperbound is num_blobs(1+4+1+4) = num_columns*10
So, upperbound is num_blobs(1+4+1+4) = num_columns*10
*/ // operation 1 == TOKU_OP_EXPAND_VARCHAR_OFFSETS
// offset_start 4 starting offset of the variable length field offsets
// offset end 4 ending offset of the variable length field offsets
// operation 1 == TOKU_OP_EXPAND_INT_TYPE, TOKU_OP_EXPAND_CHAR_TYPE, TOKU_OP_EXPAND_BINARY_TYPE
// old offset 4
// old length 4
// new offset 4
// new length 4
// if operation == TOKU_OP_EXPAND_INT_TYPE
// is unsigned 1
// operation 1 == TOKU_OP_INT_ADD or TOKU_OP_INT_SUB
// offset 4 starting offset of the int type field
// length 4 length of the int type field
// value 4 value to add or subtract (common use case is increment or decrement by 1)
// is unsigned 1
// //
// checks whether the bit at index pos in data is set or not // checks whether the bit at index pos in data is set or not
...@@ -608,7 +632,7 @@ tokudb_expand_varchar_offsets( ...@@ -608,7 +632,7 @@ tokudb_expand_varchar_offsets(
// decode the operation // decode the operation
uchar operation = extra_pos[0]; uchar operation = extra_pos[0];
assert(operation == EXPAND_VARCHAR_OFFSETS); assert(operation == TOKU_OP_EXPAND_VARCHAR_OFFSETS);
extra_pos += sizeof operation; extra_pos += sizeof operation;
// decode the offset start // decode the offset start
...@@ -678,6 +702,128 @@ cleanup: ...@@ -678,6 +702,128 @@ cleanup:
return error; return error;
} }
static int
tokudb_expand_field(
DB* db,
const DBT *key,
const DBT *old_val,
const DBT *extra,
void (*set_val)(const DBT *new_val, void *set_extra),
void *set_extra
)
{
int error = 0;
uchar *extra_pos = (uchar *)extra->data;
uchar operation = extra_pos[0];
assert(operation == TOKU_OP_EXPAND_INT_TYPE ||
operation == TOKU_OP_EXPAND_CHAR_TYPE ||
operation == TOKU_OP_EXPAND_BINARY_TYPE);
extra_pos += sizeof operation;
uint32_t old_offset;
memcpy(&old_offset, extra_pos, sizeof old_offset);
extra_pos += sizeof old_offset;
uint32_t old_length;
memcpy(&old_length, extra_pos, sizeof old_length);
extra_pos += sizeof old_length;
uint32_t new_offset;
memcpy(&new_offset, extra_pos, sizeof new_offset);
extra_pos += sizeof new_offset;
uint32_t new_length;
memcpy(&new_length, extra_pos, sizeof new_length);
extra_pos += sizeof new_length;
uchar is_unsigned; // for int expansion
switch (operation) {
case TOKU_OP_EXPAND_INT_TYPE:
is_unsigned = extra_pos[0];
extra_pos += sizeof is_unsigned;
assert(is_unsigned == 0 || is_unsigned == 1);
break;
case TOKU_OP_EXPAND_CHAR_TYPE:
case TOKU_OP_EXPAND_BINARY_TYPE:
break;
default:
assert(0);
}
assert(extra_pos == (uchar *)extra->data + extra->size); // consumed the entire message
assert(old_offset == new_offset); // only expand one field per update, so the offset must be the same
assert(new_length >= old_length); // expand only
assert(old_offset + old_length <= old_val->size); // old field within the old val
DBT new_val = {};
if (old_val != NULL) {
// compute the new val from the old val
uchar *old_val_ptr = (uchar *)old_val->data;
// allocate space for the new val's data
uchar *new_val_ptr = (uchar *)my_malloc(old_val->size + (new_length - old_length), MYF(MY_FAE));
if (!new_val_ptr) {
error = ENOMEM;
goto cleanup;
}
new_val.data = new_val_ptr;
// copy up to the old offset
memcpy(new_val_ptr, old_val_ptr, old_offset);
new_val_ptr += new_offset;
old_val_ptr += old_offset;
// read the old field, expand it, write to the new offset
switch (operation) {
case TOKU_OP_EXPAND_INT_TYPE:
if (is_unsigned) {
memset(new_val_ptr, 0, new_length);
} else {
if (old_val_ptr[old_length-1] & 0x80) // sign bit on?
memset(new_val_ptr, 0xff, new_length); // sign extend
else
memset(new_val_ptr, 0, new_length);
}
memcpy(new_val_ptr, old_val_ptr, old_length);
new_val_ptr += new_length;
old_val_ptr += old_length;
break;
case TOKU_OP_EXPAND_CHAR_TYPE:
case TOKU_OP_EXPAND_BINARY_TYPE:
memset(new_val_ptr, ' ', new_length);
memcpy(new_val_ptr, old_val_ptr, old_length);
new_val_ptr += new_length;
old_val_ptr += old_length;
break;
default:
assert(0);
}
// copy the rest
size_t n = old_val->size - (old_val_ptr - (uchar *)old_val->data);
memcpy(new_val_ptr, old_val_ptr, n);
new_val_ptr += n;
old_val_ptr += n;
new_val.size = new_val_ptr - (uchar *)new_val.data;
assert(new_val_ptr == (uchar *)new_val.data + new_val.size);
assert(old_val_ptr == (uchar *)old_val->data + old_val->size);
// set the new val
set_val(&new_val, set_extra);
}
error = 0;
cleanup:
my_free(new_val.data, MYF(MY_ALLOW_ZERO_PTR));
return error;
}
int int
tokudb_update_fun( tokudb_update_fun(
DB* db, DB* db,
...@@ -692,12 +838,17 @@ tokudb_update_fun( ...@@ -692,12 +838,17 @@ tokudb_update_fun(
uchar operation = extra_pos[0]; uchar operation = extra_pos[0];
int error = 0; int error = 0;
switch (operation) { switch (operation) {
case UP_COL_ADD_OR_DROP: case TOKU_OP_COL_ADD_OR_DROP:
error = tokudb_hcad_update_fun(db, key, old_val, extra, set_val, set_extra); error = tokudb_hcad_update_fun(db, key, old_val, extra, set_val, set_extra);
break; break;
case EXPAND_VARCHAR_OFFSETS: case TOKU_OP_EXPAND_VARCHAR_OFFSETS:
error = tokudb_expand_varchar_offsets(db, key, old_val, extra, set_val, set_extra); error = tokudb_expand_varchar_offsets(db, key, old_val, extra, set_val, set_extra);
break; break;
case TOKU_OP_EXPAND_INT_TYPE:
case TOKU_OP_EXPAND_CHAR_TYPE:
case TOKU_OP_EXPAND_BINARY_TYPE:
error = tokudb_expand_field(db, key, old_val, extra, set_val, set_extra);
break;
default: default:
error = EINVAL; error = EINVAL;
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment