Commit 787db809 authored by Marko Mäkelä's avatar Marko Mäkelä

Correctly update SYS_COLUMNS

innobase_instant_try(): If any columns were dropped or
reordered, delete any SYS_COLUMNS records for the following
columns, and insert SYS_COLUMNS records for all subsequent
stored columns as well as for all virtual columns.

commit_try_norebuild(): If ctx->is_instant(), let the virtual
columns be added or dropped by innobase_instant_try().
parent d6b0915f
......@@ -4208,9 +4208,9 @@ innodb_update_n_cols(const dict_table_t* table, ulint n_cols, trx_t* trx)
static
bool
innobase_add_virtual_try(
Alter_inplace_info* ha_alter_info,
const dict_table_t* user_table,
trx_t* trx)
const Alter_inplace_info* ha_alter_info,
const dict_table_t* user_table,
trx_t* trx)
{
ha_innobase_inplace_ctx* ctx;
dberr_t err = DB_SUCCESS;
......@@ -4281,7 +4281,7 @@ static bool innobase_instant_add_col(
/** Delete the column from the sys_columns system table.
@param[in] id table id
@param[in] pos SYS_COLUMNS.POS
@param[in] pos first SYS_COLUMNS.POS
@param[in,out] trx data dictionary transaction
@retval true Failure
@retval false Success. */
......@@ -4296,7 +4296,7 @@ static bool innobase_instant_drop_col(table_id_t id, ulint pos, trx_t* trx)
"PROCEDURE DELETE_COL () IS\n"
"BEGIN\n"
"DELETE FROM SYS_COLUMNS WHERE \n"
"TABLE_ID = :id AND POS = :pos;\n"
"TABLE_ID = :id AND POS >= :pos;\n"
"END;\n", FALSE, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
......@@ -4307,56 +4307,238 @@ static bool innobase_instant_drop_col(table_id_t id, ulint pos, trx_t* trx)
return false;
}
/** Update SYS_COLUMNS.POS for a stored column during instant ALTER TABLE
@param[in] table_id table id
@param[in] old_pos old position of the column
@param[in] new_pos new position of the column
@param[in,out] trx data dictionary transaction
@retval true Failure
@retval false Success. */
static bool innobase_instant_update_pos(
table_id_t table_id,
ulint old_pos,
ulint new_pos,
trx_t* trx)
/** Update INNODB SYS_COLUMNS on new virtual column's position
@param[in] table InnoDB table
@param[in] old_pos old position
@param[in] new_pos new position
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_update_v_pos_sys_columns(
const dict_table_t* table,
ulint old_pos,
ulint new_pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table_id);
pars_info_add_int4_literal(info, "old_pos", old_pos);
pars_info_add_int4_literal(info, "new_pos", new_pos);
dberr_t err = que_eval_sql(
info,
"PROCEDURE UPDATE_COL () IS\n"
"BEGIN\n"
"UPDATE SYS_COLUMNS SET POS = :new_pos WHERE\n"
"TABLE_ID = :id AND POS = :old_pos;\n"
"END;\n", FALSE, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: Updating SYS_COLUMNS.POS failed");
return true;
pars_info_add_int4_literal(info, "pos", old_pos);
pars_info_add_int4_literal(info, "val", new_pos);
pars_info_add_ull_literal(info, "id", table->id);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"UPDATE SYS_COLUMNS\n"
"SET POS = :val\n"
"WHERE POS = :pos\n"
"AND TABLE_ID = :id;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update INNODB SYS_VIRTUAL table with new virtual column position
@param[in] table InnoDB table
@param[in] old_pos old position
@param[in] new_pos new position
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_update_v_pos_sys_virtual(
const dict_table_t* table,
ulint old_pos,
ulint new_pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_int4_literal(info, "pos", old_pos);
pars_info_add_int4_literal(info, "val", new_pos);
pars_info_add_ull_literal(info, "id", table->id);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"UPDATE SYS_VIRTUAL\n"
"SET POS = :val\n"
"WHERE POS = :pos\n"
"AND TABLE_ID = :id;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update InnoDB system tables on dropping a virtual column
@param[in] table InnoDB table
@param[in] col_name column name of the dropping column
@param[in] drop_col col information for the dropping column
@param[in] n_prev_dropped number of previously dropped columns in the
same alter clause
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_drop_one_virtual_sys_columns(
const dict_table_t* table,
const char* col_name,
dict_col_t* drop_col,
ulint n_prev_dropped,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_str_literal(info, "name", col_name);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"DELETE FROM SYS_COLUMNS\n"
"WHERE TABLE_ID = :id\n"
"AND NAME = :name;\n"
"END;\n",
FALSE, trx);
if (error != DB_SUCCESS) {
return(error);
}
return false;
dict_v_col_t* v_col = dict_table_get_nth_v_col_mysql(
table, drop_col->ind);
/* Adjust column positions for all subsequent columns */
for (ulint i = v_col->v_pos + 1; i < table->n_v_cols; i++) {
dict_v_col_t* t_col = dict_table_get_nth_v_col(table, i);
ulint old_p = dict_create_v_col_pos(
t_col->v_pos - n_prev_dropped,
t_col->m_col.ind - n_prev_dropped);
ulint new_p = dict_create_v_col_pos(
t_col->v_pos - 1 - n_prev_dropped,
ulint(t_col->m_col.ind) - 1 - n_prev_dropped);
error = innobase_update_v_pos_sys_columns(
table, old_p, new_p, trx);
if (error != DB_SUCCESS) {
return(error);
}
error = innobase_update_v_pos_sys_virtual(
table, old_p, new_p, trx);
if (error != DB_SUCCESS) {
return(error);
}
}
return(error);
}
/** Delete virtual column's info from INNODB SYS_VIRTUAL
@param[in] table InnoDB table
@param[in] pos position of the virtual column to be deleted
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_drop_one_virtual_sys_virtual(
const dict_table_t* table,
ulint pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_int4_literal(info, "pos", pos);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"DELETE FROM SYS_VIRTUAL\n"
"WHERE TABLE_ID = :id\n"
"AND POS = :pos;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update system table for dropping virtual column(s)
@param[in] ha_alter_info Data used during in-place alter
@param[in] user_table InnoDB table
@param[in] trx transaction
@retval true Failure
@retval false Success */
static
bool
innobase_drop_virtual_try(
const Alter_inplace_info* ha_alter_info,
const dict_table_t* user_table,
trx_t* trx)
{
ha_innobase_inplace_ctx* ctx;
dberr_t err = DB_SUCCESS;
ctx = static_cast<ha_innobase_inplace_ctx*>
(ha_alter_info->handler_ctx);
for (ulint i = 0; i < ctx->num_to_drop_vcol; i++) {
ulint pos = dict_create_v_col_pos(
ctx->drop_vcol[i].v_pos - i,
ctx->drop_vcol[i].m_col.ind - i);
err = innobase_drop_one_virtual_sys_virtual(
user_table, pos, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: DROP COLUMN...VIRTUAL");
return(true);
}
err = innobase_drop_one_virtual_sys_columns(
user_table, ctx->drop_vcol_name[i],
&(ctx->drop_vcol[i].m_col), i, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: DROP COLUMN...VIRTUAL");
return(true);
}
}
ulint n_col = unsigned(user_table->n_cols) - DATA_N_SYS_COLS;
ulint n_v_col = unsigned(user_table->n_v_cols)
- ctx->num_to_drop_vcol;
ulint new_n = dict_table_encode_n_col(n_col, n_v_col)
| ((user_table->flags & DICT_TF_COMPACT) << 31);
return innodb_update_n_cols(user_table, new_n, trx);
}
/** Insert or update SYS_COLUMNS and the hidden metadata record
for instant ALTER TABLE.
@param[in,out] ctx ALTER TABLE context for the current partition
@param[in,out] ha_alter_info ALTER TABLE context for the current partition
@param[in] altered_table MySQL table that is being altered
@param[in] table MySQL table as it is before the ALTER operation
@param[in,out] trx dictionary transaction
@retval true failure
@retval false success */
static
bool
innobase_op_instant_try(
ha_innobase_inplace_ctx*ctx,
const TABLE* altered_table,
const TABLE* table,
trx_t* trx)
static bool innobase_instant_try(
const Alter_inplace_info* ha_alter_info,
const TABLE* altered_table,
const TABLE* table,
trx_t* trx)
{
ha_innobase_inplace_ctx* ctx = static_cast<ha_innobase_inplace_ctx*>(
ha_alter_info->handler_ctx);
DBUG_ASSERT(!ctx->need_rebuild());
if (!ctx->is_instant()) return false;
......@@ -4380,44 +4562,56 @@ innobase_op_instant_try(
dict_table_copy_types(row, user_table);
Field** af = altered_table->field;
Field** const end = altered_table->field + altered_table->s->fields;
Field** uf = table->field;
Field** const uf_end = table->field + table->s->fields;
for (ulint i = 0, next_pos = 0; uf < uf_end; uf++) {
if (!(*uf)->stored_in_db()) {
List_iterator_fast<Create_field> cf_it(
ha_alter_info->alter_info->create_list);
bool has_dropped = false;
for (uint i = 0, dropped = 0;;) {
if (af < end && !(*af)->stored_in_db()) {
af++; cf_it++;
continue;
}
if (ctx->col_map[i] == ULINT_UNDEFINED) {
if (innobase_instant_drop_col(user_table->id, i,
trx)) {
return true;
dfield_t* d = dtuple_get_nth_field(row, i);
if (ctx->col_map[i] != i) {
if (!has_dropped) {
has_dropped = true;
if (innobase_instant_drop_col(user_table->id,
i, trx)) {
return true;
}
}
if (!next_pos) {
next_pos = i + 1;
if (ctx->col_map[i] == ULINT_UNDEFINED) {
DBUG_ASSERT(dropped
< user_table->instant->n_dropped);
const dict_col_t& col = user_table->instant
->dropped[dropped++];
DBUG_ASSERT(col.is_dropped());
if (col.is_nullable()) {
dfield_set_null(d);
} else {
dfield_set_data(d, field_ref_zero,
col.len);
}
i++;
continue;
}
} else if (next_pos && innobase_instant_update_pos(
user_table->id, i, next_pos++ - 1, trx)) {
return true;
}
i++;
}
const uint c = i - dropped;
const dict_col_t* col = dict_table_get_nth_col(user_table, c);
DBUG_ASSERT(!col->is_virtual());
DBUG_ASSERT(!col->is_dropped());
/* FIXME: adjust SYS_COLUMNS.POS for old (not added/dropped)
virtual columns */
for (uint i = 0; af < end; af++) {
if (!(*af)->stored_in_db()) {
continue;
if (col->mtype == DATA_SYS) {
DBUG_ASSERT(!col->is_added());
break;
}
dict_col_t* col = dict_table_get_nth_col(user_table, i);
DBUG_ASSERT(af < end);
DBUG_ASSERT(!strcmp((*af)->field_name.str,
dict_table_get_col_name(user_table, i)));
dfield_t* d = dtuple_get_nth_field(row, i);
dict_table_get_col_name(user_table, c)));
if (col->is_added()) {
dfield_set_data(d, col->def_val.data,
......@@ -4455,27 +4649,22 @@ innobase_op_instant_try(
}
}
if (i + DATA_N_SYS_COLS
< (ctx->old_n_cols - ctx->n_instant_drop_cols)) {
i++;
continue;
}
/** Update the system column position before adding new column
in the table. */
for (ulint j = ctx->old_n_cols - DATA_N_SYS_COLS; j > i; j--) {
if (innobase_instant_update_pos(
user_table->id, j - 1, j, trx)) {
return true;
}
}
const Create_field* new_field = cf_it++;
/* new_field->field would point to an existing column.
If it is NULL, the column was added by this ALTER TABLE. */
DBUG_ASSERT(new_field->field || col->is_added());
if (innobase_instant_add_col(
user_table->id, i, (*af)->field_name.str, d, trx)) {
if (!has_dropped && new_field->field) {
/* The record is already present in SYS_COLUMNS. */
DBUG_ASSERT(!col->is_dropped());
} else if (innobase_instant_add_col(user_table->id, c,
(*af)->field_name.str,
d, trx)) {
return true;
}
i++;
af++;
}
if (innodb_update_n_cols(user_table, dict_table_encode_n_col(
......@@ -4487,6 +4676,32 @@ innobase_op_instant_try(
return true;
}
if (has_dropped) {
for (uint i = 0; i < user_table->n_v_cols; i++) {
if (DB_SUCCESS
!= innobase_add_one_virtual(
user_table,
dict_table_get_v_col_name(user_table, i),
&user_table->v_cols[i], trx)) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: ADD COLUMN...VIRTUAL");
return true;
}
}
} else {
if ((ha_alter_info->handler_flags & ALTER_DROP_VIRTUAL_COLUMN)
&& innobase_drop_virtual_try(ha_alter_info, user_table,
trx)) {
return true;
}
if ((ha_alter_info->handler_flags & ALTER_ADD_VIRTUAL_COLUMN)
&& innobase_add_virtual_try(ha_alter_info, user_table,
trx)) {
return true;
}
}
unsigned i = unsigned(user_table->n_cols) - DATA_N_SYS_COLS;
byte trx_id[DATA_TRX_ID_LEN], roll_ptr[DATA_ROLL_PTR_LEN];
dfield_set_data(dtuple_get_nth_field(row, i++), field_ref_zero,
......@@ -4587,8 +4802,9 @@ innobase_op_instant_try(
&big_rec, update, UPD_NODE_NO_ORD_CHANGE,
thr, trx->id, &mtr);
offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, offsets,
true, ULINT_UNDEFINED, &offsets_heap);
offsets = rec_get_offsets(
btr_pcur_get_rec(&pcur), index, offsets,
true, ULINT_UNDEFINED, &offsets_heap);
if (big_rec) {
if (err == DB_SUCCESS) {
err = btr_store_big_rec_extern_fields(
......@@ -4652,222 +4868,6 @@ innobase_op_instant_try(
return false;
}
/** Update INNODB SYS_COLUMNS on new virtual column's position
@param[in] table InnoDB table
@param[in] old_pos old position
@param[in] new_pos new position
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_update_v_pos_sys_columns(
const dict_table_t* table,
ulint old_pos,
ulint new_pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_int4_literal(info, "pos", old_pos);
pars_info_add_int4_literal(info, "val", new_pos);
pars_info_add_ull_literal(info, "id", table->id);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"UPDATE SYS_COLUMNS\n"
"SET POS = :val\n"
"WHERE POS = :pos\n"
"AND TABLE_ID = :id;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update INNODB SYS_VIRTUAL table with new virtual column position
@param[in] table InnoDB table
@param[in] old_pos old position
@param[in] new_pos new position
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_update_v_pos_sys_virtual(
const dict_table_t* table,
ulint old_pos,
ulint new_pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_int4_literal(info, "pos", old_pos);
pars_info_add_int4_literal(info, "val", new_pos);
pars_info_add_ull_literal(info, "id", table->id);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"UPDATE SYS_VIRTUAL\n"
"SET POS = :val\n"
"WHERE POS = :pos\n"
"AND TABLE_ID = :id;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update InnoDB system tables on dropping a virtual column
@param[in] table InnoDB table
@param[in] col_name column name of the dropping column
@param[in] drop_col col information for the dropping column
@param[in] n_prev_dropped number of previously dropped columns in the
same alter clause
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_drop_one_virtual_sys_columns(
const dict_table_t* table,
const char* col_name,
dict_col_t* drop_col,
ulint n_prev_dropped,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_str_literal(info, "name", col_name);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"DELETE FROM SYS_COLUMNS\n"
"WHERE TABLE_ID = :id\n"
"AND NAME = :name;\n"
"END;\n",
FALSE, trx);
if (error != DB_SUCCESS) {
return(error);
}
dict_v_col_t* v_col = dict_table_get_nth_v_col_mysql(
table, drop_col->ind);
/* Adjust column positions for all subsequent columns */
for (ulint i = v_col->v_pos + 1; i < table->n_v_cols; i++) {
dict_v_col_t* t_col = dict_table_get_nth_v_col(table, i);
ulint old_p = dict_create_v_col_pos(
t_col->v_pos - n_prev_dropped,
t_col->m_col.ind - n_prev_dropped);
ulint new_p = dict_create_v_col_pos(
t_col->v_pos - 1 - n_prev_dropped,
ulint(t_col->m_col.ind) - 1 - n_prev_dropped);
error = innobase_update_v_pos_sys_columns(
table, old_p, new_p, trx);
if (error != DB_SUCCESS) {
return(error);
}
error = innobase_update_v_pos_sys_virtual(
table, old_p, new_p, trx);
if (error != DB_SUCCESS) {
return(error);
}
}
return(error);
}
/** Delete virtual column's info from INNODB SYS_VIRTUAL
@param[in] table InnoDB table
@param[in] pos position of the virtual column to be deleted
@param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */
static
dberr_t
innobase_drop_one_virtual_sys_virtual(
const dict_table_t* table,
ulint pos,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_int4_literal(info, "pos", pos);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"DELETE FROM SYS_VIRTUAL\n"
"WHERE TABLE_ID = :id\n"
"AND POS = :pos;\n"
"END;\n",
FALSE, trx);
return(error);
}
/** Update system table for dropping virtual column(s)
@param[in] ha_alter_info Data used during in-place alter
@param[in] user_table InnoDB table
@param[in] trx transaction
@retval true Failure
@retval false Success */
static
bool
innobase_drop_virtual_try(
Alter_inplace_info* ha_alter_info,
const dict_table_t* user_table,
trx_t* trx)
{
ha_innobase_inplace_ctx* ctx;
dberr_t err = DB_SUCCESS;
ctx = static_cast<ha_innobase_inplace_ctx*>
(ha_alter_info->handler_ctx);
for (ulint i = 0; i < ctx->num_to_drop_vcol; i++) {
ulint pos = dict_create_v_col_pos(
ctx->drop_vcol[i].v_pos - i,
ctx->drop_vcol[i].m_col.ind - i);
err = innobase_drop_one_virtual_sys_virtual(
user_table, pos, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: DROP COLUMN...VIRTUAL");
return(true);
}
err = innobase_drop_one_virtual_sys_columns(
user_table, ctx->drop_vcol_name[i],
&(ctx->drop_vcol[i].m_col), i, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: DROP COLUMN...VIRTUAL");
return(true);
}
}
ulint n_col = unsigned(user_table->n_cols) - DATA_N_SYS_COLS;
ulint n_v_col = unsigned(user_table->n_v_cols)
- ctx->num_to_drop_vcol;
ulint new_n = dict_table_encode_n_col(n_col, n_v_col)
| ((user_table->flags & DICT_TF_COMPACT) << 31);
return innodb_update_n_cols(user_table, new_n, trx);
}
/** Adjust the create index column number from "New table" to
"old InnoDB table" while we are doing dropping virtual column. Since we do
not create separate new table for the dropping/adding virtual columns.
......@@ -9219,21 +9219,20 @@ commit_try_norebuild(
if ((ha_alter_info->handler_flags
& ALTER_DROP_VIRTUAL_COLUMN)
&& !ctx->is_instant()
&& innobase_drop_virtual_try(ha_alter_info, ctx->old_table, trx)) {
DBUG_RETURN(true);
}
if ((ha_alter_info->handler_flags
& ALTER_ADD_VIRTUAL_COLUMN)
&& !ctx->is_instant()
&& innobase_add_virtual_try(ha_alter_info, ctx->old_table, trx)) {
DBUG_RETURN(true);
}
if (innobase_op_instant_try(ctx, altered_table, old_table, trx)) {
DBUG_RETURN(true);
}
DBUG_RETURN(false);
DBUG_RETURN(innobase_instant_try(ha_alter_info, altered_table,
old_table, trx));
}
/** Commit the changes to the data dictionary cache
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment