Commit 22d81cf7 authored by Marko Mäkelä's avatar Marko Mäkelä

Properly rebuild SYS_COLUMNS metadata

innobase_add_one_virtual(), innobase_insert_sys_virtual(): Change
the return type to bool, and invoke my_error() when detecting an error.

innodb_insert_sys_columns(): Insert a record into SYS_COLUMNS.
Refactored from innobase_add_one_virtual() and innobase_instant_add_col().

innobase_instant_add_col(): Replace the parameter dfield with type.

innobase_instant_drop_cols(): Drop matching columns from SYS_COLUMNS
and all columns from SYS_VIRTUAL.

innobase_instant_try(): Only invoke innobase_instant_drop_cols() when
there are other changes than adding columns last.

innodb_update_cols(): Renamed from innodb_update_n_cols().
parent 787db809
...@@ -1054,7 +1054,7 @@ ha_innobase::check_if_supported_inplace_alter( ...@@ -1054,7 +1054,7 @@ ha_innobase::check_if_supported_inplace_alter(
*/ */
| ALTER_ADD_NON_UNIQUE_NON_PRIM_INDEX | ALTER_ADD_NON_UNIQUE_NON_PRIM_INDEX
| ALTER_DROP_NON_UNIQUE_NON_PRIM_INDEX); | ALTER_DROP_NON_UNIQUE_NON_PRIM_INDEX);
#if 0 /* MDEV-15562 FIXME: enable this */ #if 0 /* MDEV-15562 FIXME: enable this, and adjust dict_v_col_t::m_col.ind */
if (supports_instant) { if (supports_instant) {
flags &= ~(ALTER_ADD_STORED_BASE_COLUMN flags &= ~(ALTER_ADD_STORED_BASE_COLUMN
| ALTER_DROP_STORED_COLUMN | ALTER_DROP_STORED_COLUMN
...@@ -4085,33 +4085,76 @@ prepare_inplace_drop_virtual( ...@@ -4085,33 +4085,76 @@ prepare_inplace_drop_virtual(
@param[in] pos virtual column column no @param[in] pos virtual column column no
@param[in] base_pos base column pos @param[in] base_pos base column pos
@param[in] trx transaction @param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */ @retval false on success
static @retval true on failure (my_error() will have been called) */
dberr_t static bool innobase_insert_sys_virtual(
innobase_insert_sys_virtual(
const dict_table_t* table, const dict_table_t* table,
ulint pos, ulint pos,
ulint base_pos, ulint base_pos,
trx_t* trx) trx_t* trx)
{ {
pars_info_t* info = pars_info_create(); pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id); pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_int4_literal(info, "pos", pos); pars_info_add_int4_literal(info, "pos", pos);
pars_info_add_int4_literal(info, "base_pos", base_pos); pars_info_add_int4_literal(info, "base_pos", base_pos);
dberr_t error = que_eval_sql( if (DB_SUCCESS != que_eval_sql(
info, info,
"PROCEDURE P () IS\n" "PROCEDURE P () IS\n"
"BEGIN\n" "BEGIN\n"
"INSERT INTO SYS_VIRTUAL VALUES" "INSERT INTO SYS_VIRTUAL VALUES (:id, :pos, :base_pos);\n"
"(:id, :pos, :base_pos);\n" "END;\n",
"END;\n", FALSE, trx)) {
FALSE, trx); my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: ADD COLUMN...VIRTUAL");
return true;
}
return(error); return false;
}
/** Insert a record to the SYS_COLUMNS dictionary table.
@param[in] table_id table id
@param[in] pos position of the column
@param[in] field_name field name
@param[in] mtype main type
@param[in] prtype precise type
@param[in] len fixed length in bytes, or 0
@param[in] n_base number of base columns of virtual columns, or 0
@retval false on success
@retval true on failure (my_error() will have been called) */
static bool innodb_insert_sys_columns(
table_id_t table_id,
ulint pos,
const char* field_name,
ulint mtype,
ulint prtype,
ulint len,
ulint n_base,
trx_t* trx)
{
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table_id);
pars_info_add_int4_literal(info, "pos", pos);
pars_info_add_str_literal(info, "name", field_name);
pars_info_add_int4_literal(info, "mtype", mtype);
pars_info_add_int4_literal(info, "prtype", prtype);
pars_info_add_int4_literal(info, "len", len);
pars_info_add_int4_literal(info, "base", n_base);
if (DB_SUCCESS != que_eval_sql(
info,
"PROCEDURE ADD_COL () IS\n"
"BEGIN\n"
"INSERT INTO SYS_COLUMNS VALUES"
"(:id,:pos,:name,:mtype,:prtype,:len,:base);\n"
"END;\n", FALSE, trx)) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: Insert into SYS_COLUMNS failed");
return true;
}
return false;
} }
/** Update INNODB SYS_COLUMNS on new virtual columns /** Update INNODB SYS_COLUMNS on new virtual columns
...@@ -4119,10 +4162,9 @@ innobase_insert_sys_virtual( ...@@ -4119,10 +4162,9 @@ innobase_insert_sys_virtual(
@param[in] col_name column name @param[in] col_name column name
@param[in] vcol virtual column @param[in] vcol virtual column
@param[in] trx transaction @param[in] trx transaction
@return DB_SUCCESS if successful, otherwise error code */ @retval false on success
static @retval true on failure (my_error() will have been called) */
dberr_t static bool innobase_add_one_virtual(
innobase_add_one_virtual(
const dict_table_t* table, const dict_table_t* table,
const char* col_name, const char* col_name,
dict_v_col_t* vcol, dict_v_col_t* vcol,
...@@ -4130,67 +4172,41 @@ innobase_add_one_virtual( ...@@ -4130,67 +4172,41 @@ innobase_add_one_virtual(
{ {
ulint pos = dict_create_v_col_pos(vcol->v_pos, ulint pos = dict_create_v_col_pos(vcol->v_pos,
vcol->m_col.ind); vcol->m_col.ind);
ulint mtype = vcol->m_col.mtype;
ulint prtype = vcol->m_col.prtype;
ulint len = vcol->m_col.len;
pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", table->id);
pars_info_add_int4_literal(info, "pos", pos); if (innodb_insert_sys_columns(table->id, pos, col_name,
vcol->m_col.mtype, vcol->m_col.prtype,
pars_info_add_str_literal(info, "name", col_name); vcol->m_col.len, vcol->num_base, trx)) {
pars_info_add_int4_literal(info, "mtype", mtype); return true;
pars_info_add_int4_literal(info, "prtype", prtype);
pars_info_add_int4_literal(info, "len", len);
pars_info_add_int4_literal(info, "prec", vcol->num_base);
dberr_t error = que_eval_sql(
info,
"PROCEDURE P () IS\n"
"BEGIN\n"
"INSERT INTO SYS_COLUMNS VALUES"
"(:id, :pos, :name, :mtype, :prtype, :len, :prec);\n"
"END;\n",
FALSE, trx);
if (error != DB_SUCCESS) {
return(error);
} }
for (ulint i = 0; i < vcol->num_base; i++) { for (ulint i = 0; i < vcol->num_base; i++) {
error = innobase_insert_sys_virtual( if (innobase_insert_sys_virtual(
table, pos, vcol->base_col[i]->ind, trx); table, pos, vcol->base_col[i]->ind, trx)) {
if (error != DB_SUCCESS) { return true;
return(error);
} }
} }
return(error); return false;
} }
/** Update SYS_TABLES.N_COLS in the data dictionary. /** Update SYS_TABLES.N_COLS in the data dictionary.
@param[in] user_table InnoDB table @param[in] user_table InnoDB table
@param[in] n_cols the new value of SYS_TABLES.N_COLS @param[in] n the new value of SYS_TABLES.N_COLS
@param[in] trx transaction @param[in] trx transaction
@return whether the operation failed */ @return whether the operation failed */
static static bool innodb_update_cols(const dict_table_t* table, ulint n, trx_t* trx)
bool
innodb_update_n_cols(const dict_table_t* table, ulint n_cols, trx_t* trx)
{ {
pars_info_t* info = pars_info_create(); pars_info_t* info = pars_info_create();
pars_info_add_int4_literal(info, "n", n_cols); pars_info_add_int4_literal(info, "n", n);
pars_info_add_ull_literal(info, "id", table->id); pars_info_add_ull_literal(info, "id", table->id);
dberr_t err = que_eval_sql(info, if (DB_SUCCESS != que_eval_sql(info,
"PROCEDURE UPDATE_N_COLS () IS\n" "PROCEDURE UPDATE_N_COLS () IS\n"
"BEGIN\n" "BEGIN\n"
"UPDATE SYS_TABLES SET N_COLS = :n" "UPDATE SYS_TABLES SET N_COLS = :n"
" WHERE ID = :id;\n" " WHERE ID = :id;\n"
"END;\n", FALSE, trx); "END;\n", FALSE, trx)) {
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0), my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: Updating SYS_TABLES.N_COLS failed"); "InnoDB: Updating SYS_TABLES.N_COLS failed");
return true; return true;
...@@ -4212,80 +4228,52 @@ innobase_add_virtual_try( ...@@ -4212,80 +4228,52 @@ innobase_add_virtual_try(
const dict_table_t* user_table, const dict_table_t* user_table,
trx_t* trx) trx_t* trx)
{ {
ha_innobase_inplace_ctx* ctx; ha_innobase_inplace_ctx* ctx = static_cast<ha_innobase_inplace_ctx*>(
dberr_t err = DB_SUCCESS;
ctx = static_cast<ha_innobase_inplace_ctx*>(
ha_alter_info->handler_ctx); ha_alter_info->handler_ctx);
for (ulint i = 0; i < ctx->num_to_add_vcol; i++) { for (ulint i = 0; i < ctx->num_to_add_vcol; i++) {
if (innobase_add_one_virtual(
err = innobase_add_one_virtual( user_table, ctx->add_vcol_name[i],
user_table, ctx->add_vcol_name[i], &ctx->add_vcol[i], trx)) {
&ctx->add_vcol[i], trx); return true;
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: ADD COLUMN...VIRTUAL");
return(true);
} }
} }
ulint n_col = unsigned(user_table->n_cols) - DATA_N_SYS_COLS; ulint n_col = unsigned(user_table->n_cols) - DATA_N_SYS_COLS;
ulint n_v_col = unsigned(user_table->n_v_cols) ulint n_v_col = unsigned(user_table->n_v_cols)
+ ctx->num_to_add_vcol - ctx->num_to_drop_vcol; + ctx->num_to_add_vcol - ctx->num_to_drop_vcol;
ulint new_n = dict_table_encode_n_col(n_col, n_v_col) ulint new_n = dict_table_encode_n_col(n_col, n_v_col)
+ (unsigned(user_table->flags & DICT_TF_COMPACT) << 31); + (unsigned(user_table->flags & DICT_TF_COMPACT) << 31);
return innodb_update_n_cols(user_table, new_n, trx); return innodb_update_cols(user_table, new_n, trx);
} }
/** Add the newly added column in the sys_column system table. /** Add the newly added column in the sys_column system table.
@param[in] table_id table id @param[in] table_id table id
@param[in] pos position of the column @param[in] pos position of the column
@param[in] field_name field name @param[in] field_name field name
@param[in] dfield field @param[in] type data type
@retval true Failure @retval true Failure
@retval false Success. */ @retval false Success. */
static bool innobase_instant_add_col( static bool innobase_instant_add_col(
table_id_t table_id, table_id_t table_id,
ulint pos, ulint pos,
const char* field_name, const char* field_name,
dfield_t* dfield, const dtype_t& type,
trx_t* trx) trx_t* trx)
{ {
pars_info_t* info = pars_info_create(); return innodb_insert_sys_columns(table_id, pos, field_name,
pars_info_add_ull_literal(info, "id", table_id); type.mtype, type.prtype, type.len, 0,
pars_info_add_int4_literal(info, "pos", pos); trx);
pars_info_add_str_literal(info, "name", field_name);
pars_info_add_int4_literal(info, "mtype", dfield->type.mtype);
pars_info_add_int4_literal(info, "prtype", dfield->type.prtype);
pars_info_add_int4_literal(info, "len", dfield->type.len);
dberr_t err = que_eval_sql(
info,
"PROCEDURE ADD_COL () IS\n"
"BEGIN\n"
"INSERT INTO SYS_COLUMNS VALUES"
"(:id,:pos,:name,:mtype,:prtype,:len,0);\n"
"END;\n", FALSE, trx);
if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: Insert into SYS_COLUMNS failed");
return(true);
}
return false;
} }
/** Delete the column from the sys_columns system table. /** Delete metadata from SYS_COLUMNS and SYS_VIRTUAL.
@param[in] id table id @param[in] id table id
@param[in] pos first SYS_COLUMNS.POS @param[in] pos first SYS_COLUMNS.POS
@param[in,out] trx data dictionary transaction @param[in,out] trx data dictionary transaction
@retval true Failure @retval true Failure
@retval false Success. */ @retval false Success. */
static bool innobase_instant_drop_col(table_id_t id, ulint pos, trx_t* trx) static bool innobase_instant_drop_cols(table_id_t id, ulint pos, trx_t* trx)
{ {
pars_info_t* info = pars_info_create(); pars_info_t* info = pars_info_create();
pars_info_add_ull_literal(info, "id", id); pars_info_add_ull_literal(info, "id", id);
...@@ -4295,12 +4283,13 @@ static bool innobase_instant_drop_col(table_id_t id, ulint pos, trx_t* trx) ...@@ -4295,12 +4283,13 @@ static bool innobase_instant_drop_col(table_id_t id, ulint pos, trx_t* trx)
info, info,
"PROCEDURE DELETE_COL () IS\n" "PROCEDURE DELETE_COL () IS\n"
"BEGIN\n" "BEGIN\n"
"DELETE FROM SYS_COLUMNS WHERE \n" "DELETE FROM SYS_COLUMNS WHERE\n"
"TABLE_ID = :id AND POS >= :pos;\n" "TABLE_ID = :id AND POS >= :pos;\n"
"DELETE FROM SYS_VIRTUAL WHERE TABLE_ID = :id;\n"
"END;\n", FALSE, trx); "END;\n", FALSE, trx);
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
my_error(ER_INTERNAL_ERROR, MYF(0), my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: DELETE from SYS_COLUMNS failed"); "InnoDB: DELETE from SYS_COLUMNS/SYS_VIRTUAL failed");
return true; return true;
} }
...@@ -4520,7 +4509,7 @@ innobase_drop_virtual_try( ...@@ -4520,7 +4509,7 @@ innobase_drop_virtual_try(
ulint new_n = dict_table_encode_n_col(n_col, n_v_col) ulint new_n = dict_table_encode_n_col(n_col, n_v_col)
| ((user_table->flags & DICT_TF_COMPACT) << 31); | ((user_table->flags & DICT_TF_COMPACT) << 31);
return innodb_update_n_cols(user_table, new_n, trx); return innodb_update_cols(user_table, new_n, trx);
} }
/** Insert or update SYS_COLUMNS and the hidden metadata record /** Insert or update SYS_COLUMNS and the hidden metadata record
...@@ -4564,20 +4553,29 @@ static bool innobase_instant_try( ...@@ -4564,20 +4553,29 @@ static bool innobase_instant_try(
Field** const end = altered_table->field + altered_table->s->fields; Field** const end = altered_table->field + altered_table->s->fields;
List_iterator_fast<Create_field> cf_it( List_iterator_fast<Create_field> cf_it(
ha_alter_info->alter_info->create_list); ha_alter_info->alter_info->create_list);
bool has_dropped = false; bool rebuild_metadata = false;
for (uint i = 0, dropped = 0;;) { for (uint i = 0, dropped = 0;;) {
if (af < end && !(*af)->stored_in_db()) { if (af < end && !(*af)->stored_in_db()) {
af++; cf_it++; af++; cf_it++;
continue; continue;
} }
dfield_t* d = dtuple_get_nth_field(row, i); if (i + ctx->n_instant_add_cols >= user_table->n_cols) {
/* Avoid out-of-bounds access of ctx->col_map[].
if (ctx->col_map[i] != i) { It is only defined for the pre-existing columns
if (!has_dropped) { of the table, excluding any ADD COLUMN of this
has_dropped = true; ALTER TABLE operation. */
if (innobase_instant_drop_col(user_table->id, } else if (ctx->col_map[i] != i) {
i, trx)) { const bool not_last = user_table->n_cols
+ ctx->n_instant_drop_cols > i
+ ctx->n_instant_add_cols + DATA_N_SYS_COLS;
/* System columns cannot be dropped. */
DBUG_ASSERT(not_last
|| ctx->col_map[i] != ULINT_UNDEFINED);
if (!rebuild_metadata && not_last) {
rebuild_metadata = true;
if (innobase_instant_drop_cols(user_table->id,
i, trx)) {
return true; return true;
} }
} }
...@@ -4585,21 +4583,16 @@ static bool innobase_instant_try( ...@@ -4585,21 +4583,16 @@ static bool innobase_instant_try(
if (ctx->col_map[i] == ULINT_UNDEFINED) { if (ctx->col_map[i] == ULINT_UNDEFINED) {
DBUG_ASSERT(dropped DBUG_ASSERT(dropped
< user_table->instant->n_dropped); < user_table->instant->n_dropped);
const dict_col_t& col = user_table->instant DBUG_ASSERT(user_table->instant->dropped[
->dropped[dropped++]; dropped].is_dropped());
DBUG_ASSERT(col.is_dropped()); dropped++;
if (col.is_nullable()) {
dfield_set_null(d);
} else {
dfield_set_data(d, field_ref_zero,
col.len);
}
i++; i++;
continue; continue;
} }
} }
const uint c = i - dropped; const uint c = i - dropped;
dfield_t* d = dtuple_get_nth_field(row, c);
const dict_col_t* col = dict_table_get_nth_col(user_table, c); const dict_col_t* col = dict_table_get_nth_col(user_table, c);
DBUG_ASSERT(!col->is_virtual()); DBUG_ASSERT(!col->is_virtual());
DBUG_ASSERT(!col->is_dropped()); DBUG_ASSERT(!col->is_dropped());
...@@ -4654,12 +4647,12 @@ static bool innobase_instant_try( ...@@ -4654,12 +4647,12 @@ static bool innobase_instant_try(
If it is NULL, the column was added by this ALTER TABLE. */ If it is NULL, the column was added by this ALTER TABLE. */
DBUG_ASSERT(new_field->field || col->is_added()); DBUG_ASSERT(new_field->field || col->is_added());
if (!has_dropped && new_field->field) { if (!rebuild_metadata && new_field->field) {
/* The record is already present in SYS_COLUMNS. */ /* The record is already present in SYS_COLUMNS. */
DBUG_ASSERT(!col->is_dropped()); DBUG_ASSERT(!col->is_dropped());
} else if (innobase_instant_add_col(user_table->id, c, } else if (innobase_instant_add_col(user_table->id, c,
(*af)->field_name.str, (*af)->field_name.str,
d, trx)) { d->type, trx)) {
return true; return true;
} }
...@@ -4667,24 +4660,21 @@ static bool innobase_instant_try( ...@@ -4667,24 +4660,21 @@ static bool innobase_instant_try(
af++; af++;
} }
if (innodb_update_n_cols(user_table, dict_table_encode_n_col( if (innodb_update_cols(user_table, dict_table_encode_n_col(
unsigned(user_table->n_cols) unsigned(user_table->n_cols)
- DATA_N_SYS_COLS, - DATA_N_SYS_COLS,
user_table->n_v_cols) user_table->n_v_cols)
| (user_table->flags & DICT_TF_COMPACT) << 31, | (user_table->flags & DICT_TF_COMPACT) << 31,
trx)) { trx)) {
return true; return true;
} }
if (has_dropped) { if (rebuild_metadata) {
for (uint i = 0; i < user_table->n_v_cols; i++) { for (uint i = 0; i < user_table->n_v_cols; i++) {
if (DB_SUCCESS if (innobase_add_one_virtual(
!= innobase_add_one_virtual(
user_table, user_table,
dict_table_get_v_col_name(user_table, i), dict_table_get_v_col_name(user_table, i),
&user_table->v_cols[i], trx)) { &user_table->v_cols[i], trx)) {
my_error(ER_INTERNAL_ERROR, MYF(0),
"InnoDB: ADD COLUMN...VIRTUAL");
return true; return true;
} }
} }
......
...@@ -330,6 +330,7 @@ row_mysql_read_geometry( ...@@ -330,6 +330,7 @@ row_mysql_read_geometry(
ulint col_len) /*!< in: MySQL format length */ ulint col_len) /*!< in: MySQL format length */
{ {
byte* data; byte* data;
ut_ad(col_len > 8);
*len = mach_read_from_n_little_endian(ref, col_len - 8); *len = mach_read_from_n_little_endian(ref, col_len - 8);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment