Commit 58320780 authored by joreland@mysql.com's avatar joreland@mysql.com

wl#2126 - ndb - Fix handling of null values wrt read multi range

parent dee5a921
......@@ -212,3 +212,54 @@ delete from t1 where d in (12,6,7);
select * from t1 where d in (12,6,7);
a b c d e
drop table t1;
create table t1 (
a int not null primary key,
b int,
c int,
d int,
unique index (b),
index(c)
) engine = ndb;
insert into t1 values
(1,null,1,1),
(2,2,2,2),
(3,null,null,3),
(4,4,null,4),
(5,null,5,null),
(6,6,6,null),
(7,null,null,null),
(8,8,null,null),
(9,null,9,9),
(10,10,10,10),
(11,null,null,11),
(12,12,null,12),
(13,null,13,null),
(14,14,14,null),
(15,null,null,null),
(16,16,null,null);
create table t2 as select * from t1 where a in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
5 NULL 5 NULL
6 6 6 NULL
7 NULL NULL NULL
8 8 NULL NULL
9 NULL 9 9
10 10 10 10
drop table t2;
create table t2 as select * from t1 where b in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
6 6 6 NULL
8 8 NULL NULL
10 10 10 10
drop table t2;
create table t2 as select * from t1 where c in (5,6,7,8,9,10);
select * from t2 order by a;
a b c d
5 NULL 5 NULL
6 6 6 NULL
9 NULL 9 9
10 10 10 10
drop table t2;
drop table t1;
......@@ -157,3 +157,45 @@ delete from t1 where d in (12,6,7);
select * from t1 where d in (12,6,7);
drop table t1;
# null handling
create table t1 (
a int not null primary key,
b int,
c int,
d int,
unique index (b),
index(c)
) engine = ndb;
insert into t1 values
(1,null,1,1),
(2,2,2,2),
(3,null,null,3),
(4,4,null,4),
(5,null,5,null),
(6,6,6,null),
(7,null,null,null),
(8,8,null,null),
(9,null,9,9),
(10,10,10,10),
(11,null,null,11),
(12,12,null,12),
(13,null,13,null),
(14,14,14,null),
(15,null,null,null),
(16,16,null,null);
create table t2 as select * from t1 where a in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
create table t2 as select * from t1 where b in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
create table t2 as select * from t1 where c in (5,6,7,8,9,10);
select * from t2 order by a;
drop table t2;
drop table t1;
......@@ -750,6 +750,7 @@ protected:
NdbOperation* next(); // Get next pointer
public:
const NdbOperation* next() const;
const NdbRecAttr* getFirstRecAttr() const;
protected:
enum OperationStatus
......@@ -1005,6 +1006,14 @@ NdbOperation::next() const
{
return theNext;
}
inline
const NdbRecAttr*
NdbOperation::getFirstRecAttr() const
{
return theReceiver.theFirstRecAttr;
}
/******************************************************************************
OperationStatus Status();
......
......@@ -241,6 +241,9 @@ public:
* i.e. objects that has been cloned.
*/
~NdbRecAttr();
public:
const NdbRecAttr* next() const;
private:
NdbRecAttr();
......@@ -252,7 +255,7 @@ private:
void init(); /* Initialise object when allocated */
void next(NdbRecAttr* aRecAttr);
NdbRecAttr* next() const;
NdbRecAttr* next();
int setup(const class NdbDictionary::Column* col, char* aValue);
int setup(const class NdbColumnImpl* anAttrInfo, char* aValue);
......@@ -401,6 +404,13 @@ NdbRecAttr::next(NdbRecAttr* aRecAttr)
inline
NdbRecAttr*
NdbRecAttr::next()
{
return theNext;
}
inline
const NdbRecAttr*
NdbRecAttr::next() const
{
return theNext;
......
......@@ -1109,17 +1109,17 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
if (!(op= trans->getNdbOperation((const NDBTAB *) m_table)) ||
op->readTuple(lm) != 0)
ERR_RETURN(trans->getNdbError());
int res;
if ((res= set_primary_key_from_old_data(op, old_data)))
ERR_RETURN(trans->getNdbError());
int res;
if ((res= set_primary_key_from_old_data(op, old_data)))
ERR_RETURN(trans->getNdbError());
// Read all unreferenced non-key field(s)
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if (!(field->flags & PRI_KEY_FLAG) &&
(thd->query_id != field->query_id))
if (!((field->flags & PRI_KEY_FLAG) ||
(thd->query_id == field->query_id)))
{
if (get_ndb_value(op, field, i, new_data))
ERR_RETURN(trans->getNdbError());
......@@ -1135,6 +1135,20 @@ int ha_ndbcluster::complemented_pk_read(const byte *old_data, byte *new_data)
// The value have now been fetched from NDB
unpack_record(new_data);
table->status= 0;
/**
* restore m_value
*/
for (i= 0; i < no_fields; i++)
{
Field *field= table->field[i];
if (!((field->flags & PRI_KEY_FLAG) ||
(thd->query_id == field->query_id)))
{
m_value[i].ptr= NULL;
}
}
DBUG_RETURN(0);
}
......@@ -1931,7 +1945,7 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
// Require that the PK for this record has previously been
// read into m_value
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields].rec;
const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec);
DBUG_DUMP("key", (char*)rec->aRef(), NDB_HIDDEN_PRIMARY_KEY_LENGTH);
......@@ -2013,7 +2027,7 @@ int ha_ndbcluster::delete_row(const byte *record)
// This table has no primary key, use "hidden" primary key
DBUG_PRINT("info", ("Using hidden key"));
uint no_fields= table->fields;
NdbRecAttr* rec= m_value[no_fields].rec;
const NdbRecAttr* rec= m_value[no_fields].rec;
DBUG_ASSERT(rec != NULL);
if (set_hidden_key(op, no_fields, rec->aRef()))
......@@ -2057,6 +2071,8 @@ void ha_ndbcluster::unpack_record(byte* buf)
Field **field, **end;
NdbValue *value= m_value;
DBUG_ENTER("unpack_record");
end = table->field + table->fields;
// Set null flag(s)
bzero(buf, table->null_bytes);
......@@ -2082,7 +2098,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
}
}
}
#ifndef DBUG_OFF
// Read and print all values that was fetched
if (table->primary_key == MAX_KEY)
......@@ -2091,7 +2107,7 @@ void ha_ndbcluster::unpack_record(byte* buf)
int hidden_no= table->fields;
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
NdbRecAttr* rec= m_value[hidden_no].rec;
const NdbRecAttr* rec= m_value[hidden_no].rec;
DBUG_ASSERT(rec);
DBUG_PRINT("hidden", ("%d: %s \"%llu\"", hidden_no,
hidden_col->getName(), rec->u_64_value()));
......@@ -2613,7 +2629,7 @@ void ha_ndbcluster::position(const byte *record)
// No primary key, get hidden key
DBUG_PRINT("info", ("Getting hidden key"));
int hidden_no= table->fields;
NdbRecAttr* rec= m_value[hidden_no].rec;
const NdbRecAttr* rec= m_value[hidden_no].rec;
const NDBTAB *tab= (const NDBTAB *) m_table;
const NDBCOL *hidden_col= tab->getColumn(hidden_no);
DBUG_ASSERT(hidden_col->getPrimaryKey() &&
......@@ -4998,6 +5014,7 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
*/
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
setup_recattr(m_active_cursor->getOperation()->getFirstRecAttr());
unpack_record(table->record[0]);
table->status= 0;
DBUG_RETURN(0);
......@@ -5009,13 +5026,37 @@ ha_ndbcluster::read_multi_range_next(key_multi_range ** multi_range_found_p)
*/
* multi_range_found_p= multi_ranges + multi_range_curr;
memcpy(table->record[0], m_multi_range_result_ptr, reclength);
setup_recattr(op->getFirstRecAttr());
unpack_record(table->record[0]);
table->status= 0;
table->status= 0;
multi_range_curr++;
op= m_active_trans->getNextCompletedOperation(op);
m_current_multi_operation= m_active_trans->getNextCompletedOperation(op);
m_multi_range_result_ptr += reclength;
DBUG_RETURN(0);
}
int
ha_ndbcluster::setup_recattr(const NdbRecAttr* curr)
{
DBUG_ENTER("setup_recattr");
Field **field, **end;
NdbValue *value= m_value;
end = table->field + table->fields;
for (field= table->field; field < end; field++, value++)
{
if ((* value).ptr)
{
DBUG_ASSERT(curr != 0);
(* value).rec = curr;
curr = curr->next();
}
}
return 0;
}
#endif /* HAVE_NDBCLUSTER_DB */
......@@ -230,7 +230,7 @@ class ha_ndbcluster: public handler
NDB_SHARE *m_share;
NDB_INDEX_DATA m_index[MAX_KEY];
// NdbRecAttr has no reference to blob
typedef union { NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;
NdbValue m_value[NDB_MAX_ATTRIBUTES_IN_TABLE];
bool m_use_write;
bool m_ignore_dup_key;
......@@ -259,7 +259,8 @@ class ha_ndbcluster: public handler
byte* m_multi_range_result_ptr;
uint m_multi_range_defined_count;
const NdbOperation* m_current_multi_operation;
int setup_recattr(const NdbRecAttr*);
void set_rec_per_key();
void records_update();
void no_uncommitted_rows_execute_failure();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment