Commit 72a58d53 authored by unknown's avatar unknown

Merge bk-internal.mysql.com:/data0/bk/mysql-5.1

into  bk-internal.mysql.com:/data0/bk/mysql-5.1-arch
parents 9e9f196b 7f1ce12a
......@@ -73,6 +73,10 @@ enum options_client
OPT_SLAP_AUTO_GENERATE_UNIQUE_QUERY_NUM,
OPT_SLAP_PRE_QUERY,
OPT_SLAP_POST_QUERY,
OPT_SLAP_PRE_SYSTEM,
OPT_SLAP_POST_SYSTEM,
OPT_SLAP_COMMIT,
OPT_SLAP_DETACH,
OPT_MYSQL_REPLACE_INTO, OPT_BASE64_OUTPUT, OPT_SERVER_ID,
OPT_FIX_TABLE_NAMES, OPT_FIX_DB_NAMES, OPT_SSL_VERIFY_SERVER_CERT,
OPT_DEBUG_INFO, OPT_COLUMN_TYPES, OPT_ERROR_LOG_FILE, OPT_WRITE_BINLOG,
......
......@@ -81,6 +81,7 @@
#define UPDATE_TYPE_REQUIRES_PREFIX 3
#define CREATE_TABLE_TYPE 4
#define SELECT_TYPE_REQUIRES_PREFIX 5
#define DELETE_TYPE_REQUIRES_PREFIX 6
#include "client_priv.h"
#include <mysqld_error.h>
......@@ -122,6 +123,8 @@ static char *host= NULL, *opt_password= NULL, *user= NULL,
*user_supplied_pre_statements= NULL,
*user_supplied_post_statements= NULL,
*default_engine= NULL,
*pre_system= NULL,
*post_system= NULL,
*opt_mysql_unix_port= NULL;
const char *delimiter= "\n";
......@@ -142,6 +145,8 @@ const char *auto_generate_sql_type= "mixed";
static unsigned long connect_flags= CLIENT_MULTI_RESULTS;
static int verbose, delimiter_length;
static uint commit_rate;
static uint detach_rate;
const char *num_int_cols_opt;
const char *num_char_cols_opt;
/* Yes, we do set defaults here */
......@@ -254,6 +259,8 @@ void statement_cleanup(statement *stmt);
void option_cleanup(option_string *stmt);
void concurrency_loop(MYSQL *mysql, uint current, option_string *eptr);
static int run_statements(MYSQL *mysql, statement *stmt);
int slap_connect(MYSQL *mysql);
static int run_query(MYSQL *mysql, const char *query, int len);
static const char ALPHANUMERICS[]=
"0123456789ABCDEFGHIJKLMNOPQRSTWXYZabcdefghijklmnopqrstuvwxyz";
......@@ -451,6 +458,16 @@ void concurrency_loop(MYSQL *mysql, uint current, option_string *eptr)
if (auto_generate_sql_autoincrement || auto_generate_sql_guid_primary)
generate_primary_key_list(mysql, eptr);
if (commit_rate)
run_query(mysql, "SET AUTOCOMMIT=0", strlen("SET AUTOCOMMIT=0"));
if (pre_system)
system(pre_system);
/*
Pre statements are always run after all other logic so they can
correct/adjust any item that they want.
*/
if (pre_statements)
run_statements(mysql, pre_statements);
......@@ -459,6 +476,9 @@ void concurrency_loop(MYSQL *mysql, uint current, option_string *eptr)
if (post_statements)
run_statements(mysql, post_statements);
if (post_system)
system(post_system);
/* We are finished with this run */
if (auto_generate_sql_autoincrement || auto_generate_sql_guid_primary)
drop_primary_key_list();
......@@ -527,6 +547,9 @@ static struct my_option my_long_options[] =
"Number of rows to insert to used in read and write loads (default is 100).\n",
(uchar**) &auto_generate_sql_number, (uchar**) &auto_generate_sql_number,
0, GET_ULL, REQUIRED_ARG, 100, 0, 0, 0, 0, 0},
{"commit", OPT_SLAP_COMMIT, "Commit records after X number of statements.",
(uchar**) &commit_rate, (uchar**) &commit_rate, 0, GET_UINT, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
{"compress", 'C', "Use compression in server/client protocol.",
(uchar**) &opt_compress, (uchar**) &opt_compress, 0, GET_BOOL, NO_ARG, 0, 0, 0,
0, 0, 0},
......@@ -550,6 +573,9 @@ static struct my_option my_long_options[] =
"Delimiter to use in SQL statements supplied in file or command line.",
(uchar**) &delimiter, (uchar**) &delimiter, 0, GET_STR, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
{"detach", OPT_SLAP_DETACH, "Detach connections after X number of requests.",
(uchar**) &detach_rate, (uchar**) &detach_rate, 0, GET_UINT, REQUIRED_ARG,
0, 0, 0, 0, 0, 0},
{"engine", 'e', "Storage engine to use for creating the table.",
(uchar**) &default_engine, (uchar**) &default_engine, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
......@@ -589,11 +615,21 @@ static struct my_option my_long_options[] =
(uchar**) &user_supplied_post_statements,
(uchar**) &user_supplied_post_statements,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"post-system", OPT_SLAP_POST_SYSTEM,
"System() string to run after the load has completed.",
(uchar**) &post_system,
(uchar**) &post_system,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"pre-query", OPT_SLAP_PRE_QUERY,
"Query to run or file containing query to run before executing.",
(uchar**) &user_supplied_pre_statements,
(uchar**) &user_supplied_pre_statements,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"pre-system", OPT_SLAP_PRE_SYSTEM,
"System() string to before load has completed.",
(uchar**) &pre_system,
(uchar**) &pre_system,
0, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0},
{"preserve-schema", OPT_MYSQL_PRESERVE_SCHEMA,
"Preserve the schema from the mysqlslap run, this happens unless "
"--auto-generate-sql or --create are used.",
......@@ -1715,6 +1751,7 @@ run_scheduler(stats *sptr, statement *stmts, uint concur, ulonglong limit)
pthread_handler_t run_task(void *p)
{
ulonglong counter= 0, queries;
ulonglong trans_counter;
MYSQL *mysql;
MYSQL_RES *result;
MYSQL_ROW row;
......@@ -1749,38 +1786,28 @@ pthread_handler_t run_task(void *p)
if (!opt_only_print)
{
/* Connect to server */
static ulong connection_retry_sleep= 100000; /* Microseconds */
int i, connect_error= 1;
for (i= 0; i < 10; i++)
{
if (mysql_real_connect(mysql, host, user, opt_password,
create_schema_string,
opt_mysql_port,
opt_mysql_unix_port,
connect_flags))
{
/* Connect suceeded */
connect_error= 0;
break;
}
my_sleep(connection_retry_sleep);
}
if (connect_error)
{
fprintf(stderr,"%s: Error when connecting to server: %d %s\n",
my_progname, mysql_errno(mysql), mysql_error(mysql));
if (slap_connect(mysql))
goto end;
}
}
DBUG_PRINT("info", ("connected."));
if (verbose >= 3)
printf("connected!\n");
queries= 0;
limit_not_met:
for (ptr= con->stmt; ptr && ptr->length; ptr= ptr->next)
for (ptr= con->stmt, trans_counter= 0;
ptr && ptr->length;
ptr= ptr->next, trans_counter++)
{
if (!opt_only_print && detach_rate && !(trans_counter % detach_rate))
{
mysql_close(mysql);
if (slap_connect(mysql))
goto end;
}
/*
We have to execute differently based on query type. This should become a function.
*/
......@@ -1837,6 +1864,9 @@ pthread_handler_t run_task(void *p)
}
queries++;
if (commit_rate && commit_rate <= trans_counter)
run_query(mysql, "COMMIT", strlen("COMMIT"));
if (con->limit && queries == con->limit)
goto end;
}
......@@ -1845,6 +1875,8 @@ pthread_handler_t run_task(void *p)
goto limit_not_met;
end:
if (commit_rate)
run_query(mysql, "COMMIT", strlen("COMMIT"));
if (!opt_only_print)
mysql_close(mysql);
......@@ -2104,3 +2136,34 @@ statement_cleanup(statement *stmt)
my_free(ptr, MYF(0));
}
}
int
slap_connect(MYSQL *mysql)
{
/* Connect to server */
static ulong connection_retry_sleep= 100000; /* Microseconds */
int x, connect_error= 1;
for (x= 0; x < 10; x++)
{
if (mysql_real_connect(mysql, host, user, opt_password,
create_schema_string,
opt_mysql_port,
opt_mysql_unix_port,
connect_flags))
{
/* Connect suceeded */
connect_error= 0;
break;
}
my_sleep(connection_retry_sleep);
}
if (connect_error)
{
fprintf(stderr,"%s: Error when connecting to server: %d %s\n",
my_progname, mysql_errno(mysql), mysql_error(mysql));
return 1;
}
return 0;
}
......@@ -167,3 +167,47 @@ SHOW TABLES;
select * from t1;
SHOW TABLES;
DROP SCHEMA IF EXISTS `mysqlslap`;
DROP SCHEMA IF EXISTS `mysqlslap`;
CREATE SCHEMA `mysqlslap`;
use mysqlslap;
set storage_engine=`heap`;
CREATE TABLE t1 (id int, name varchar(64));
create table t2(foo1 varchar(32), foo2 varchar(32));
INSERT INTO t1 VALUES (1, 'This is a test');
insert into t2 values ('test', 'test2');
SET AUTOCOMMIT=0;
SHOW TABLES;
select * from t1;
select * from t2;
COMMIT;
select * from t1;
select * from t2;
COMMIT;
select * from t1;
select * from t2;
COMMIT;
COMMIT;
SHOW TABLES;
DROP SCHEMA IF EXISTS `mysqlslap`;
DROP SCHEMA IF EXISTS `mysqlslap`;
CREATE SCHEMA `mysqlslap`;
use mysqlslap;
set storage_engine=`myisam`;
CREATE TABLE t1 (id int, name varchar(64));
create table t2(foo1 varchar(32), foo2 varchar(32));
INSERT INTO t1 VALUES (1, 'This is a test');
insert into t2 values ('test', 'test2');
SET AUTOCOMMIT=0;
SHOW TABLES;
select * from t1;
select * from t2;
COMMIT;
select * from t1;
select * from t2;
COMMIT;
select * from t1;
select * from t2;
COMMIT;
COMMIT;
SHOW TABLES;
DROP SCHEMA IF EXISTS `mysqlslap`;
......@@ -36,3 +36,5 @@
--exec $MYSQL_SLAP --silent --concurrency=5 --iterations=1 --number-int-cols=2 --number-char-cols=3 --auto-generate-sql --auto-generate-sql-guid-primary --auto-generate-sql-load-type=key --auto-generate-sql-execute-number=5 --auto-generate-sql-secondary-indexes=3
--exec $MYSQL_SLAP --only-print --delimiter=";" --query="select * from t1;select * from t2" --create="CREATE TABLE t1 (id int, name varchar(64)); create table t2(foo1 varchar(32), foo2 varchar(32)); INSERT INTO t1 VALUES (1, 'This is a test'); insert into t2 values ('test', 'test2')" --engine="heap,myisam" --post-query="SHOW TABLES" --pre-query="SHOW TABLES";
--exec $MYSQL_SLAP --only-print --delimiter=";" --query="select * from t1;select * from t2" --create="CREATE TABLE t1 (id int, name varchar(64)); create table t2(foo1 varchar(32), foo2 varchar(32)); INSERT INTO t1 VALUES (1, 'This is a test'); insert into t2 values ('test', 'test2')" --engine="heap,myisam" --post-query="SHOW TABLES" --pre-query="SHOW TABLES" --number-of-queries=6 --commit=1;
......@@ -436,6 +436,9 @@ int ha_archive::init_archive_writer()
}
/*
No locks are required because it is associated with just one handler instance
*/
int ha_archive::init_archive_reader()
{
DBUG_ENTER("ha_archive::init_archive_reader");
......@@ -794,15 +797,16 @@ int ha_archive::write_row(uchar *buf)
if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
ha_statistic_increment(&SSV::ha_write_count);
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex);
if (!share->archive_write_open)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
if (table->next_number_field && record == table->record[0])
{
KEY *mkey= &table->s->key_info[0]; // We only support one key right now
......@@ -992,24 +996,6 @@ int ha_archive::rnd_init(bool scan)
{
DBUG_PRINT("info", ("archive will retrieve %llu rows",
(unsigned long long) scan_rows));
stats.records= 0;
/*
If dirty, we lock, and then reset/flush the data.
I found that just calling azflush() doesn't always work.
*/
pthread_mutex_lock(&share->mutex);
scan_rows= share->rows_recorded;
if (share->dirty == TRUE)
{
if (share->dirty == TRUE)
{
DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->dirty= FALSE;
}
}
pthread_mutex_unlock(&share->mutex);
if (read_data_header(&archive))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
......@@ -1223,9 +1209,7 @@ int ha_archive::rnd_next(uchar *buf)
current_position= aztell(&archive);
rc= get_row(&archive, buf);
if (rc != HA_ERR_END_OF_FILE)
stats.records++;
table->status=rc ? STATUS_NOT_FOUND: 0;
DBUG_RETURN(rc);
}
......@@ -1461,12 +1445,33 @@ void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
int ha_archive::info(uint flag)
{
DBUG_ENTER("ha_archive::info");
/*
If dirty, we lock, and then reset/flush the data.
I found that just calling azflush() doesn't always work.
*/
pthread_mutex_lock(&share->mutex);
if (share->dirty == TRUE)
{
if (share->dirty == TRUE)
{
DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->dirty= FALSE;
}
}
/*
This should be an accurate number now, though bulk and delayed inserts can
cause the number to be inaccurate.
*/
stats.records= share->rows_recorded;
pthread_mutex_unlock(&share->mutex);
scan_rows= stats.records;
stats.deleted= 0;
DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
/* Costs quite a bit more to get all information */
if (flag & HA_STATUS_TIME)
{
......@@ -1486,7 +1491,9 @@ int ha_archive::info(uint flag)
if (flag & HA_STATUS_AUTO)
{
init_archive_reader();
pthread_mutex_lock(&share->mutex);
azflush(&archive, Z_SYNC_FLUSH);
pthread_mutex_unlock(&share->mutex);
stats.auto_increment_value= archive.auto_increment;
}
......@@ -1554,7 +1561,9 @@ int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
old_proc_info= thd_proc_info(thd, "Checking table");
/* Flush any waiting data */
pthread_mutex_lock(&share->mutex);
azflush(&(share->archive_write), Z_SYNC_FLUSH);
pthread_mutex_unlock(&share->mutex);
/*
Now we will rewind the archive file so that we are positioned at the
......
......@@ -88,6 +88,8 @@ class ha_archive: public handler
{
return (HA_NO_TRANSACTIONS | HA_REC_NOT_IN_SEQ | HA_CAN_BIT_FIELD |
HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
HA_STATS_RECORDS_IS_EXACT |
HA_HAS_RECORDS |
HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
}
ulong index_flags(uint idx, uint part, bool all_parts) const
......@@ -101,6 +103,7 @@ class ha_archive: public handler
uint max_supported_keys() const { return 1; }
uint max_supported_key_length() const { return sizeof(ulonglong); }
uint max_supported_key_part_length() const { return sizeof(ulonglong); }
ha_rows records() { return share->rows_recorded; }
int index_init(uint keynr, bool sorted);
virtual int index_read(uchar * buf, const uchar * key,
uint key_len, enum ha_rkey_function find_flag);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment