Commit c9c765a6 authored by Andrei Elkin's avatar Andrei Elkin

MDEV-14014 Unittest extension to cover concurrent IO_CACHE read and write by...

MDEV-14014 Unittest extension to cover concurrent IO_CACHE read and write by the dump and user threads.
parent 2c14c7d5
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring MY_ADD_TESTS(bitmap base64 my_atomic my_rdtsc lf my_malloc my_getopt dynstring
aes aes my_io_cache_conc
LINK_LIBRARIES mysys) LINK_LIBRARIES mysys)
MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys) MY_ADD_TESTS(my_vsnprintf LINK_LIBRARIES strings mysys)
......
/* Copyright (c) 2006, 2011, Oracle and/or its affiliates. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#define MY_IO_CACHE_CONC
int n_writers=2;
int n_readers=20;
unsigned long long n_messages=2000;
int cache_read_with_care=1;
#include "thr_template.c"
#define FILL 0x5A
#define CACHE_SIZE 16384
//IO_CACHE info;
#define INFO_TAIL ", pos_in_file = %llu, pos_in_mem = %lu\n", \
ptr_log->pos_in_file, (*ptr_log->current_pos - ptr_log->request_pos)
#define BUF_SIZE 2000
#define HDR_SIZE 8
my_off_t _end_pos;
uint last_written= 0;
IO_CACHE write_log;
void set_end_pos(my_off_t val)
{
// mutext must be hold
_end_pos= val;
pthread_cond_broadcast(&cond2);
}
pthread_handler_t writer(void *arg)
{
uchar buf[BUF_SIZE];
longlong param= *(ulonglong*) arg;
IO_CACHE *ptr_log= &write_log;
my_thread_init();
memset(buf, FILL, sizeof(buf));
diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
for (; param > 0; param--)
{
int res;
// Generate a message of arb size that has at least 1 byte of payload
uint32 size= rand() % (BUF_SIZE - HDR_SIZE - 1) + HDR_SIZE + 1;
int4store(buf, size );
// Lock
pthread_mutex_lock(&mutex);
int4store(buf + 4, ++last_written);
res= my_b_write(ptr_log, buf, size);
//ok(res == 0, "buffer is written" INFO_TAIL);
res= my_b_flush_io_cache(ptr_log, 1);
set_end_pos(my_b_write_tell(ptr_log));
pthread_mutex_unlock(&mutex);
// Unlock
//ok(res == 0, "flush" INFO_TAIL);
}
pthread_mutex_lock(&mutex);
if (!--running_threads)
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
my_thread_end();
return 0;
}
my_off_t get_end_pos()
{
my_off_t ret;
pthread_mutex_lock(&mutex);
ret= _end_pos;
pthread_mutex_unlock(&mutex);
return ret;
}
my_off_t wait_new_events()
{
my_off_t ret;
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond2, &mutex);
ret= _end_pos;
pthread_mutex_unlock(&mutex);
return ret;
}
pthread_handler_t reader(void *arg)
{
int res;
uchar buf[BUF_SIZE];
File file= -1;
const char *log_file_name="my.log";
IO_CACHE read_log;
IO_CACHE *ptr_log= &read_log;
longlong n_messages= (*(longlong*) arg) * n_writers;
my_off_t log_pos;
//uint last_read= 0;
my_thread_init();
memset(buf, 0, sizeof( buf));
diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
file= my_open(//key_file_binlog,
log_file_name, O_CREAT | O_RDONLY | O_BINARY | O_SHARE,
MYF(MY_WME));
//ok(file >= 0, "mysql_file_open\n");
res= init_io_cache(ptr_log, file, IO_SIZE*2, READ_CACHE, 0, 0,
MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
//ok(res == 0, "init_io_cache");
log_pos= my_b_tell(ptr_log);
for (; n_messages > 0;)
{
my_off_t end_pos= get_end_pos();
size_t size;
if (log_pos >= end_pos)
end_pos= wait_new_events();
if (cache_read_with_care)
ptr_log->end_of_file= end_pos;
while (log_pos < end_pos)
{
// Read a message in two steps
res= my_b_read(ptr_log, buf, HDR_SIZE);
//ok(res == 0, "my_b_read HDR_SIZE");
size= uint4korr(buf);
ok(size >= HDR_SIZE && size <= BUF_SIZE, "msg size within HDR_SIZE, BUF_SIZE\n");
//ok(uint4korr(buf+4) == ++last_read, "current msg number succeeds the last one\n");
res= my_b_read(ptr_log, buf + HDR_SIZE, size - HDR_SIZE);
//ok(res == 0, "my_b_read payload");
ok(res == 0 && buf[HDR_SIZE] == FILL && buf[size - 1] == FILL, "my_b_read sane");
n_messages--;
//ok(n_messages >= 0, "param is not negative");
log_pos= my_b_tell(ptr_log);
}
}
//my_sleep(1000000);
close_cached_file(ptr_log);
pthread_mutex_lock(&mutex);
if (!--running_threads)
pthread_cond_signal(&cond);
pthread_mutex_unlock(&mutex);
my_thread_end();
return 0;
}
void do_tests()
{
const char *log_file_name="my.log";
File file= my_open(//key_file_binlog,
log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE,
MYF(MY_WME));
int res;
IO_CACHE *ptr_log= &write_log;
ok(file >= 0, "mysql_file_open\n");
res= init_io_cache(ptr_log, file, IO_SIZE*2, WRITE_CACHE, 0, 0,
MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
ok(res == 0, "init_io_cache");
test_concurrently2("my_io_cache_conc", writer, reader,
n_writers, n_readers, n_messages);
//my_sync(ptr_log->file, MYF(MY_WME|MY_SYNC_FILESIZE));
close_cached_file(ptr_log);
}
...@@ -23,6 +23,7 @@ volatile uint32 bad; ...@@ -23,6 +23,7 @@ volatile uint32 bad;
pthread_attr_t thr_attr; pthread_attr_t thr_attr;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cond; pthread_cond_t cond;
pthread_cond_t cond2;
uint running_threads; uint running_threads;
void do_tests(); void do_tests();
...@@ -52,15 +53,66 @@ void test_concurrently(const char *test, pthread_handler handler, int n, int m) ...@@ -52,15 +53,66 @@ void test_concurrently(const char *test, pthread_handler handler, int n, int m)
ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad); ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad);
} }
void test_concurrently2(const char *test, pthread_handler handler1,
pthread_handler handler2, int n1, int n2, ulonglong param_h)
{
pthread_t t;
ulonglong now= my_interval_timer();
bad= 0;
diag("Testing %s with %d+%d threads... ", test, n1, n2);
running_threads= n1 + n2;
for (; n1 ; n1--)
{
if (pthread_create(&t, &thr_attr, handler1, &param_h) != 0)
{
diag("Could not create thread");
abort();
}
}
for (; n2 ; n2--)
{
if (pthread_create(&t, &thr_attr, handler2, &param_h) != 0)
{
diag("Could not create thread");
abort();
}
}
pthread_mutex_lock(&mutex);
while (running_threads)
pthread_cond_wait(&cond, &mutex);
pthread_mutex_unlock(&mutex);
now= my_interval_timer() - now;
ok(!bad, "tested %s in %g secs (%d)", test, ((double)now)/1e9, bad);
}
#ifdef MY_IO_CACHE_CONC
int main(int argc, char **argv)
#else
int main(int argc __attribute__((unused)), char **argv) int main(int argc __attribute__((unused)), char **argv)
#endif
{ {
MY_INIT(argv[0]); MY_INIT(argv[0]);
#ifdef MY_IO_CACHE_CONC
if (argc > 1)
n_readers= atoi(argv[1]);
if (argc > 2)
n_writers= atoi(argv[2]);
if (argc > 3)
n_messages= atoi(argv[3]);
if (argc > 4)
cache_read_with_care= atoi(argv[4]);
#else
if (argv[1] && *argv[1]) if (argv[1] && *argv[1])
DBUG_SET_INITIAL(argv[1]); DBUG_SET_INITIAL(argv[1]);
#endif
pthread_mutex_init(&mutex, 0); pthread_mutex_init(&mutex, 0);
pthread_cond_init(&cond, 0); pthread_cond_init(&cond, 0);
pthread_cond_init(&cond2, 0);
pthread_attr_init(&thr_attr); pthread_attr_init(&thr_attr);
pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED); pthread_attr_setdetachstate(&thr_attr,PTHREAD_CREATE_DETACHED);
...@@ -80,6 +132,7 @@ int main(int argc __attribute__((unused)), char **argv) ...@@ -80,6 +132,7 @@ int main(int argc __attribute__((unused)), char **argv)
#endif #endif
pthread_mutex_destroy(&mutex); pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond); pthread_cond_destroy(&cond);
pthread_cond_destroy(&cond2);
pthread_attr_destroy(&thr_attr); pthread_attr_destroy(&thr_attr);
my_end(0); my_end(0);
return exit_status(); return exit_status();
......
...@@ -34,3 +34,7 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc) ...@@ -34,3 +34,7 @@ ADD_EXECUTABLE(mf_iocache-t mf_iocache-t.cc ../../sql/mf_iocache_encr.cc)
TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap) TARGET_LINK_LIBRARIES(mf_iocache-t mysys mytap)
ADD_DEPENDENCIES(mf_iocache-t GenError) ADD_DEPENDENCIES(mf_iocache-t GenError)
MY_ADD_TEST(mf_iocache) MY_ADD_TEST(mf_iocache)
# ADD_EXECUTABLE(my_io_cache_conc-t my_io_cache_conc-t.cc)
# TARGET_LINK_LIBRARIES(my_io_cache_conc-t mysys)
# MY_ADD_TEST(my_io_cache_conc)
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <my_crypt.h> #include <my_crypt.h>
#include <tap.h> #include <tap.h>
#include <mysql/psi/mysql_file.h>
/*** tweaks and stubs for encryption code to compile ***************/ /*** tweaks and stubs for encryption code to compile ***************/
#define KEY_SIZE (128/8) #define KEY_SIZE (128/8)
...@@ -285,11 +286,53 @@ void mdev14014() ...@@ -285,11 +286,53 @@ void mdev14014()
close_cached_file(&info); close_cached_file(&info);
} }
void mdev14014_2()
{
int res;
uchar buf_o[200];
uchar buf_i[200];
File file= -1;
const char *log_file_name="my.log";
IO_CACHE log;
memset(buf_i, 0, sizeof( buf_i));
memset(buf_o, FILL, sizeof(buf_o));
diag("MDEV-14014 Dump thread reads past last 'officially' written byte");
init_io_cache_encryption();
file= my_open(//key_file_binlog,
log_file_name, O_CREAT | O_RDWR | O_BINARY | O_SHARE,
MYF(MY_WME));
ok(file >= 0, "mysql_file_open");
res= init_io_cache(&log, file, IO_SIZE*2, WRITE_CACHE, 0, 0,
MYF(MY_WME|MY_DONT_CHECK_FILESIZE));
ok(res == 0, "init_io_cache");
//res= open_cached_file(&info, 0, 0, CACHE_SIZE, 0);
//ok(res == 0, "open_cached_file" INFO_TAIL);
res= my_b_write(&log, buf_o, sizeof(buf_o));
ok(res == 0, "buffer is written" INFO_TAIL);
res= my_b_flush_io_cache(&log, 1);
ok(res == 0, "flush" INFO_TAIL);
res= reinit_io_cache(&log, READ_CACHE, 0, 0, 0);
ok(res == 0, "reinit READ_CACHE" INFO_TAIL);
log.end_of_file= 100;
res= my_b_read(&log, buf_i, sizeof(buf_i));
ok(res == 1 && buf_i[100] == 0 && buf_i[200-1] == 0,
"short read leaves buf_i[100..200-1] == 0");
close_cached_file(&log);
}
int main(int argc __attribute__((unused)),char *argv[]) int main(int argc __attribute__((unused)),char *argv[])
{ {
MY_INIT(argv[0]); MY_INIT(argv[0]);
plan(51); plan(57);
/* temp files with and without encryption */ /* temp files with and without encryption */
encrypt_tmp_files= 1; encrypt_tmp_files= 1;
...@@ -306,6 +349,7 @@ int main(int argc __attribute__((unused)),char *argv[]) ...@@ -306,6 +349,7 @@ int main(int argc __attribute__((unused)),char *argv[])
encrypt_tmp_files= 0; encrypt_tmp_files= 0;
mdev14014(); mdev14014();
mdev14014_2();
my_end(0); my_end(0);
return exit_status(); return exit_status();
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment