Commit 0deb740c authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Rework transaction management.

parent edfb639e
This diff is collapsed.
......@@ -31,74 +31,36 @@ class clustrix_connection_cursor;
class clustrix_connection
{
private:
# define COMMAND_BUFFER_SIZE_INCREMENT 1024
# define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
MYSQL clustrix_net;
uchar *command_buffer;
size_t command_buffer_length;
size_t command_length;
uchar *reply_buffer;
size_t reply_length;
bool has_transaction;
bool has_anonymous_savepoint;
int commit_flag_next;
int trans_state;
int trans_flags;
int allocate_cursor(MYSQL *clustrix_net, ulong buffer_size,
clustrix_connection_cursor **scan);
public:
clustrix_connection()
: command_buffer(NULL), command_buffer_length(0), command_length(0)
{
DBUG_ENTER("clustrix_connection::clustrix_connection");
memset(&clustrix_net, 0, sizeof(MYSQL));
has_anonymous_savepoint = FALSE;
has_transaction = FALSE;
commit_flag_next = 0;
DBUG_VOID_RETURN;
}
~clustrix_connection()
{
DBUG_ENTER("clustrix_connection::~clustrix_connection");
if (is_connected())
disconnect(TRUE);
if (command_buffer)
my_free(command_buffer);
DBUG_VOID_RETURN;
}
clustrix_connection();
~clustrix_connection();
inline bool is_connected()
{
return clustrix_net.net.vio;
}
int connect();
void disconnect(bool is_destructor = FALSE);
int send_transaction_cmd();
bool begin_transaction();
bool commit_transaction();
bool rollback_transaction();
bool has_open_transaction();
int commit_transaction();
int rollback_transaction();
int begin_transaction_next();
int new_statement_next();
int rollback_statement_next(); // also starts new statement
void auto_commit_next();
inline bool has_open_transaction()
{
return has_transaction;
}
bool set_anonymous_savepoint();
bool release_anonymous_savepoint();
bool rollback_to_anonymous_savepoint();
inline bool has_open_anonymous_savepoint()
{
return has_anonymous_savepoint;
}
void auto_commit_closed();
int run_query(String &stmt);
my_ulonglong rows_affected();
int write_row(ulonglong clustrix_table_oid, uchar *packed_row,
size_t packed_size, ulonglong *last_insert_id);
int key_update(ulonglong clustrix_table_oid,
......@@ -111,7 +73,6 @@ class clustrix_connection
clustrix_lock_mode_t lock_mode, MY_BITMAP *read_set,
uchar *packed_key, ulong packed_key_length, uchar **rowdata,
ulong *rowdata_length);
enum sort_order {SORT_NONE = 0, SORT_ASC = 1, SORT_DESC = 2};
enum scan_type {
READ_KEY_OR_NEXT, /* rows with key and greater */
......@@ -121,7 +82,6 @@ class clustrix_connection
READ_FROM_START, /* rows with forwards from first key. */
READ_FROM_LAST, /* rows with backwards from last key. */
};
int scan_table(ulonglong clustrix_table_oid,
clustrix_lock_mode_t lock_mode,
MY_BITMAP *read_set, ushort row_req,
......@@ -159,6 +119,5 @@ class clustrix_connection
int begin_command(uchar command);
int send_command();
int read_query_response();
void auto_commit_closed();
};
#endif // _clustrix_connection_h
......@@ -40,7 +40,6 @@ static MYSQL_SYSVAR_INT
NULL, NULL, -1, -1, 2147483647, 0
);
char *clustrix_host;
static MYSQL_SYSVAR_STR
(
......@@ -104,7 +103,6 @@ static void update_host_list(char *clustrix_host)
DBUG_PRINT("clustrix_host", ("%s", clustrix_host));
}
char *clustrix_username;
static MYSQL_SYSVAR_STR
(
......@@ -727,7 +725,6 @@ int ha_clustrixdb::index_init(uint idx, bool sorted)
sorted_scan = sorted;
return 0;
}
int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
......@@ -1062,6 +1059,8 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type)
DBUG_ENTER("ha_clustrixdb::external_lock()");
int error_code;
clustrix_connection *trx = get_trx(thd, &error_code);
if (error_code)
DBUG_RETURN(error_code);
if (lock_type == F_WRLCK)
clx_lock_type = CLUSTRIX_EXCLUSIVE;
......@@ -1071,18 +1070,18 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type)
clx_lock_type = CLUSTRIX_NO_LOCKS;
if (lock_type != F_UNLCK) {
if (!trx->has_open_transaction())
trx->begin_transaction();
if (!trx->has_open_transaction()) {
error_code = trx->begin_transaction_next();
if (error_code)
DBUG_RETURN(error_code);
}
trans_register_ha(thd, FALSE, clustrixdb_hton);
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (!trx->has_open_anonymous_savepoint())
trx->set_anonymous_savepoint();
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, clustrixdb_hton);
}
}
DBUG_RETURN(0);
DBUG_RETURN(error_code);
}
/****************************************************************************
......@@ -1169,8 +1168,7 @@ void ha_clustrixdb::build_key_packed_row(uint index, const uchar *buf,
} else {
// make a row from the table
table->mark_columns_used_by_index(index, &table->tmp_set);
*packed_key_len = pack_row(table, &table->tmp_set, packed_key,
buf);
*packed_key_len = pack_row(table, &table->tmp_set, packed_key, buf);
}
}
......@@ -1212,19 +1210,15 @@ static int clustrixdb_commit(handlerton *hton, THD *thd, bool all)
clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx);
bool send_cmd = FALSE;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (trx->has_open_transaction())
send_cmd = trx->commit_transaction();
} else {
if (trx->has_open_anonymous_savepoint())
send_cmd = trx->release_anonymous_savepoint();
int error_code = 0;
if (trx->has_open_transaction()) {
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
error_code = trx->commit_transaction();
else
error_code = trx->new_statement_next();
}
if (send_cmd)
return trx->send_transaction_cmd();
return 0;
return error_code;
}
static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all)
......@@ -1232,19 +1226,15 @@ static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all)
clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx);
bool send_cmd = FALSE;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (trx->has_open_transaction())
send_cmd = trx->rollback_transaction();
} else {
if (trx->has_open_anonymous_savepoint())
send_cmd = trx->rollback_to_anonymous_savepoint();
int error_code = 0;
if (trx->has_open_transaction()) {
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
error_code = trx->rollback_transaction();
else
error_code = trx->rollback_statement_next();
}
if (send_cmd)
return trx->send_transaction_cmd();
return 0;
return error_code;
}
static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table,
......@@ -1271,7 +1261,6 @@ static int clustrixdb_panic(handlerton *hton, ha_panic_function type)
return 0;
}
static bool clustrixdb_show_status(handlerton *hton, THD *thd,
stat_print_fn *stat_print,
enum ha_stat_type stat_type)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment