Commit 7f022824 authored by Michael Widenius's avatar Michael Widenius

Fixes for bug#37276:

- Fixes key corruption where pages wasn't marked that they contained transid.
- Fixed bug in block record that could create too small rows when transid was removed. This fixed some asserts() when testing block length

Added --abort-source-on-error to the mysql client. (Very helpfull when debugging wrong .sql files)

client/client_priv.h:
  Added enum for new option to mysql
client/mysql.cc:
  Added new option --abort-source-on-error to allow one to get 'source' to stop on error. This is 0 by default, to keep old behaviour.
  Print file name and line number when getting an error when doing source filename
sql/field.cc:
  Removed old dead code
sql/sql_class.cc:
  Removed old dead code
storage/maria/ma_blockrec.c:
  Added checking of min block length to check_directory()
  When creating new directory entry, set length to 0; This ensures that the assert checks() on block lengths works properly.
  Added more DBUG_ASSERT() and more calls to check_directory()
  In get_head_or_tail_page(), send in correct min_block_length. This is now safe as the length of new blocks are now 0
storage/maria/ma_delete.c:
  When moving key to parent page, mark page if key has transid
storage/maria/ma_write.c:
  Fixed bug where _ma_find_last_pos() didn't set int_key->flag.
  Simpilifed code by unrolling loop.
parent f9ef13d5
......@@ -80,5 +80,6 @@ enum options_client
OPT_FIX_TABLE_NAMES, OPT_FIX_DB_NAMES, OPT_SSL_VERIFY_SERVER_CERT,
OPT_DEBUG_INFO, OPT_DEBUG_CHECK, OPT_COLUMN_TYPES, OPT_ERROR_LOG_FILE,
OPT_WRITE_BINLOG, OPT_DUMP_DATE,
OPT_ABORT_SOURCE_ON_ERROR,
OPT_MAX_CLIENT_OPTION
};
......@@ -43,7 +43,7 @@
#include <locale.h>
#endif
const char *VER= "14.14";
const char *VER= "14.15";
/* Don't try to make a nice table if the data is too big */
#define MAX_COLUMN_LENGTH 1024
......@@ -142,7 +142,7 @@ static my_bool ignore_errors=0,wait_flag=0,quick=0,
default_charset_used= 0, opt_secure_auth= 0,
default_pager_set= 0, opt_sigint_ignore= 0,
show_warnings= 0, executing_query= 0, interrupted_query= 0;
static my_bool debug_info_flag, debug_check_flag;
static my_bool debug_info_flag, debug_check_flag, batch_abort_on_error;
static my_bool column_types_flag;
static my_bool preserve_comments= 0;
static ulong opt_max_allowed_packet, opt_net_buffer_length;
......@@ -1308,6 +1308,10 @@ static struct my_option my_long_options[] =
0, 0, 0, 0, 0},
{"help", 'I', "Synonym for -?", 0, 0, 0, GET_NO_ARG, NO_ARG, 0,
0, 0, 0, 0, 0},
{"abort-source-on-error", OPT_ABORT_SOURCE_ON_ERROR,
"Abort 'source filename' operations in case of errors",
(uchar**) &batch_abort_on_error, (uchar**) &batch_abort_on_error, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0},
#ifdef __NETWARE__
{"autoclose", OPT_AUTO_CLOSE, "Auto close the screen on exit for Netware.",
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0},
......@@ -1359,7 +1363,7 @@ static struct my_option my_long_options[] =
{"vertical", 'E', "Print the output of a query (rows) vertically.",
(uchar**) &vertical, (uchar**) &vertical, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0,
0},
{"force", 'f', "Continue even if we get an sql error.",
{"force", 'f', "Continue even if we get an sql error. Sets abort-source-on-error to 0",
(uchar**) &ignore_errors, (uchar**) &ignore_errors, 0, GET_BOOL, NO_ARG, 0, 0,
0, 0, 0, 0},
{"named-commands", 'G',
......@@ -1716,6 +1720,9 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
#endif
break;
#include <sslopt-case.h>
case 'f':
batch_abort_on_error= 0;
break;
case 'V':
usage(1);
exit(0);
......@@ -3887,6 +3894,7 @@ static int com_source(String *buffer, char *line)
int error;
STATUS old_status;
FILE *sql_file;
my_bool save_ignore_errors;
/* Skip space from file name */
while (my_isspace(charset_info,*line))
......@@ -3918,16 +3926,25 @@ static int com_source(String *buffer, char *line)
/* Save old status */
old_status=status;
save_ignore_errors= ignore_errors;
bfill((char*) &status,sizeof(status),(char) 0);
status.batch=old_status.batch; // Run in batch mode
status.line_buff=line_buff;
status.file_name=source_name;
glob_buffer.length(0); // Empty command buffer
ignore_errors= !batch_abort_on_error;
error= read_and_execute(false);
ignore_errors= save_ignore_errors;
status=old_status; // Continue as before
my_fclose(sql_file,MYF(0));
batch_readline_end(line_buff);
/*
If we got an error during source operation, don't abort the client
if ignore_errors is set
*/
if (error && batch_abort_on_error && ignore_errors)
error= -1;
return error;
}
......@@ -4433,12 +4450,19 @@ put_info(const char *str,INFO_TYPE info_type, uint error, const char *sqlstate)
if (error)
{
if (sqlstate)
(void) tee_fprintf(file, "ERROR %d (%s): ", error, sqlstate);
(void) tee_fprintf(file, "ERROR %d (%s)", error, sqlstate);
else
(void) tee_fprintf(file, "ERROR %d: ", error);
(void) tee_fprintf(file, "ERROR %d", error);
}
else
tee_puts("ERROR: ", file);
tee_fputs("ERROR", file);
if (status.query_start_line && line_numbers)
{
(void) fprintf(file," at line %lu",status.query_start_line);
if (status.file_name)
(void) fprintf(file," in file: '%s'", status.file_name);
}
tee_fputs(": ", file);
}
else
vidattr(A_BOLD);
......@@ -4447,7 +4471,7 @@ put_info(const char *str,INFO_TYPE info_type, uint error, const char *sqlstate)
}
if (unbuffered)
fflush(file);
return info_type == INFO_ERROR ? -1 : 0;
return info_type == INFO_ERROR ? (ignore_errors ? -1 : 1): 0;
}
......
......@@ -9650,8 +9650,7 @@ bool Create_field::init(THD *thd, char *fld_name, enum_field_types fld_type,
}
break;
case MYSQL_TYPE_DATE:
/* Old date type. */
if (protocol_version != PROTOCOL_VERSION-1)
/* We don't support creation of MYSQL_TYPE_DATE anymore */
sql_type= MYSQL_TYPE_NEWDATE;
/* fall trough */
case MYSQL_TYPE_NEWDATE:
......
......@@ -3188,70 +3188,6 @@ THD::binlog_prepare_pending_rows_event(TABLE*, uint32, MY_BITMAP const*,
Update_rows_log_event *);
#endif
#ifdef NOT_USED
static char const*
field_type_name(enum_field_types type)
{
switch (type) {
case MYSQL_TYPE_DECIMAL:
return "MYSQL_TYPE_DECIMAL";
case MYSQL_TYPE_TINY:
return "MYSQL_TYPE_TINY";
case MYSQL_TYPE_SHORT:
return "MYSQL_TYPE_SHORT";
case MYSQL_TYPE_LONG:
return "MYSQL_TYPE_LONG";
case MYSQL_TYPE_FLOAT:
return "MYSQL_TYPE_FLOAT";
case MYSQL_TYPE_DOUBLE:
return "MYSQL_TYPE_DOUBLE";
case MYSQL_TYPE_NULL:
return "MYSQL_TYPE_NULL";
case MYSQL_TYPE_TIMESTAMP:
return "MYSQL_TYPE_TIMESTAMP";
case MYSQL_TYPE_LONGLONG:
return "MYSQL_TYPE_LONGLONG";
case MYSQL_TYPE_INT24:
return "MYSQL_TYPE_INT24";
case MYSQL_TYPE_DATE:
return "MYSQL_TYPE_DATE";
case MYSQL_TYPE_TIME:
return "MYSQL_TYPE_TIME";
case MYSQL_TYPE_DATETIME:
return "MYSQL_TYPE_DATETIME";
case MYSQL_TYPE_YEAR:
return "MYSQL_TYPE_YEAR";
case MYSQL_TYPE_NEWDATE:
return "MYSQL_TYPE_NEWDATE";
case MYSQL_TYPE_VARCHAR:
return "MYSQL_TYPE_VARCHAR";
case MYSQL_TYPE_BIT:
return "MYSQL_TYPE_BIT";
case MYSQL_TYPE_NEWDECIMAL:
return "MYSQL_TYPE_NEWDECIMAL";
case MYSQL_TYPE_ENUM:
return "MYSQL_TYPE_ENUM";
case MYSQL_TYPE_SET:
return "MYSQL_TYPE_SET";
case MYSQL_TYPE_TINY_BLOB:
return "MYSQL_TYPE_TINY_BLOB";
case MYSQL_TYPE_MEDIUM_BLOB:
return "MYSQL_TYPE_MEDIUM_BLOB";
case MYSQL_TYPE_LONG_BLOB:
return "MYSQL_TYPE_LONG_BLOB";
case MYSQL_TYPE_BLOB:
return "MYSQL_TYPE_BLOB";
case MYSQL_TYPE_VAR_STRING:
return "MYSQL_TYPE_VAR_STRING";
case MYSQL_TYPE_STRING:
return "MYSQL_TYPE_STRING";
case MYSQL_TYPE_GEOMETRY:
return "MYSQL_TYPE_GEOMETRY";
}
return "Unknown";
}
#endif
namespace {
/**
......
......@@ -645,7 +645,7 @@ static void _ma_print_directory(uchar *buff, uint block_size)
}
static void check_directory(uchar *buff, uint block_size)
static void check_directory(uchar *buff, uint block_size, uint min_row_length)
{
uchar *dir, *end;
uint max_entry= (uint) buff[DIR_COUNT_OFFSET];
......@@ -666,6 +666,7 @@ static void check_directory(uchar *buff, uint block_size)
if (offset)
{
DBUG_ASSERT(offset >= end_of_prev_row);
DBUG_ASSERT(!length || length >= min_row_length);
end_of_prev_row= offset + length;
}
else
......@@ -688,7 +689,7 @@ static void check_directory(uchar *buff, uint block_size)
DBUG_ASSERT(deleted == 0);
}
#else
#define check_directory(A,B)
#define check_directory(A,B,C)
#endif /* DBUG_OFF */
......@@ -855,7 +856,7 @@ static my_bool extend_area_on_page(MARIA_HA *info,
int2store(dir + 2, length);
*ret_offset= rec_offset;
*ret_length= length;
check_directory(buff, block_size);
check_directory(buff, block_size, info ? info->s->base.min_block_length : 0);
DBUG_RETURN(0);
}
......@@ -1059,11 +1060,12 @@ static uchar *find_free_position(MARIA_HA *info,
PAGE_SUFFIX_SIZE);
length= start_of_next_entry(dir) - first_pos;
int2store(dir, first_pos); /* Update dir entry */
int2store(dir + 2, length);
int2store(dir + 2, 0);
*res_rownr= free_entry;
*res_length= length;
check_directory(buff, block_size);
check_directory(buff, block_size,
info ? info->s->base.min_block_length : 0);
DBUG_RETURN(dir);
}
/* No free places in dir; create a new one */
......@@ -1080,10 +1082,11 @@ static uchar *find_free_position(MARIA_HA *info,
length= (uint) (dir - buff - first_pos);
DBUG_ASSERT(length <= *empty_space);
int2store(dir, first_pos);
int2store(dir+2, length); /* Max length of region */
int2store(dir + 2, 0); /* Max length of region */
*res_rownr= max_entry;
*res_length= length;
check_directory(buff, block_size, info ? info->s->base.min_block_length : 0);
DBUG_RETURN(dir);
}
......@@ -1162,7 +1165,8 @@ static my_bool extend_directory(MARIA_HA *info, uchar *buff, uint block_size,
}
}
check_directory(buff, block_size);
check_directory(buff, block_size,
info ? min(info->s->base.min_block_length, length) : 0);
DBUG_RETURN(0);
}
......@@ -1543,6 +1547,7 @@ void _ma_compact_block_page(uchar *buff, uint block_size, uint rownr,
/* Extend rownr block to cover hole */
rownr_length= next_free_pos - start_of_found_block;
int2store(dir+2, rownr_length);
DBUG_ASSERT(rownr_length >= min_row_length);
}
else
{
......@@ -1551,6 +1556,7 @@ void _ma_compact_block_page(uchar *buff, uint block_size, uint rownr,
/* Extend last block to cover whole page */
uint length= ((uint) (dir - buff) - start_of_found_block);
int2store(dir+2, length);
DBUG_ASSERT(length >= min_row_length);
}
else
{
......@@ -1560,6 +1566,7 @@ void _ma_compact_block_page(uchar *buff, uint block_size, uint rownr,
}
buff[PAGE_TYPE_OFFSET]&= ~(uchar) PAGE_CAN_BE_COMPACTED;
}
check_directory(buff, block_size, min_row_length);
DBUG_EXECUTE("directory", _ma_print_directory(buff, block_size););
DBUG_VOID_RETURN;
}
......@@ -1691,16 +1698,12 @@ static my_bool get_head_or_tail_page(MARIA_HA *info,
{
if (res->empty_space + res->length >= length)
{
/*
We can't just verify min_block_length here as the newly found block
may be smaller than min_block_length.
*/
_ma_compact_block_page(res->buff, block_size, res->rownr, 1,
page_type == HEAD_PAGE ?
info->trn->min_read_from : 0,
page_type == HEAD_PAGE ?
min(res->length, share->base.min_block_length) :
0);
(page_type == HEAD_PAGE ?
info->trn->min_read_from : 0),
(page_type == HEAD_PAGE ?
share->base.min_block_length :
0));
/* All empty space are now after current position */
dir= dir_entry_pos(res->buff, block_size, res->rownr);
res->length= res->empty_space= uint2korr(dir+2);
......@@ -2730,6 +2733,8 @@ static my_bool write_block_record(MARIA_HA *info,
head_block->empty_space= 0; /* Page is full */
head_block->used|= BLOCKUSED_USED;
check_directory(page_buff, share->block_size, share->base.min_block_length);
/*
Now we have to write tail pages, as we need to store the position
to them in the row extent header.
......@@ -3836,7 +3841,7 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
}
#endif
check_directory(buff, block_size);
check_directory(buff, block_size, 0);
empty_space= uint2korr(buff + EMPTY_SPACE_OFFSET);
dir= dir_entry_pos(buff, block_size, record_number);
length= uint2korr(dir + 2);
......@@ -3911,7 +3916,7 @@ static int delete_dir_entry(uchar *buff, uint block_size, uint record_number,
*empty_space_res= empty_space;
check_directory(buff, block_size);
check_directory(buff, block_size, 0);
DBUG_RETURN(0);
}
......
......@@ -687,6 +687,9 @@ static int del(MARIA_HA *info, MARIA_KEY *key,
bmove(keypos,keypos-length, (int) (endpos-keypos)+length);
(*keyinfo->store_key)(keyinfo,keypos,&s_temp);
key_start= keypos;
if (tmp_key.flag & (SEARCH_USER_KEY_HAS_TRANSID |
SEARCH_PAGE_KEY_HAS_TRANSID))
_ma_mark_page_with_transid(share, anc_buff);
/* Save pointer to next leaf on parent page */
if (!(*keyinfo->get_key)(&ret_key, page_flag, share->base.key_reflength,
......
......@@ -1027,11 +1027,11 @@ static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
uchar *page, uchar **after_key)
{
uint keys, length, key_ref_length, page_flag;
uint last_data_length, last_ref_length;
uchar *key, *end, *lastpos,*prevpos;
uchar *end, *lastpos, *prevpos;
uchar key_buff[MARIA_MAX_KEY_BUFF];
MARIA_SHARE *share= info->s;
MARIA_KEYDEF *keyinfo= int_key->keyinfo;
MARIA_KEY tmp_key;
DBUG_ENTER("_ma_find_last_pos");
key_ref_length= share->keypage_header;
......@@ -1053,33 +1053,35 @@ static uchar *_ma_find_last_pos(MARIA_HA *info, MARIA_KEY *int_key,
DBUG_RETURN(end);
}
LINT_INIT(prevpos);
LINT_INIT(last_data_length);
LINT_INIT(last_ref_length);
end=page+length-key_ref_length;
key= int_key->data;
length=0;
lastpos=page;
int_key->data= key_buff;
tmp_key.data= key_buff;
tmp_key.keyinfo= int_key->keyinfo;
key_buff[0]= 0; /* Safety */
while (page < end)
/* We know that there are at least 2 keys on the page */
if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
{
maria_print_error(keyinfo->share, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED;
DBUG_RETURN(0);
}
do
{
prevpos=lastpos; lastpos=page;
last_data_length= int_key->data_length;
last_ref_length= int_key->ref_length;
memcpy(key, key_buff, length); /* previous key */
if (!(length=(*keyinfo->get_key)(int_key, page_flag, 0, &page)))
int_key->data_length= tmp_key.data_length;
int_key->ref_length= tmp_key.ref_length;
int_key->flag= tmp_key.flag;
memcpy(int_key->data, key_buff, length); /* previous key */
if (!(length=(*keyinfo->get_key)(&tmp_key, page_flag, 0, &page)))
{
maria_print_error(keyinfo->share, HA_ERR_CRASHED);
my_errno=HA_ERR_CRASHED;
DBUG_RETURN(0);
}
}
int_key->data= key;
int_key->data_length= last_data_length;
int_key->ref_length= last_ref_length;
} while (page < end);
*after_key=lastpos;
DBUG_PRINT("exit",("returns: 0x%lx page: 0x%lx end: 0x%lx",
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment