Commit ab5ebc0f authored by unknown's avatar unknown

Bug#16218 - Crash on insert delayed

Bug#17294 - INSERT DELAYED puting an \n before data
Bug#16611 - INSERT DELAYED corrupts data
Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
Combined as Bug#16218.

INSERT DELAYED crashed in 5.0 on a table with a varchar that 
could be NULL and was created pre-5.0 (Bugs 16218 and 13707).
INSERT DELAYED corrupted data in 5.0 on a table with varchar 
fields that was created pre-5.0 (Bugs 17294 and 16611).

In case of INSERT DELAYED the open table is copied from the
delayed insert thread to be able to create a record for the 
queue. When copying the fields, a method was used that did 
convert old varchar to new varchar fields and did not set up 
some pointers into the record buffer of the table.

The field conversion was guilty for the misinterpretation of 
the record contents by the delayed insert thread. The wrong
pointer setup was guilty for the crashes.

For Bug 13707 (Server crash with INSERT DELAYED on MyISAM table)
I fixed the above mentioned method to set up one of the pointers.
For Bug 16218 I set up the other pointers too.

But when looking at the corruptions I got aware that converting
the field type was totally wrong for INSERT DELAYED. The copied
table is used to create a record that is to be sent to the
delayed insert thread. Of course it can interpret the record
correctly only if all field types are the same in both table
objects.

So I revoked the fix for Bug 13707 and changed the new_field() 
method so that it can suppress conversions.

No test case as this is a migration problem. One needs to
create a table with 4.x and use it with 5.x. I added two
test scripts to the bug report.


sql/field.cc:
  Bug#16218 - Crash on insert delayed
  Bug#17294 - INSERT DELAYED puting an \n before data
  Bug#16611 - INSERT DELAYED corrupts data
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added parameter 'keep_type' to Field::new_field().
  
  Undid the change from Bug 13707 (Server crash with INSERT 
  DELAYED on MyISAM table).
  I solved all four bugs in sql/sql_insert.cc by making exact
  duplicates of the fields. The new_field() method converts
  certain field types, which is wrong for INSERT DELAYED.
sql/field.h:
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added parameter 'keep_type' to Field::new_field().
sql/sql_insert.cc:
  Bug#16218 - Crash on insert delayed
  Bug#17294 - INSERT DELAYED puting an \n before data
  Bug#16611 - INSERT DELAYED corrupts data
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added comments. Made small style fixes.
  Used the new parameter 'keep_type' of Field::new_field()
  to avoid field type conversion. The table copy must have
  exactly the same types of fields as the original table.
  Otherwise the record contents created by the foreground 
  thread could be misinterpreted by the delayed insert thread.
sql/sql_select.cc:
  Bug#16218 - Crash on insert delayed
  Bug#17294 - INSERT DELAYED puting an \n before data
  Bug#16611 - INSERT DELAYED corrupts data
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added parameter 'keep_type' to Field::new_field().
  
  Undid the change from Bug 13707 (Server crash with INSERT 
  DELAYED on MyISAM table).
  I solved all four bugs in sql/sql_insert.cc by making exact
  duplicates of the fields. The new_field() method converts
  certain field types, which is wrong for INSERT DELAYED.
sql/sql_trigger.cc:
  Bug#16218 - Crash on insert delayed
  Bug#17294 - INSERT DELAYED puting an \n before data
  Bug#16611 - INSERT DELAYED corrupts data
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added parameter 'keep_type' to Field::new_field().
  
  Undid the change from Bug 13707 (Server crash with INSERT 
  DELAYED on MyISAM table).
  I solved all four bugs in sql/sql_insert.cc by making exact
  duplicates of the fields. The new_field() method converts
  certain field types, which is wrong for INSERT DELAYED.
sql/table.cc:
  Bug#16218 - Crash on insert delayed
  Bug#17294 - INSERT DELAYED puting an \n before data
  Bug#16611 - INSERT DELAYED corrupts data
  Bug#13707 - Server crash with INSERT DELAYED on MyISAM table
  Combined as Bug#16218.
  
  Added parameter 'keep_type' to Field::new_field().
  
  Undid the change from Bug 13707 (Server crash with INSERT 
  DELAYED on MyISAM table).
  I solved all four bugs in sql/sql_insert.cc by making exact
  duplicates of the fields. The new_field() method converts
  certain field types, which is wrong for INSERT DELAYED.
parent 8efe592d
...@@ -1515,7 +1515,8 @@ bool Field::optimize_range(uint idx, uint part) ...@@ -1515,7 +1515,8 @@ bool Field::optimize_range(uint idx, uint part)
} }
Field *Field::new_field(MEM_ROOT *root, struct st_table *new_table) Field *Field::new_field(MEM_ROOT *root, struct st_table *new_table,
bool keep_type __attribute__((unused)))
{ {
Field *tmp; Field *tmp;
if (!(tmp= (Field*) memdup_root(root,(char*) this,size_of()))) if (!(tmp= (Field*) memdup_root(root,(char*) this,size_of())))
...@@ -1540,7 +1541,7 @@ Field *Field::new_key_field(MEM_ROOT *root, struct st_table *new_table, ...@@ -1540,7 +1541,7 @@ Field *Field::new_key_field(MEM_ROOT *root, struct st_table *new_table,
uint new_null_bit) uint new_null_bit)
{ {
Field *tmp; Field *tmp;
if ((tmp= new_field(root, new_table))) if ((tmp= new_field(root, new_table, table == new_table)))
{ {
tmp->ptr= new_ptr; tmp->ptr= new_ptr;
tmp->null_ptr= new_null_ptr; tmp->null_ptr= new_null_ptr;
...@@ -6224,29 +6225,21 @@ uint Field_string::max_packed_col_length(uint max_length) ...@@ -6224,29 +6225,21 @@ uint Field_string::max_packed_col_length(uint max_length)
} }
Field *Field_string::new_field(MEM_ROOT *root, struct st_table *new_table) Field *Field_string::new_field(MEM_ROOT *root, struct st_table *new_table,
bool keep_type)
{ {
Field *new_field; Field *new_field;
if (type() != MYSQL_TYPE_VAR_STRING || table == new_table) if (type() != MYSQL_TYPE_VAR_STRING || keep_type)
return Field::new_field(root, new_table); return Field::new_field(root, new_table, keep_type);
/* /*
Old VARCHAR field which should be modified to a VARCHAR on copy Old VARCHAR field which should be modified to a VARCHAR on copy
This is done to ensure that ALTER TABLE will convert old VARCHAR fields This is done to ensure that ALTER TABLE will convert old VARCHAR fields
to now VARCHAR fields. to now VARCHAR fields.
*/ */
if ((new_field= new Field_varstring(field_length, maybe_null(), return new Field_varstring(field_length, maybe_null(),
field_name, new_table, charset()))) field_name, new_table, charset());
{
/*
delayed_insert::get_local_table() needs a ptr copied from old table.
This is what other new_field() methods do too. The above method of
Field_varstring sets ptr to NULL.
*/
new_field->ptr= ptr;
}
return new_field;
} }
/**************************************************************************** /****************************************************************************
...@@ -6738,9 +6731,11 @@ int Field_varstring::cmp_binary(const char *a_ptr, const char *b_ptr, ...@@ -6738,9 +6731,11 @@ int Field_varstring::cmp_binary(const char *a_ptr, const char *b_ptr,
} }
Field *Field_varstring::new_field(MEM_ROOT *root, struct st_table *new_table) Field *Field_varstring::new_field(MEM_ROOT *root, struct st_table *new_table,
bool keep_type)
{ {
Field_varstring *res= (Field_varstring*) Field::new_field(root, new_table); Field_varstring *res= (Field_varstring*) Field::new_field(root, new_table,
keep_type);
if (res) if (res)
res->length_bytes= length_bytes; res->length_bytes= length_bytes;
return res; return res;
......
...@@ -211,7 +211,8 @@ class Field ...@@ -211,7 +211,8 @@ class Field
*/ */
virtual bool can_be_compared_as_longlong() const { return FALSE; } virtual bool can_be_compared_as_longlong() const { return FALSE; }
virtual void free() {} virtual void free() {}
virtual Field *new_field(MEM_ROOT *root, struct st_table *new_table); virtual Field *new_field(MEM_ROOT *root, struct st_table *new_table,
bool keep_type);
virtual Field *new_key_field(MEM_ROOT *root, struct st_table *new_table, virtual Field *new_key_field(MEM_ROOT *root, struct st_table *new_table,
char *new_ptr, uchar *new_null_ptr, char *new_ptr, uchar *new_null_ptr,
uint new_null_bit); uint new_null_bit);
...@@ -1033,7 +1034,7 @@ class Field_string :public Field_longstr { ...@@ -1033,7 +1034,7 @@ class Field_string :public Field_longstr {
enum_field_types real_type() const { return FIELD_TYPE_STRING; } enum_field_types real_type() const { return FIELD_TYPE_STRING; }
bool has_charset(void) const bool has_charset(void) const
{ return charset() == &my_charset_bin ? FALSE : TRUE; } { return charset() == &my_charset_bin ? FALSE : TRUE; }
Field *new_field(MEM_ROOT *root, struct st_table *new_table); Field *new_field(MEM_ROOT *root, struct st_table *new_table, bool keep_type);
}; };
...@@ -1105,7 +1106,7 @@ class Field_varstring :public Field_longstr { ...@@ -1105,7 +1106,7 @@ class Field_varstring :public Field_longstr {
enum_field_types real_type() const { return MYSQL_TYPE_VARCHAR; } enum_field_types real_type() const { return MYSQL_TYPE_VARCHAR; }
bool has_charset(void) const bool has_charset(void) const
{ return charset() == &my_charset_bin ? FALSE : TRUE; } { return charset() == &my_charset_bin ? FALSE : TRUE; }
Field *new_field(MEM_ROOT *root, struct st_table *new_table); Field *new_field(MEM_ROOT *root, struct st_table *new_table, bool keep_type);
Field *new_key_field(MEM_ROOT *root, struct st_table *new_table, Field *new_key_field(MEM_ROOT *root, struct st_table *new_table,
char *new_ptr, uchar *new_null_ptr, char *new_ptr, uchar *new_null_ptr,
uint new_null_bit); uint new_null_bit);
......
...@@ -17,6 +17,44 @@ ...@@ -17,6 +17,44 @@
/* Insert of records */ /* Insert of records */
/*
INSERT DELAYED
Insert delayed is distinguished from a normal insert by lock_type ==
TL_WRITE_DELAYED instead of TL_WRITE. It first tries to open a
"delayed" table (delayed_get_table()), but falls back to
open_and_lock_tables() on error and proceeds as normal insert then.
Opening a "delayed" table means to find a delayed insert thread that
has the table open already. If this fails, a new thread is created and
waited for to open and lock the table.
If accessing the thread succeeded, in
delayed_insert::get_local_table() the table of the thread is copied
for local use. A copy is required because the normal insert logic
works on a target table, but the other threads table object must not
be used. The insert logic uses the record buffer to create a record.
And the delayed insert thread uses the record buffer to pass the
record to the table handler. So there must be different objects. Also
the copied table is not included in the lock, so that the statement
can proceed even if the real table cannot be accessed at this moment.
Copying a table object is not a trivial operation. Besides the TABLE
object there are the field pointer array, the field objects and the
record buffer. After copying the field objects, their pointers into
the record must be "moved" to point to the new record buffer.
After this setup the normal insert logic is used. Only that for
delayed inserts write_delayed() is called instead of write_record().
It inserts the rows into a queue and signals the delayed insert thread
instead of writing directly to the table.
The delayed insert thread awakes from the signal. It locks the table,
inserts the rows from the queue, unlocks the table, and waits for the
next signal. It does normally live until a FLUSH TABLES or SHUTDOWN.
*/
#include "mysql_priv.h" #include "mysql_priv.h"
#include "sp_head.h" #include "sp_head.h"
#include "sql_trigger.h" #include "sql_trigger.h"
...@@ -1466,6 +1504,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) ...@@ -1466,6 +1504,7 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
my_ptrdiff_t adjust_ptrs; my_ptrdiff_t adjust_ptrs;
Field **field,**org_field, *found_next_number_field; Field **field,**org_field, *found_next_number_field;
TABLE *copy; TABLE *copy;
DBUG_ENTER("delayed_insert::get_local_table");
/* First request insert thread to get a lock */ /* First request insert thread to get a lock */
status=1; status=1;
...@@ -1489,31 +1528,47 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) ...@@ -1489,31 +1528,47 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
} }
} }
/*
Allocate memory for the TABLE object, the field pointers array, and
one record buffer of reclength size. Normally a table has three
record buffers of rec_buff_length size, which includes alignment
bytes. Since the table copy is used for creating one record only,
the other record buffers and alignment are unnecessary.
*/
client_thd->proc_info="allocating local table"; client_thd->proc_info="allocating local table";
copy= (TABLE*) client_thd->alloc(sizeof(*copy)+ copy= (TABLE*) client_thd->alloc(sizeof(*copy)+
(table->s->fields+1)*sizeof(Field**)+ (table->s->fields+1)*sizeof(Field**)+
table->s->reclength); table->s->reclength);
if (!copy) if (!copy)
goto error; goto error;
/* Copy the TABLE object. */
*copy= *table; *copy= *table;
copy->s= &copy->share_not_to_be_used; copy->s= &copy->share_not_to_be_used;
// No name hashing // No name hashing
bzero((char*) &copy->s->name_hash,sizeof(copy->s->name_hash)); bzero((char*) &copy->s->name_hash,sizeof(copy->s->name_hash));
/* We don't need to change the file handler here */ /* We don't need to change the file handler here */
field=copy->field=(Field**) (copy+1); /* Assign the pointers for the field pointers array and the record. */
copy->record[0]=(byte*) (field+table->s->fields+1); field= copy->field= (Field**) (copy + 1);
memcpy((char*) copy->record[0],(char*) table->record[0],table->s->reclength); copy->record[0]= (byte*) (field + table->s->fields + 1);
memcpy((char*) copy->record[0], (char*) table->record[0],
/* Make a copy of all fields */ table->s->reclength);
adjust_ptrs=PTR_BYTE_DIFF(copy->record[0],table->record[0]); /*
Make a copy of all fields.
The copied fields need to point into the copied record. This is done
by copying the field objects with their old pointer values and then
"move" the pointers by the distance between the original and copied
records. That way we preserve the relative positions in the records.
*/
adjust_ptrs= PTR_BYTE_DIFF(copy->record[0], table->record[0]);
found_next_number_field=table->found_next_number_field; found_next_number_field= table->found_next_number_field;
for (org_field=table->field ; *org_field ; org_field++,field++) for (org_field= table->field; *org_field; org_field++, field++)
{ {
if (!(*field= (*org_field)->new_field(client_thd->mem_root,copy))) if (!(*field= (*org_field)->new_field(client_thd->mem_root, copy, 1)))
return 0; DBUG_RETURN(0);
(*field)->orig_table= copy; // Remove connection (*field)->orig_table= copy; // Remove connection
(*field)->move_field(adjust_ptrs); // Point at copy->record[0] (*field)->move_field(adjust_ptrs); // Point at copy->record[0]
if (*org_field == found_next_number_field) if (*org_field == found_next_number_field)
...@@ -1540,14 +1595,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd) ...@@ -1540,14 +1595,14 @@ TABLE *delayed_insert::get_local_table(THD* client_thd)
/* Adjust lock_count. This table object is not part of a lock. */ /* Adjust lock_count. This table object is not part of a lock. */
copy->lock_count= 0; copy->lock_count= 0;
return copy; DBUG_RETURN(copy);
/* Got fatal error */ /* Got fatal error */
error: error:
tables_in_use--; tables_in_use--;
status=1; status=1;
pthread_cond_signal(&cond); // Inform thread about abort pthread_cond_signal(&cond); // Inform thread about abort
return 0; DBUG_RETURN(0);
} }
......
...@@ -8217,7 +8217,8 @@ Field* create_tmp_field_from_field(THD *thd, Field* org_field, ...@@ -8217,7 +8217,8 @@ Field* create_tmp_field_from_field(THD *thd, Field* org_field,
org_field->field_name, table, org_field->field_name, table,
org_field->charset()); org_field->charset());
else else
new_field= org_field->new_field(thd->mem_root, table); new_field= org_field->new_field(thd->mem_root, table,
table == org_field->table);
if (new_field) if (new_field)
{ {
if (item) if (item)
...@@ -13072,7 +13073,7 @@ setup_copy_fields(THD *thd, TMP_TABLE_PARAM *param, ...@@ -13072,7 +13073,7 @@ setup_copy_fields(THD *thd, TMP_TABLE_PARAM *param,
saved value saved value
*/ */
Field *field= item->field; Field *field= item->field;
item->result_field=field->new_field(thd->mem_root,field->table); item->result_field=field->new_field(thd->mem_root,field->table, 1);
char *tmp=(char*) sql_alloc(field->pack_length()+1); char *tmp=(char*) sql_alloc(field->pack_length()+1);
if (!tmp) if (!tmp)
goto err; goto err;
......
...@@ -736,7 +736,8 @@ bool Table_triggers_list::prepare_record1_accessors(TABLE *table) ...@@ -736,7 +736,8 @@ bool Table_triggers_list::prepare_record1_accessors(TABLE *table)
QQ: it is supposed that it is ok to use this function for field QQ: it is supposed that it is ok to use this function for field
cloning... cloning...
*/ */
if (!(*old_fld= (*fld)->new_field(&table->mem_root, table))) if (!(*old_fld= (*fld)->new_field(&table->mem_root, table,
table == (*fld)->table)))
return 1; return 1;
(*old_fld)->move_field((my_ptrdiff_t)(table->record[1] - (*old_fld)->move_field((my_ptrdiff_t)(table->record[1] -
table->record[0])); table->record[0]));
......
...@@ -802,7 +802,8 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat, ...@@ -802,7 +802,8 @@ int openfrm(THD *thd, const char *name, const char *alias, uint db_stat,
if (!(field->flags & BLOB_FLAG)) if (!(field->flags & BLOB_FLAG))
{ // Create a new field { // Create a new field
field=key_part->field=field->new_field(&outparam->mem_root, field=key_part->field=field->new_field(&outparam->mem_root,
outparam); outparam,
outparam == field->table);
field->field_length=key_part->length; field->field_length=key_part->length;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment