Commit cfedb6d9 authored by unknown's avatar unknown

Fix for bug#4312 ndb table, wrong behaviour on insert .. on duplicate key ..

parent d995bed2
......@@ -78,9 +78,9 @@ unique key(a)
) engine=ndb;
insert into t1 values(1, 'aAa');
insert into t1 values(2, 'aaa');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
ERROR 23000: Duplicate entry '2' for key 1
insert into t1 values(3, 'AAA');
ERROR 23000: Can't write, because of unique constraint, to table 't1'
ERROR 23000: Duplicate entry '3' for key 1
select * from t1 order by p;
p a
1 aAa
......
......@@ -22,7 +22,7 @@ select * from t1 where b = 4 order by a;
a b c
3 4 6
insert into t1 values(8, 2, 3);
ERROR 23000: Can't write, because of unique constraint, to table 't1'
ERROR 23000: Duplicate entry '8' for key 1
select * from t1 order by a;
a b c
1 2 3
......@@ -65,7 +65,7 @@ select * from t2 where b = 4 order by a;
a b c
3 4 6
insert into t2 values(8, 2, 3);
ERROR 23000: Can't write, because of unique constraint, to table 't2'
ERROR 23000: Duplicate entry '8' for key 1
select * from t2 order by a;
a b c
1 2 3
......@@ -123,7 +123,7 @@ pk a
3 NULL
4 4
insert into t1 values (5,0);
ERROR 23000: Can't write, because of unique constraint, to table 't1'
ERROR 23000: Duplicate entry '5' for key 1
select * from t1 order by pk;
pk a
-1 NULL
......@@ -156,7 +156,7 @@ pk a b c
0 NULL 18 NULL
1 3 19 abc
insert into t2 values(2,3,19,'abc');
ERROR 23000: Can't write, because of unique constraint, to table 't2'
ERROR 23000: Duplicate entry '2' for key 1
select * from t2 order by pk;
pk a b c
-1 1 17 NULL
......
......@@ -535,27 +535,46 @@ count(*)
2000
insert into t1 select * from t1 where b < 10 order by pk1;
ERROR 23000: Duplicate entry '9' for key 1
DELETE FROM t1 WHERE pk1=2;
begin;
INSERT IGNORE INTO t1 VALUES(1,2,3);
ERROR HY000: Table storage engine for 't1' doesn't have this option
commit;
select * from t1 where pk1=1;
INSERT IGNORE INTO t1 VALUES(1,2,3),(2,3,4);
select * from t1 where pk1 < 3 order by pk1;
pk1 b c
0 0 0
1 1 1
INSERT IGNORE INTO t1 VALUES(1,2,3);
ERROR HY000: Table storage engine for 't1' doesn't have this option
select * from t1 where pk1=1;
2 3 4
rollback;
INSERT IGNORE INTO t1 VALUES(1,2,3),(2,3,4);
select * from t1 where pk1 < 3 order by pk1;
pk1 b c
0 0 0
1 1 1
REPLACE INTO t1 values(1, 2, 3);
2 3 4
REPLACE INTO t1 values(1, 78, 3);
select * from t1 where pk1=1;
pk1 b c
1 2 3
INSERT INTO t1 VALUES(1,1,1) ON DUPLICATE KEY UPDATE b=79;
ERROR HY000: Table storage engine for 't1' doesn't have this option
select * from t1 where pk1=1;
1 78 3
INSERT INTO t1 VALUES(1,1,1),(3,4,5) ON DUPLICATE KEY UPDATE b=79;
select * from t1 where pk1 < 4 order by pk1;
pk1 b c
0 0 0
1 79 3
2 3 4
3 79 3
INSERT INTO t1 VALUES(1,1,1),(3,4,5) ON DUPLICATE KEY UPDATE b=pk1+c;
select * from t1 where pk1 < 4 order by pk1;
pk1 b c
0 0 0
1 4 3
2 3 4
3 6 3
DELETE FROM t1 WHERE pk1 = 2 OR pk1 = 4 OR pk1 = 6;
INSERT INTO t1 VALUES(1,1,1),(2,2,17),(3,4,5) ON DUPLICATE KEY UPDATE pk1=b;
select * from t1 where pk1 = b and b != c order by pk1;
pk1 b c
1 2 3
2 2 17
4 4 3
6 6 3
DROP TABLE t1;
CREATE TABLE t1(a INT) ENGINE=ndb;
INSERT IGNORE INTO t1 VALUES (1);
......
......@@ -86,9 +86,9 @@ create table t1 (
# ok
insert into t1 values(1, 'aAa');
# fail
--error 1169
--error 1062
insert into t1 values(2, 'aaa');
--error 1169
--error 1062
insert into t1 values(3, 'AAA');
# 1
select * from t1 order by p;
......
......@@ -21,7 +21,7 @@ select * from t1 where b = 4 order by b;
insert into t1 values(7,8,3);
select * from t1 where b = 4 order by a;
-- error 1169
-- error 1062
insert into t1 values(8, 2, 3);
select * from t1 order by a;
delete from t1 where a = 1;
......@@ -49,7 +49,7 @@ select * from t2 where c = 6;
insert into t2 values(7,8,3);
select * from t2 where b = 4 order by a;
-- error 1169
-- error 1062
insert into t2 values(8, 2, 3);
select * from t2 order by a;
delete from t2 where a = 1;
......@@ -92,7 +92,7 @@ insert into t1 values (-1,NULL), (0,0), (1,NULL),(2,2),(3,NULL),(4,4);
select * from t1 order by pk;
--error 1169
--error 1062
insert into t1 values (5,0);
select * from t1 order by pk;
delete from t1 where a = 0;
......@@ -111,7 +111,7 @@ insert into t2 values (-1,1,17,NULL),(0,NULL,18,NULL),(1,3,19,'abc');
select * from t2 order by pk;
--error 1169
--error 1062
insert into t2 values(2,3,19,'abc');
select * from t2 order by pk;
delete from t2 where c IS NOT NULL;
......
......@@ -564,23 +564,37 @@ select count(*) from t1;
--error 1062
insert into t1 select * from t1 where b < 10 order by pk1;
DELETE FROM t1 WHERE pk1=2;
begin;
--error 1031
INSERT IGNORE INTO t1 VALUES(1,2,3);
commit;
select * from t1 where pk1=1;
INSERT IGNORE INTO t1 VALUES(1,2,3),(2,3,4);
select * from t1 where pk1 < 3 order by pk1;
rollback;
--error 1031
INSERT IGNORE INTO t1 VALUES(1,2,3);
select * from t1 where pk1=1;
INSERT IGNORE INTO t1 VALUES(1,2,3),(2,3,4);
select * from t1 where pk1 < 3 order by pk1;
REPLACE INTO t1 values(1, 2, 3);
REPLACE INTO t1 values(1, 78, 3);
select * from t1 where pk1=1;
--error 1031
INSERT INTO t1 VALUES(1,1,1) ON DUPLICATE KEY UPDATE b=79;
select * from t1 where pk1=1;
INSERT INTO t1 VALUES(1,1,1),(3,4,5) ON DUPLICATE KEY UPDATE b=79;
select * from t1 where pk1 < 4 order by pk1;
INSERT INTO t1 VALUES(1,1,1),(3,4,5) ON DUPLICATE KEY UPDATE b=pk1+c;
select * from t1 where pk1 < 4 order by pk1;
DELETE FROM t1 WHERE pk1 = 2 OR pk1 = 4 OR pk1 = 6;
INSERT INTO t1 VALUES(1,1,1),(2,2,17),(3,4,5) ON DUPLICATE KEY UPDATE pk1=b;
select * from t1 where pk1 = b and b != c order by pk1;
# The following test case currently does not work
#DELETE FROM t1;
#CREATE UNIQUE INDEX bi ON t1(b);
#INSERT INTO t1 VALUES
#(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5),
#(6,6,6),(7,7,7),(8,8,8),(9,9,9),(10,10,10);
#INSERT INTO t1 VALUES(0,1,0),(21,21,21) ON DUPLICATE KEY UPDATE pk1=b+10,c=b+10;
#select * from t1 order by pk1;
DROP TABLE t1;
......
......@@ -109,7 +109,7 @@ static const err_code_mapping err_map[]=
{
{ 626, HA_ERR_KEY_NOT_FOUND },
{ 630, HA_ERR_FOUND_DUPP_KEY },
{ 893, HA_ERR_FOUND_DUPP_UNIQUE },
{ 893, HA_ERR_FOUND_DUPP_KEY }, // Unique constraint
{ 721, HA_ERR_TABLE_EXIST },
{ 4244, HA_ERR_TABLE_EXIST },
......@@ -144,7 +144,7 @@ static int ndb_to_mysql_error(const NdbError *err)
// Push the NDB error message as warning
push_warning_printf(current_thd, MYSQL_ERROR::WARN_LEVEL_ERROR,
ER_GET_ERRMSG, ER(ER_GET_ERRMSG),
err->code, err->message, "NDB");
err->code, err->message, "NDB");
return err->code;
}
}
......@@ -1018,7 +1018,8 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
m_retrieve_all_fields)
m_retrieve_all_fields ||
(field->flags & PRI_KEY_FLAG) && m_retrieve_primary_key)
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(trans->getNdbError());
......@@ -1029,7 +1030,6 @@ int ha_ndbcluster::pk_read(const byte *key, uint key_len, byte *buf)
m_value[i].ptr= NULL;
}
}
if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
......@@ -1093,6 +1093,34 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
DBUG_RETURN(0);
}
/*
Peek to check if a particular row already exists
*/
int ha_ndbcluster::peek_row()
{
NdbConnection *trans= m_active_trans;
NdbOperation *op;
THD *thd= current_thd;
DBUG_ENTER("peek_row");
NdbOperation::LockMode lm=
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type);
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res;
if ((res= set_primary_key(op)))
ERR_RETURN(trans->getNdbError());
if (execute_no_commit_ie(this,trans) != 0)
{
table->status= STATUS_NOT_FOUND;
DBUG_RETURN(ndb_err(trans));
}
DBUG_RETURN(0);
}
/*
Read one record from NDB using unique secondary index
......@@ -1138,7 +1166,7 @@ int ha_ndbcluster::unique_index_read(const byte *key,
{
Field *field= table->field[i];
if ((thd->query_id == field->query_id) ||
(field->flags & PRI_KEY_FLAG))
(field->flags & PRI_KEY_FLAG)) // && m_retrieve_primary_key ??
{
if (get_ndb_value(op, field, i, buf))
ERR_RETURN(op->getNdbError());
......@@ -1566,7 +1594,7 @@ int ha_ndbcluster::filtered_scan(const byte *key, uint key_len,
Field* field= key_part->field;
uint ndb_fieldnr= key_part->fieldnr-1;
DBUG_PRINT("key_part", ("fieldnr: %d", ndb_fieldnr));
// const NDBCOL *col= tab->getColumn(ndb_fieldnr);
//const NDBCOL *col= ((const NDBTAB *) m_table)->getColumn(ndb_fieldnr);
uint32 field_len= field->pack_length();
DBUG_DUMP("key", (char*)key, field_len);
......@@ -1635,9 +1663,17 @@ int ha_ndbcluster::write_row(byte *record)
int res;
DBUG_ENTER("write_row");
if(m_ignore_dup_key_not_supported)
if(m_ignore_dup_key && table->primary_key != MAX_KEY)
{
DBUG_RETURN(HA_ERR_WRONG_COMMAND);
int peek_res= peek_row();
if (!peek_res)
{
m_dupkey= table->primary_key;
DBUG_RETURN(HA_ERR_FOUND_DUPP_KEY);
}
if (peek_res != HA_ERR_KEY_NOT_FOUND)
DBUG_RETURN(peek_res);
}
statistic_increment(ha_write_count,&LOCK_status);
......@@ -1791,7 +1827,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
NdbOperation *op;
uint i;
DBUG_ENTER("update_row");
statistic_increment(ha_update_count,&LOCK_status);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
table->timestamp_field->set_time();
......@@ -2653,15 +2689,15 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
m_use_write= TRUE;
} else
{
if (table->keys)
m_ignore_dup_key_not_supported= TRUE;
DBUG_PRINT("info", ("Ignoring duplicate key"));
m_ignore_dup_key= TRUE;
}
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
DBUG_PRINT("info", ("HA_EXTRA_NO_IGNORE_DUP_KEY"));
DBUG_PRINT("info", ("Turning OFF use of write instead of insert"));
m_use_write= FALSE;
m_ignore_dup_key_not_supported= FALSE;
m_ignore_dup_key= FALSE;
break;
case HA_EXTRA_RETRIEVE_ALL_COLS: /* Retrieve all columns, not just those
where field->query_id is the same as
......@@ -2680,6 +2716,7 @@ int ha_ndbcluster::extra(enum ha_extra_function operation)
break;
case HA_EXTRA_RETRIEVE_PRIMARY_KEY:
DBUG_PRINT("info", ("HA_EXTRA_RETRIEVE_PRIMARY_KEY"));
m_retrieve_primary_key= TRUE;
break;
case HA_EXTRA_CHANGE_KEY_TO_UNIQUE:
DBUG_PRINT("info", ("HA_EXTRA_CHANGE_KEY_TO_UNIQUE"));
......@@ -2942,6 +2979,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
DBUG_ASSERT(m_active_trans);
// Start of transaction
m_retrieve_all_fields= FALSE;
m_retrieve_primary_key= FALSE;
m_ops_pending= 0;
{
NDBDICT *dict= m_ndb->getDictionary();
......@@ -3034,6 +3072,7 @@ int ha_ndbcluster::start_stmt(THD *thd)
// Start of statement
m_retrieve_all_fields= FALSE;
m_retrieve_primary_key= FALSE;
m_ops_pending= 0;
DBUG_RETURN(error);
......@@ -3640,9 +3679,10 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
HA_NO_PREFIX_CHAR_KEYS),
m_share(0),
m_use_write(FALSE),
m_ignore_dup_key_not_supported(FALSE),
m_ignore_dup_key(FALSE),
m_primary_key_update(FALSE),
m_retrieve_all_fields(FALSE),
m_retrieve_primary_key(FALSE),
m_rows_to_insert(1),
m_rows_inserted(0),
m_bulk_insert_rows(1024),
......
......@@ -183,6 +183,7 @@ class ha_ndbcluster: public handler
int pk_read(const byte *key, uint key_len, byte *buf);
int complemented_pk_read(const byte *old_data, byte *new_data);
int peek_row();
int unique_index_read(const byte *key, uint key_len,
byte *buf);
int ordered_index_scan(const key_range *start_key,
......@@ -242,9 +243,10 @@ class ha_ndbcluster: public handler
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool m_ignore_dup_key_not_supported;
bool m_ignore_dup_key;
bool m_primary_key_update;
bool m_retrieve_all_fields;
bool m_retrieve_primary_key;
ha_rows m_rows_to_insert;
ha_rows m_rows_inserted;
ha_rows m_bulk_insert_rows;
......
......@@ -453,8 +453,8 @@ int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
setup_tables(insert_table_list) ||
setup_fields(thd, 0, insert_table_list, *values, 0, 0, 0) ||
(duplic == DUP_UPDATE &&
(setup_fields(thd, 0, insert_table_list, update_fields, 0, 0, 0) ||
setup_fields(thd, 0, insert_table_list, update_values, 0, 0, 0))))
(setup_fields(thd, 0, insert_table_list, update_fields, 1, 0, 0) ||
setup_fields(thd, 0, insert_table_list, update_values, 1, 0, 0))))
DBUG_RETURN(-1);
if (find_real_table_in_list(table_list->next,
table_list->db, table_list->real_name))
......@@ -462,6 +462,9 @@ int mysql_prepare_insert(THD *thd, TABLE_LIST *table_list,
my_error(ER_UPDATE_TABLE_USED, MYF(0), table_list->real_name);
DBUG_RETURN(-1);
}
if (duplic == DUP_UPDATE || duplic == DUP_REPLACE)
table->file->extra(HA_EXTRA_RETRIEVE_PRIMARY_KEY);
DBUG_RETURN(0);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment