Commit e1973222 authored by Sergey Petrunya's avatar Sergey Petrunya

MWL#121-125 DS-MRR improvements

- Address Monty's review feedback, part 1
- Fix buildbot failure 
parent c1bb9a2d
...@@ -548,3 +548,53 @@ int key_rec_cmp(void *key_p, uchar *first_rec, uchar *second_rec) ...@@ -548,3 +548,53 @@ int key_rec_cmp(void *key_p, uchar *first_rec, uchar *second_rec)
} while (key_info); /* no more keys to test */ } while (key_info); /* no more keys to test */
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/*
Compare two key tuples.
@brief
Compare two key tuples, i.e. two key values in KeyTupleFormat.
@param part KEY_PART_INFO with key description
@param key1 First key to compare
@param key2 Second key to compare
@param tuple_length Length of key1 (and key2, they are the same) in bytes.
@return
@retval 0 key1 == key2
@retval -1 key1 < key2
@retval +1 key1 > key2
*/
int key_tuple_cmp(KEY_PART_INFO *part, uchar *key1, uchar *key2,
uint tuple_length)
{
uchar *key1_end= key1 + tuple_length;
int len;
int res;
LINT_INIT(len);
for (;key1 < key1_end; key1 += len, key2 += len, part++)
{
len= part->store_length;
if (part->null_bit)
{
if (*key1) // key1 == NULL
{
if (!*key2) // key1(NULL) < key2(notNULL)
return -1;
continue;
}
else if (*key2) // key1(notNULL) > key2 (NULL)
return 1;
/* Step over the NULL bytes for key_cmp() call */
key1++;
key2++;
}
if ((res= part->field->key_cmp(key1, key2)))
return res;
}
return 0;
}
...@@ -348,7 +348,7 @@ int Mrr_ordered_index_reader::get_next(char **range_info) ...@@ -348,7 +348,7 @@ int Mrr_ordered_index_reader::get_next(char **range_info)
{ {
if ((res= kv_it.get_next())) if ((res= kv_it.get_next()))
{ {
kv_it.close_(); kv_it.move_to_next_key_value();
scanning_key_val_iter= FALSE; scanning_key_val_iter= FALSE;
if ((res != HA_ERR_KEY_NOT_FOUND && res != HA_ERR_END_OF_FILE)) if ((res != HA_ERR_KEY_NOT_FOUND && res != HA_ERR_END_OF_FILE))
DBUG_RETURN(res); DBUG_RETURN(res);
...@@ -360,10 +360,12 @@ int Mrr_ordered_index_reader::get_next(char **range_info) ...@@ -360,10 +360,12 @@ int Mrr_ordered_index_reader::get_next(char **range_info)
{ {
while ((res= kv_it.init(this))) while ((res= kv_it.init(this)))
{ {
if ((res != HA_ERR_KEY_NOT_FOUND && res != HA_ERR_END_OF_FILE) || if ((res != HA_ERR_KEY_NOT_FOUND && res != HA_ERR_END_OF_FILE))
key_buffer->is_empty()) DBUG_RETURN(res); /* Some fatal error */
if (key_buffer->is_empty())
{ {
DBUG_RETURN(res); DBUG_RETURN(HA_ERR_END_OF_FILE);
} }
} }
scanning_key_val_iter= TRUE; scanning_key_val_iter= TRUE;
...@@ -452,8 +454,8 @@ int Mrr_ordered_index_reader::refill_buffer() ...@@ -452,8 +454,8 @@ int Mrr_ordered_index_reader::refill_buffer()
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
key_buffer->sort((key_buffer->type() == Lifo_buffer::FORWARD)? key_buffer->sort((key_buffer->type() == Lifo_buffer::FORWARD)?
(qsort2_cmp)Mrr_ordered_index_reader::key_tuple_cmp_reverse : (qsort2_cmp)Mrr_ordered_index_reader::compare_keys_reverse :
(qsort2_cmp)Mrr_ordered_index_reader::key_tuple_cmp, (qsort2_cmp)Mrr_ordered_index_reader::compare_keys,
this); this);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -965,60 +967,30 @@ void DsMrr_impl::dsmrr_close() ...@@ -965,60 +967,30 @@ void DsMrr_impl::dsmrr_close()
/* /*
my_qsort2-compatible function to compare key tuples my_qsort2-compatible static member function to compare key tuples
*/ */
int Mrr_ordered_index_reader::key_tuple_cmp(void* arg, uchar* key1, uchar* key2) int Mrr_ordered_index_reader::compare_keys(void* arg, uchar* key1, uchar* key2)
{ {
Mrr_ordered_index_reader *this_= (Mrr_ordered_index_reader*)arg; Mrr_ordered_index_reader *reader= (Mrr_ordered_index_reader*)arg;
TABLE *table= this_->h->get_table(); TABLE *table= reader->h->get_table();
int res; KEY_PART_INFO *part= table->key_info[reader->h->active_index].key_part;
KEY_PART_INFO *part= table->key_info[this_->h->active_index].key_part;
if (this_->keypar.use_key_pointers) if (reader->keypar.use_key_pointers)
{ {
/* the buffer stores pointers to keys, get to the keys */ /* the buffer stores pointers to keys, get to the keys */
key1= *((uchar**)key1); key1= *((uchar**)key1);
key2= *((uchar**)key2); // todo is this alignment-safe? key2= *((uchar**)key2); // todo is this alignment-safe?
} }
uchar *key1_end= key1 + this_->keypar.key_tuple_length; return key_tuple_cmp(part, key1, key2, reader->keypar.key_tuple_length);
while (key1 < key1_end)
{
Field* f = part->field;
int len = part->store_length;
if (part->null_bit)
{
if (*key1) // key1 == NULL
{
if (!*key2) // key1(NULL) < key2(notNULL)
return -1;
goto equals;
}
else if (*key2) // key1(notNULL) > key2 (NULL)
return 1;
// Step over NULL byte for f->cmp().
key1++;
key2++;
len--;
}
if ((res= f->key_cmp(key1, key2)))
return res;
equals:
key1 += len;
key2 += len;
part++;
}
return 0;
} }
int Mrr_ordered_index_reader::key_tuple_cmp_reverse(void* arg, uchar* key1, int Mrr_ordered_index_reader::compare_keys_reverse(void* arg, uchar* key1,
uchar* key2) uchar* key2)
{ {
return -key_tuple_cmp(arg, key1, key2); return -compare_keys(arg, key1, key2);
} }
...@@ -1174,7 +1146,7 @@ int Key_value_records_iterator::init(Mrr_ordered_index_reader *owner_arg) ...@@ -1174,7 +1146,7 @@ int Key_value_records_iterator::init(Mrr_ordered_index_reader *owner_arg)
while (!identical_key_it.read()) while (!identical_key_it.read())
{ {
if (owner->disallow_identical_key_handling || if (owner->disallow_identical_key_handling ||
Mrr_ordered_index_reader::key_tuple_cmp(owner, key_in_buf, Mrr_ordered_index_reader::compare_keys(owner, key_in_buf,
cur_index_tuple)) cur_index_tuple))
break; break;
last_identical_key_ptr= cur_index_tuple; last_identical_key_ptr= cur_index_tuple;
...@@ -1188,7 +1160,8 @@ int Key_value_records_iterator::init(Mrr_ordered_index_reader *owner_arg) ...@@ -1188,7 +1160,8 @@ int Key_value_records_iterator::init(Mrr_ordered_index_reader *owner_arg)
if (res) if (res)
{ {
close_(); /* Failed to find any matching records */
move_to_next_key_value();
return res; return res;
} }
get_next_row= FALSE; get_next_row= FALSE;
...@@ -1203,7 +1176,10 @@ int Key_value_records_iterator::get_next() ...@@ -1203,7 +1176,10 @@ int Key_value_records_iterator::get_next()
if (get_next_row) if (get_next_row)
{ {
if (owner->keypar.index_ranges_unique) if (owner->keypar.index_ranges_unique)
return HA_ERR_END_OF_FILE; /* Max one match */ {
/* We're using a full unique key, no point to call index_next_same */
return HA_ERR_END_OF_FILE;
}
handler *h= owner->h; handler *h= owner->h;
if ((res= h->ha_index_next_same(h->get_table()->record[0], if ((res= h->ha_index_next_same(h->get_table()->record[0],
...@@ -1220,13 +1196,18 @@ int Key_value_records_iterator::get_next() ...@@ -1220,13 +1196,18 @@ int Key_value_records_iterator::get_next()
identical_key_it.read(); /* This gets us next range_id */ identical_key_it.read(); /* This gets us next range_id */
if (!last_identical_key_ptr || (cur_index_tuple == last_identical_key_ptr)) if (!last_identical_key_ptr || (cur_index_tuple == last_identical_key_ptr))
{ {
/*
We've reached the last of the identical keys that current record is a
match for. Set get_next_row=TRUE so that we read the next index record
on the next call to this function.
*/
get_next_row= TRUE; get_next_row= TRUE;
} }
return 0; return 0;
} }
void Key_value_records_iterator::close_() void Key_value_records_iterator::move_to_next_key_value()
{ {
while (!owner->key_buffer->read() && while (!owner->key_buffer->read() &&
(cur_index_tuple != last_identical_key_ptr)) {} (cur_index_tuple != last_identical_key_ptr)) {}
......
...@@ -137,7 +137,7 @@ class Key_value_records_iterator ...@@ -137,7 +137,7 @@ class Key_value_records_iterator
public: public:
int init(Mrr_ordered_index_reader *owner_arg); int init(Mrr_ordered_index_reader *owner_arg);
int get_next(); int get_next();
void close_(); void move_to_next_key_value();
}; };
...@@ -299,8 +299,8 @@ class Mrr_ordered_index_reader : public Mrr_index_reader ...@@ -299,8 +299,8 @@ class Mrr_ordered_index_reader : public Mrr_index_reader
RANGE_SEQ_IF mrr_funcs; RANGE_SEQ_IF mrr_funcs;
range_seq_t mrr_iter; range_seq_t mrr_iter;
static int key_tuple_cmp(void* arg, uchar* key1, uchar* key2); static int compare_keys(void* arg, uchar* key1, uchar* key2);
static int key_tuple_cmp_reverse(void* arg, uchar* key1, uchar* key2); static int compare_keys_reverse(void* arg, uchar* key1, uchar* key2);
friend class Key_value_records_iterator; friend class Key_value_records_iterator;
friend class DsMrr_impl; friend class DsMrr_impl;
......
...@@ -1883,6 +1883,7 @@ void key_unpack(String *to,TABLE *form,uint index); ...@@ -1883,6 +1883,7 @@ void key_unpack(String *to,TABLE *form,uint index);
bool is_key_used(TABLE *table, uint idx, const MY_BITMAP *fields); bool is_key_used(TABLE *table, uint idx, const MY_BITMAP *fields);
int key_cmp(KEY_PART_INFO *key_part, const uchar *key, uint key_length); int key_cmp(KEY_PART_INFO *key_part, const uchar *key, uint key_length);
extern "C" int key_rec_cmp(void *key_info, uchar *a, uchar *b); extern "C" int key_rec_cmp(void *key_info, uchar *a, uchar *b);
int key_tuple_cmp(KEY_PART_INFO *part, uchar *key1, uchar *key2, uint tuple_length);
bool init_errmessage(void); bool init_errmessage(void);
#endif /* MYSQL_SERVER */ #endif /* MYSQL_SERVER */
......
...@@ -2123,7 +2123,7 @@ enum_nested_loop_state JOIN_CACHE::join_matching_records(bool skip_last) ...@@ -2123,7 +2123,7 @@ enum_nested_loop_state JOIN_CACHE::join_matching_records(bool skip_last)
/* Prepare to retrieve all records of the joined table */ /* Prepare to retrieve all records of the joined table */
if ((error= join_tab_scan->open())) if ((error= join_tab_scan->open()))
goto finish; goto finish; /* psergey-note: if this returns error, we will assert in net_send_statement() */
while (!(error= join_tab_scan->next())) while (!(error= join_tab_scan->next()))
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment