Commit 6a31aea5 authored by Sergei Golubchik's avatar Sergei Golubchik

BUG#30301356 - SOME EVENTS ARE DELAYED AFTER DROPPING EVENT

queues.c cleanup and refactoring.

Restore old version of _downhead() (from before cd483c55)
that works well in an average case. Use it for queue_fix().

Move existing specialized version of _downhead() to queue_replace()
where it'll be handling the case it was specifically optimized for
(moving the element to the end of the queue).
And correct it to fix the heap not only down, but also up
(this fixes BUG#30301356).

Add unit tests.

Collateral cosmetic fixes.
parent 69bd7317
...@@ -54,7 +54,7 @@ typedef struct st_queue { ...@@ -54,7 +54,7 @@ typedef struct st_queue {
#define queue_top(queue) ((queue)->root[1]) #define queue_top(queue) ((queue)->root[1])
#define queue_element(queue,index) ((queue)->root[index]) #define queue_element(queue,index) ((queue)->root[index])
#define queue_end(queue) ((queue)->root[(queue)->elements]) #define queue_end(queue) ((queue)->root[(queue)->elements])
#define queue_replace_top(queue) _downheap(queue, 1, (queue)->root[1]) #define queue_replace_top(queue) _downheap(queue, 1)
#define queue_set_cmp_arg(queue, set_arg) (queue)->first_cmp_arg= set_arg #define queue_set_cmp_arg(queue, set_arg) (queue)->first_cmp_arg= set_arg
#define queue_set_max_at_top(queue, set_arg) \ #define queue_set_max_at_top(queue, set_arg) \
(queue)->max_at_top= set_arg ? -1 : 1 (queue)->max_at_top= set_arg ? -1 : 1
...@@ -62,23 +62,23 @@ typedef struct st_queue { ...@@ -62,23 +62,23 @@ typedef struct st_queue {
typedef int (*queue_compare)(void *,uchar *, uchar *); typedef int (*queue_compare)(void *,uchar *, uchar *);
int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key, int init_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare, my_bool max_at_top, queue_compare compare,
void *first_cmp_arg, uint offset_to_queue_pos, void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent); uint auto_extent);
int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key, int reinit_queue(QUEUE *queue,uint max_elements,uint offset_to_key,
pbool max_at_top, queue_compare compare, my_bool max_at_top, queue_compare compare,
void *first_cmp_arg, uint offset_to_queue_pos, void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent); uint auto_extent);
int resize_queue(QUEUE *queue, uint max_elements); int resize_queue(QUEUE *queue, uint max_elements);
void delete_queue(QUEUE *queue); void delete_queue(QUEUE *queue);
void queue_insert(QUEUE *queue,uchar *element); void queue_insert(QUEUE *queue, uchar *element);
int queue_insert_safe(QUEUE *queue, uchar *element); int queue_insert_safe(QUEUE *queue, uchar *element);
uchar *queue_remove(QUEUE *queue,uint idx); uchar *queue_remove(QUEUE *queue,uint idx);
void queue_replace(QUEUE *queue,uint idx); void queue_replace(QUEUE *queue,uint idx);
#define queue_remove_all(queue) { (queue)->elements= 0; } #define queue_remove_all(queue) { (queue)->elements= 0; }
#define queue_is_full(queue) (queue->elements == queue->max_elements) #define queue_is_full(queue) (queue->elements == queue->max_elements)
void _downheap(QUEUE *queue, uint idx, uchar *element); void _downheap(QUEUE *queue, uint idx);
void queue_fix(QUEUE *queue); void queue_fix(QUEUE *queue);
#define is_queue_inited(queue) ((queue)->root != 0) #define is_queue_inited(queue) ((queue)->root != 0)
......
...@@ -70,10 +70,9 @@ ...@@ -70,10 +70,9 @@
*/ */
int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key, int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
pbool max_at_top, int (*compare) (void *, uchar *, uchar *), my_bool max_at_top, int (*compare) (void *, uchar *, uchar *),
void *first_cmp_arg, uint offset_to_queue_pos, void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent) uint auto_extent)
{ {
DBUG_ENTER("init_queue"); DBUG_ENTER("init_queue");
if ((queue->root= (uchar **) my_malloc((max_elements + 1) * sizeof(void*), if ((queue->root= (uchar **) my_malloc((max_elements + 1) * sizeof(void*),
...@@ -109,7 +108,7 @@ int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key, ...@@ -109,7 +108,7 @@ int init_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
*/ */
int reinit_queue(QUEUE *queue, uint max_elements, uint offset_to_key, int reinit_queue(QUEUE *queue, uint max_elements, uint offset_to_key,
pbool max_at_top, int (*compare) (void *, uchar *, uchar *), my_bool max_at_top, int (*compare) (void *, uchar *, uchar *),
void *first_cmp_arg, uint offset_to_queue_pos, void *first_cmp_arg, uint offset_to_queue_pos,
uint auto_extent) uint auto_extent)
{ {
...@@ -182,6 +181,28 @@ void delete_queue(QUEUE *queue) ...@@ -182,6 +181,28 @@ void delete_queue(QUEUE *queue)
} }
static void insert_at(QUEUE *queue, uchar *element, uint idx)
{
uint next_index, offset_to_key= queue->offset_to_key;
uint offset_to_queue_pos= queue->offset_to_queue_pos;
/* max_at_top swaps the comparison if we want to order by desc */
while ((next_index= idx >> 1) > 0 &&
queue->compare(queue->first_cmp_arg,
element + offset_to_key,
queue->root[next_index] + offset_to_key) *
queue->max_at_top < 0)
{
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next_index;
}
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
}
/* /*
Insert element in queue Insert element in queue
...@@ -191,28 +212,10 @@ void delete_queue(QUEUE *queue) ...@@ -191,28 +212,10 @@ void delete_queue(QUEUE *queue)
element Element to insert element Element to insert
*/ */
void queue_insert(register QUEUE *queue, uchar *element) void queue_insert(QUEUE *queue, uchar *element)
{ {
reg2 uint idx, next;
uint offset_to_queue_pos= queue->offset_to_queue_pos;
DBUG_ASSERT(queue->elements < queue->max_elements); DBUG_ASSERT(queue->elements < queue->max_elements);
insert_at(queue, element, ++queue->elements);
idx= ++queue->elements;
/* max_at_top swaps the comparison if we want to order by desc */
while (idx > 1 &&
(queue->compare(queue->first_cmp_arg,
element + queue->offset_to_key,
queue->root[(next= idx >> 1)] +
queue->offset_to_key) * queue->max_at_top) < 0)
{
queue->root[idx]= queue->root[next];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next;
}
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element+ offset_to_queue_pos-1))= idx;
} }
...@@ -230,7 +233,7 @@ void queue_insert(register QUEUE *queue, uchar *element) ...@@ -230,7 +233,7 @@ void queue_insert(register QUEUE *queue, uchar *element)
2 auto_extend is 0; No insertion done 2 auto_extend is 0; No insertion done
*/ */
int queue_insert_safe(register QUEUE *queue, uchar *element) int queue_insert_safe(QUEUE *queue, uchar *element)
{ {
if (queue->elements == queue->max_elements) if (queue->elements == queue->max_elements)
...@@ -259,81 +262,55 @@ int queue_insert_safe(register QUEUE *queue, uchar *element) ...@@ -259,81 +262,55 @@ int queue_insert_safe(register QUEUE *queue, uchar *element)
pointer to removed element pointer to removed element
*/ */
uchar *queue_remove(register QUEUE *queue, uint idx) uchar *queue_remove(QUEUE *queue, uint idx)
{ {
uchar *element; uchar *element;
DBUG_ASSERT(idx >= 1 && idx <= queue->elements); DBUG_ASSERT(idx >= 1);
DBUG_ASSERT(idx <= queue->elements);
element= queue->root[idx]; element= queue->root[idx];
_downheap(queue, idx, queue->root[queue->elements--]); queue->root[idx]= queue->root[queue->elements--];
queue_replace(queue, idx);
return element; return element;
} }
/* /*
Add element to fixed position and update heap Restores the heap property from idx down the heap
SYNOPSIS SYNOPSIS
_downheap() _downheap()
queue Queue to use queue Queue to use
idx Index of element to change idx Index of element to change
element Element to store at 'idx'
NOTE
This only works if element is >= all elements <= start_idx
*/ */
void _downheap(register QUEUE *queue, uint start_idx, uchar *element) void _downheap(QUEUE *queue, uint idx)
{ {
uint elements,half_queue,offset_to_key, next_index, offset_to_queue_pos; uchar *element= queue->root[idx];
register uint idx= start_idx; uint next_index,
my_bool first= TRUE; elements= queue->elements,
half_queue= elements >> 1,
offset_to_key=queue->offset_to_key; offset_to_key= queue->offset_to_key,
offset_to_queue_pos= queue->offset_to_queue_pos; offset_to_queue_pos= queue->offset_to_queue_pos;
half_queue= (elements= queue->elements) >> 1;
while (idx <= half_queue) while (idx <= half_queue)
{ {
next_index=idx+idx; next_index= idx+idx;
if (next_index < elements && if (next_index < elements &&
(queue->compare(queue->first_cmp_arg, (queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key, queue->root[next_index]+offset_to_key,
queue->root[next_index+1]+offset_to_key) * queue->root[next_index+1]+offset_to_key) *
queue->max_at_top) > 0) queue->max_at_top) > 0)
next_index++; next_index++;
if (first && if ((queue->compare(queue->first_cmp_arg,
(((queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key, queue->root[next_index]+offset_to_key,
element+offset_to_key) * queue->max_at_top) >= 0))) element+offset_to_key) * queue->max_at_top) >= 0)
{ break;
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
return;
}
first= FALSE;
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx=next_index;
}
/*
Insert the element into the right position. This is the same code
as we have in queue_insert()
*/
while ((next_index= (idx >> 1)) > start_idx &&
queue->compare(queue->first_cmp_arg,
element+offset_to_key,
queue->root[next_index]+offset_to_key)*
queue->max_at_top < 0)
{
queue->root[idx]= queue->root[next_index]; queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos) if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx; (*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx= next_index; idx= next_index;
} }
queue->root[idx]= element; queue->root[idx]=element;
if (offset_to_queue_pos) if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx; (*(uint*) (element + offset_to_queue_pos-1))= idx;
} }
...@@ -351,7 +328,7 @@ void queue_fix(QUEUE *queue) ...@@ -351,7 +328,7 @@ void queue_fix(QUEUE *queue)
{ {
uint i; uint i;
for (i= queue->elements >> 1; i > 0; i--) for (i= queue->elements >> 1; i > 0; i--)
_downheap(queue, i, queue_element(queue, i)); _downheap(queue, i);
} }
...@@ -362,13 +339,47 @@ void queue_fix(QUEUE *queue) ...@@ -362,13 +339,47 @@ void queue_fix(QUEUE *queue)
queue_replace() queue_replace()
queue Queue to use queue Queue to use
idx Index of element to change idx Index of element to change
element Element to store at 'idx'
NOTE
optimized for the case when the new position is close to the end of the
heap (typical for queue_remove() replacements).
*/ */
void queue_replace(QUEUE *queue, uint idx) void queue_replace(QUEUE *queue, uint idx)
{ {
uchar *element= queue->root[idx]; uchar *element= queue->root[idx];
DBUG_ASSERT(idx >= 1 && idx <= queue->elements); uint next_index,
queue_remove(queue, idx); elements= queue->elements,
queue_insert(queue, element); half_queue= elements>>1,
offset_to_key= queue->offset_to_key,
offset_to_queue_pos= queue->offset_to_queue_pos;
my_bool first= TRUE;
while (idx <= half_queue)
{
next_index= idx + idx;
if (next_index < elements &&
queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
queue->root[next_index+1]+offset_to_key) *
queue->max_at_top > 0)
next_index++;
if (first &&
queue->compare(queue->first_cmp_arg,
queue->root[next_index]+offset_to_key,
element+offset_to_key) * queue->max_at_top >= 0)
{
queue->root[idx]= element;
if (offset_to_queue_pos)
(*(uint*) (element + offset_to_queue_pos-1))= idx;
break;
}
first= FALSE;
queue->root[idx]= queue->root[next_index];
if (offset_to_queue_pos)
(*(uint*) (queue->root[idx] + offset_to_queue_pos-1))= idx;
idx=next_index;
}
insert_at(queue, element, idx);
} }
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA
MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring
LINK_LIBRARIES mysys) queues LINK_LIBRARIES mysys)
MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys) MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys)
IF(WIN32) IF(WIN32)
......
/* Copyright (c) 2020, MariaDB Corporation
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1335 USA */
#include <my_global.h>
#include <my_sys.h>
#include <queues.h>
#include "tap.h"
int cmp(void *arg __attribute__((unused)), uchar *a, uchar *b)
{
return *a < *b ? -1 : *a > *b;
}
#define rnd(R) ((uint)(my_rnd(R) * INT_MAX32))
#define el(Q,I) ((uint)*queue_element(Q, I))
my_bool verbose;
my_bool check_queue(QUEUE *queue)
{
char b[1024]={0}, *s, *e=b+sizeof(b)-2;
my_bool ok=1;
uint i;
s= b + my_snprintf(b, e-b, "%x", el(queue, 1));
for (i=2; i <= queue->elements; i++)
{
s+= my_snprintf(s, e-s, ", %x", el(queue, i));
ok &= el(queue, i) <= el(queue, i>>1);
}
if (!ok || verbose)
diag("%s", b);
return ok;
}
int main(int argc __attribute__((unused)), char *argv[])
{
QUEUE q, *queue=&q;
MY_INIT(argv[0]);
plan(19);
verbose=1;
init_queue(queue, 256, 0, 1, cmp, NULL, 0, 0);
queue_insert(queue, (uchar*)"\x99");
queue_insert(queue, (uchar*)"\x19");
queue_insert(queue, (uchar*)"\x36");
queue_insert(queue, (uchar*)"\x17");
queue_insert(queue, (uchar*)"\x12");
queue_insert(queue, (uchar*)"\x05");
queue_insert(queue, (uchar*)"\x25");
queue_insert(queue, (uchar*)"\x09");
queue_insert(queue, (uchar*)"\x15");
queue_insert(queue, (uchar*)"\x06");
queue_insert(queue, (uchar*)"\x11");
queue_insert(queue, (uchar*)"\x01");
queue_insert(queue, (uchar*)"\x04");
queue_insert(queue, (uchar*)"\x13");
queue_insert(queue, (uchar*)"\x24");
ok(check_queue(queue), "after insert");
queue_remove(queue, 5);
ok(check_queue(queue), "after remove 5th");
queue_element(queue, 1) = (uchar*)"\x01";
queue_element(queue, 2) = (uchar*)"\x10";
queue_element(queue, 3) = (uchar*)"\x04";
queue_element(queue, 4) = (uchar*)"\x09";
queue_element(queue, 5) = (uchar*)"\x13";
queue_element(queue, 6) = (uchar*)"\x03";
queue_element(queue, 7) = (uchar*)"\x08";
queue_element(queue, 8) = (uchar*)"\x07";
queue_element(queue, 9) = (uchar*)"\x06";
queue_element(queue,10) = (uchar*)"\x12";
queue_element(queue,11) = (uchar*)"\x05";
queue_element(queue,12) = (uchar*)"\x02";
queue_element(queue,13) = (uchar*)"\x11";
queue->elements= 13;
ok(!check_queue(queue), "manually filled (queue property violated)");
queue_fix(queue);
ok(check_queue(queue), "fixed");
ok(*queue_remove_top(queue) == 0x13, "remove top 13");
ok(*queue_remove_top(queue) == 0x12, "remove top 12");
ok(*queue_remove_top(queue) == 0x11, "remove top 11");
ok(*queue_remove_top(queue) == 0x10, "remove top 10");
ok(*queue_remove_top(queue) == 0x09, "remove top 9");
ok(*queue_remove_top(queue) == 0x08, "remove top 8");
ok(*queue_remove_top(queue) == 0x07, "remove top 7");
ok(*queue_remove_top(queue) == 0x06, "remove top 6");
ok(*queue_remove_top(queue) == 0x05, "remove top 5");
ok(*queue_remove_top(queue) == 0x04, "remove top 4");
ok(*queue_remove_top(queue) == 0x03, "remove top 3");
ok(*queue_remove_top(queue) == 0x02, "remove top 2");
ok(*queue_remove_top(queue) == 0x01, "remove top 1");
/* random test */
{
int i, res;
struct my_rnd_struct rand;
my_rnd_init(&rand, (ulong)(intptr)&i, (ulong)(intptr)argv);
verbose=0;
for (res= i=1; i <= 250; i++)
{
uchar *s=alloca(2);
*s= rnd(&rand) % 251;
queue_insert(queue, s);
res &= check_queue(queue);
}
ok(res, "inserted 250");
while (queue->elements)
{
queue_remove(queue, (rnd(&rand) % queue->elements) + 1);
res &= check_queue(queue);
}
ok(res, "removed 250");
}
delete_queue(queue);
my_end(0);
return exit_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment