Commit d4480d25 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Convert transactions to use flags.

parent 43bd32ae
...@@ -31,7 +31,16 @@ enum clustrix_commands { ...@@ -31,7 +31,16 @@ enum clustrix_commands {
CLUSTRIX_SCAN_QUERY, CLUSTRIX_SCAN_QUERY,
CLUSTRIX_KEY_UPDATE, CLUSTRIX_KEY_UPDATE,
CLUSTRIX_SCAN_FROM_KEY, CLUSTRIX_SCAN_FROM_KEY,
CLUSTRIX_UPDATE_QUERY CLUSTRIX_UPDATE_QUERY,
CLUSTRIX_TRANSACTION_CMD
};
enum clustrix_transaction_flags {
CLUSTRIX_TRANS_BEGIN = 1,
CLUSTRIX_TRANS_COMMIT = 2,
CLUSTRIX_TRANS_ROLLBACK = 4,
CLUSTRIX_STMT_NEW = 8,
CLUSTRIX_STMT_ROLLBACK = 16
}; };
/**************************************************************************** /****************************************************************************
...@@ -136,6 +145,20 @@ int clustrix_connection::connect() ...@@ -136,6 +145,20 @@ int clustrix_connection::connect()
DBUG_RETURN(0); DBUG_RETURN(0);
} }
int clustrix_connection::begin_command(uchar command)
{
command_length = 0;
int error_code = 0;
if ((error_code = add_command_operand_uchar(command)))
return error_code;
if ((error_code = add_command_operand_uchar(commit_flag_next)))
return error_code;
commit_flag_next = 0;
return error_code;
}
int clustrix_connection::send_command() int clustrix_connection::send_command()
{ {
my_bool com_error; my_bool com_error;
...@@ -170,75 +193,104 @@ int clustrix_connection::read_query_response() ...@@ -170,75 +193,104 @@ int clustrix_connection::read_query_response()
return 0; return 0;
} }
int clustrix_connection::begin_trans() int clustrix_connection::send_transaction_cmd()
{ {
if (has_transaction) DBUG_ENTER("clustrix_connection::send_transaction_cmd");
return 0; if (!commit_flag_next)
DBUG_RETURN(0);
const char *stmt = "BEGIN TRANSACTION"; int error_code;
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); if ((error_code = begin_command(CLUSTRIX_TRANSACTION_CMD)))
if (error_code) DBUG_RETURN(error_code);
return mysql_errno(&clustrix_net); if ((error_code = send_command()))
DBUG_RETURN(error_code);
if ((error_code = read_query_response()))
DBUG_RETURN(mysql_errno(&clustrix_net));
DBUG_RETURN(error_code);
}
bool clustrix_connection::begin_trans()
{
DBUG_ENTER("clustrix_connection::begin_trans");
assert(!has_transaction);
commit_flag_next |= CLUSTRIX_TRANS_BEGIN;
has_transaction = TRUE; has_transaction = TRUE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::commit_trans() bool clustrix_connection::commit_trans()
{ {
const char *stmt = "COMMIT TRANSACTION"; DBUG_ENTER("clustrix_connection::commit_trans");
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); assert(has_transaction);
if (error_code)
return mysql_errno(&clustrix_net); if (commit_flag_next & CLUSTRIX_TRANS_BEGIN) {
commit_flag_next &= ~CLUSTRIX_TRANS_BEGIN;
DBUG_RETURN(FALSE);
}
commit_flag_next |= CLUSTRIX_TRANS_COMMIT;
has_transaction = FALSE; has_transaction = FALSE;
has_statement_trans = FALSE; has_statement_trans = FALSE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::rollback_trans() bool clustrix_connection::rollback_trans()
{ {
const char *stmt = "ROLLBACK TRANSACTION"; DBUG_ENTER("clustrix_connection::rollback_trans");
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); assert(has_transaction);
if (error_code)
return mysql_errno(&clustrix_net); if (commit_flag_next & CLUSTRIX_TRANS_BEGIN) {
commit_flag_next &= ~CLUSTRIX_TRANS_BEGIN;
DBUG_RETURN(FALSE);
}
commit_flag_next |= CLUSTRIX_TRANS_ROLLBACK;
has_transaction = FALSE; has_transaction = FALSE;
has_statement_trans = FALSE; has_statement_trans = FALSE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::begin_stmt_trans() bool clustrix_connection::begin_stmt_trans()
{ {
DBUG_ENTER("clustrix_connection::begin_stmt_trans");
assert(has_transaction); assert(has_transaction);
if (has_statement_trans) assert(!has_statement_trans);
return 0;
const char *stmt = "SAVEPOINT STMT_TRANS"; commit_flag_next |= CLUSTRIX_STMT_NEW;
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code)
return mysql_errno(&clustrix_net);
has_statement_trans = TRUE; has_statement_trans = TRUE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::commit_stmt_trans() bool clustrix_connection::commit_stmt_trans()
{ {
DBUG_ENTER("clustrix_connection::commit_stmt_trans");
assert(has_transaction); assert(has_transaction);
const char *stmt = "RELEASE SAVEPOINT STMT_TRANS"; assert(has_statement_trans);
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code) if (commit_flag_next & CLUSTRIX_STMT_NEW) {
return mysql_errno(&clustrix_net); commit_flag_next &= ~CLUSTRIX_STMT_NEW;
DBUG_RETURN(FALSE);
}
has_statement_trans = FALSE; has_statement_trans = FALSE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::rollback_stmt_trans() bool clustrix_connection::rollback_stmt_trans()
{ {
DBUG_ENTER("clustrix_connection::rollback_stmt_trans");
assert(has_transaction); assert(has_transaction);
const char *stmt = "ROLLBACK TO STMT_TRANS"; assert(has_statement_trans);
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code) if (commit_flag_next & CLUSTRIX_STMT_NEW) {
return mysql_errno(&clustrix_net); commit_flag_next &= ~CLUSTRIX_STMT_NEW;
DBUG_RETURN(FALSE);
}
commit_flag_next |= CLUSTRIX_STMT_ROLLBACK;
has_statement_trans = FALSE; has_statement_trans = FALSE;
return error_code; DBUG_RETURN(TRUE);
} }
int clustrix_connection::run_query(String &stmt) int clustrix_connection::run_query(String &stmt)
...@@ -261,7 +313,7 @@ int clustrix_connection::write_row(ulonglong clustrix_table_oid, ...@@ -261,7 +313,7 @@ int clustrix_connection::write_row(ulonglong clustrix_table_oid,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_WRITE_ROW))) if ((error_code = begin_command(CLUSTRIX_WRITE_ROW)))
return error_code; return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
...@@ -289,7 +341,7 @@ int clustrix_connection::key_update(ulonglong clustrix_table_oid, ...@@ -289,7 +341,7 @@ int clustrix_connection::key_update(ulonglong clustrix_table_oid,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_KEY_UPDATE))) if ((error_code = begin_command(CLUSTRIX_KEY_UPDATE)))
return error_code; return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
...@@ -321,7 +373,7 @@ int clustrix_connection::key_delete(ulonglong clustrix_table_oid, ...@@ -321,7 +373,7 @@ int clustrix_connection::key_delete(ulonglong clustrix_table_oid,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_KEY_DELETE))) if ((error_code = begin_command(CLUSTRIX_KEY_DELETE)))
return error_code; return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
...@@ -347,7 +399,7 @@ int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index, ...@@ -347,7 +399,7 @@ int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_KEY_READ))) if ((error_code = begin_command(CLUSTRIX_KEY_READ)))
return error_code; return error_code;
if ((error_code = add_command_operand_ulonglong(clustrix_table_oid))) if ((error_code = add_command_operand_ulonglong(clustrix_table_oid)))
...@@ -530,7 +582,7 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index, ...@@ -530,7 +582,7 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_TABLE))) if ((error_code = begin_command(CLUSTRIX_SCAN_TABLE)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req))) if ((error_code = add_command_operand_ushort(row_req)))
...@@ -581,7 +633,7 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -581,7 +633,7 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_QUERY))) if ((error_code = begin_command(CLUSTRIX_SCAN_QUERY)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req))) if ((error_code = add_command_operand_ushort(row_req)))
...@@ -655,7 +707,7 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index, ...@@ -655,7 +707,7 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_FROM_KEY))) if ((error_code = begin_command(CLUSTRIX_SCAN_FROM_KEY)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(row_req))) if ((error_code = add_command_operand_ushort(row_req)))
...@@ -698,7 +750,7 @@ int clustrix_connection::scan_next(clustrix_connection_cursor *scan, ...@@ -698,7 +750,7 @@ int clustrix_connection::scan_next(clustrix_connection_cursor *scan,
int error_code; int error_code;
command_length = 0; command_length = 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_NEXT))) if ((error_code = begin_command(CLUSTRIX_SCAN_NEXT)))
return error_code; return error_code;
if ((error_code = add_command_operand_ushort(scan->buffer_size))) if ((error_code = add_command_operand_ushort(scan->buffer_size)))
...@@ -731,7 +783,7 @@ int clustrix_connection::scan_end(clustrix_connection_cursor *scan) ...@@ -731,7 +783,7 @@ int clustrix_connection::scan_end(clustrix_connection_cursor *scan)
if (eof_reached) if (eof_reached)
return 0; return 0;
if ((error_code = add_command_operand_uchar(CLUSTRIX_SCAN_STOP))) if ((error_code = begin_command(CLUSTRIX_SCAN_STOP)))
return error_code; return error_code;
if ((error_code = add_command_operand_lcb(scan_refid))) if ((error_code = add_command_operand_lcb(scan_refid)))
......
...@@ -38,6 +38,7 @@ class clustrix_connection ...@@ -38,6 +38,7 @@ class clustrix_connection
bool has_transaction; bool has_transaction;
bool has_statement_trans; bool has_statement_trans;
int commit_flag_next;
public: public:
clustrix_connection() clustrix_connection()
...@@ -47,6 +48,7 @@ class clustrix_connection ...@@ -47,6 +48,7 @@ class clustrix_connection
memset(&clustrix_net, 0, sizeof(MYSQL)); memset(&clustrix_net, 0, sizeof(MYSQL));
has_statement_trans = FALSE; has_statement_trans = FALSE;
has_transaction = FALSE; has_transaction = FALSE;
commit_flag_next = 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -69,17 +71,19 @@ class clustrix_connection ...@@ -69,17 +71,19 @@ class clustrix_connection
int connect(); int connect();
void disconnect(bool is_destructor = FALSE); void disconnect(bool is_destructor = FALSE);
int begin_trans();
int commit_trans(); int send_transaction_cmd();
int rollback_trans(); bool begin_trans();
bool commit_trans();
bool rollback_trans();
inline bool has_trans() inline bool has_trans()
{ {
return has_transaction; return has_transaction;
} }
int begin_stmt_trans(); bool begin_stmt_trans();
int commit_stmt_trans(); bool commit_stmt_trans();
int rollback_stmt_trans(); bool rollback_stmt_trans();
inline bool has_stmt_trans() inline bool has_stmt_trans()
{ {
return has_statement_trans; return has_statement_trans;
...@@ -142,6 +146,7 @@ class clustrix_connection ...@@ -142,6 +146,7 @@ class clustrix_connection
int add_command_operand_vlstr(const uchar *str, size_t length); int add_command_operand_vlstr(const uchar *str, size_t length);
int add_command_operand_lex_string(LEX_CSTRING str); int add_command_operand_lex_string(LEX_CSTRING str);
int add_command_operand_bitmap(MY_BITMAP *bitmap); int add_command_operand_bitmap(MY_BITMAP *bitmap);
int begin_command(uchar command);
int send_command(); int send_command();
int read_query_response(); int read_query_response();
}; };
......
...@@ -932,11 +932,8 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type) ...@@ -932,11 +932,8 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type)
int error_code; int error_code;
clustrix_connection *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (lock_type != F_UNLCK) { if (lock_type != F_UNLCK) {
if ((error_code = trx->begin_trans())) trx->begin_trans();
return error_code; trx->begin_stmt_trans();
if ((error_code = trx->begin_stmt_trans()))
return error_code;
trans_register_ha(thd, FALSE, clustrixdb_hton); trans_register_ha(thd, FALSE, clustrixdb_hton);
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
...@@ -1036,36 +1033,42 @@ void ha_clustrixdb::build_key_packed_row(uint index, const uchar *buf, ...@@ -1036,36 +1033,42 @@ void ha_clustrixdb::build_key_packed_row(uint index, const uchar *buf,
static int clustrixdb_commit(handlerton *hton, THD *thd, bool all) static int clustrixdb_commit(handlerton *hton, THD *thd, bool all)
{ {
int error_code = 0;
clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton); clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx); assert(trx);
bool send_cmd;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (trx->has_trans()) if (trx->has_trans())
error_code = trx->commit_trans(); send_cmd = trx->commit_trans();
} else { } else {
if (trx->has_stmt_trans()) if (trx->has_stmt_trans())
error_code = trx->commit_stmt_trans(); send_cmd = trx->commit_stmt_trans();
} }
return error_code; if (send_cmd)
return trx->send_transaction_cmd();
return 0;
} }
static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all) static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all)
{ {
int error_code = 0;
clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton); clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx); assert(trx);
bool send_cmd;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) { if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
if (trx->has_trans()) if (trx->has_trans())
error_code = trx->rollback_trans(); send_cmd = trx->rollback_trans();
} else { } else {
if (trx->has_stmt_trans()) if (trx->has_stmt_trans())
error_code = trx->rollback_stmt_trans(); send_cmd = trx->rollback_stmt_trans();
} }
return error_code; if (send_cmd)
return trx->send_transaction_cmd();
return 0;
} }
static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table, static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table,
...@@ -1080,12 +1083,11 @@ static int clustrixdb_close_connection(handlerton* hton, THD* thd) ...@@ -1080,12 +1083,11 @@ static int clustrixdb_close_connection(handlerton* hton, THD* thd)
if (!trx) if (!trx)
return 0; /* Transaction is not started */ return 0; /* Transaction is not started */
if (trx->has_stmt_trans()) int error_code = clustrixdb_rollback(clustrixdb_hton, thd, TRUE);
clustrixdb_rollback(clustrixdb_hton, thd, TRUE);
delete trx; delete trx;
return 0; return error_code;
} }
static int clustrixdb_panic(handlerton *hton, ha_panic_function type) static int clustrixdb_panic(handlerton *hton, ha_panic_function type)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment