Commit cce38f80 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Add support for commit on command completion.

parent d4480d25
......@@ -40,7 +40,8 @@ enum clustrix_transaction_flags {
CLUSTRIX_TRANS_COMMIT = 2,
CLUSTRIX_TRANS_ROLLBACK = 4,
CLUSTRIX_STMT_NEW = 8,
CLUSTRIX_STMT_ROLLBACK = 16
CLUSTRIX_STMT_ROLLBACK = 16,
CLUSTRIX_TRANS_COMMIT_ON_FINISH = 32
};
/****************************************************************************
......@@ -147,6 +148,7 @@ int clustrix_connection::connect()
int clustrix_connection::begin_command(uchar command)
{
assert(has_transaction);
command_length = 0;
int error_code = 0;
if ((error_code = add_command_operand_uchar(command)))
......@@ -155,7 +157,7 @@ int clustrix_connection::begin_command(uchar command)
if ((error_code = add_command_operand_uchar(commit_flag_next)))
return error_code;
commit_flag_next = 0;
commit_flag_next &= CLUSTRIX_TRANS_COMMIT_ON_FINISH;
return error_code;
}
......@@ -190,6 +192,7 @@ int clustrix_connection::read_query_response()
return error_code;
}
auto_commit_closed();
return 0;
}
......@@ -251,6 +254,21 @@ bool clustrix_connection::rollback_trans()
DBUG_RETURN(TRUE);
}
void clustrix_connection::auto_commit_next()
{
commit_flag_next |= CLUSTRIX_TRANS_COMMIT_ON_FINISH;
}
void clustrix_connection::auto_commit_closed()
{
assert(has_transaction);
if (commit_flag_next & CLUSTRIX_TRANS_COMMIT_ON_FINISH) {
has_transaction = FALSE;
has_statement_trans = FALSE;
commit_flag_next &= ~CLUSTRIX_TRANS_COMMIT_ON_FINISH;
}
}
bool clustrix_connection::begin_stmt_trans()
{
DBUG_ENTER("clustrix_connection::begin_stmt_trans");
......@@ -421,6 +439,7 @@ int clustrix_connection::key_read(ulonglong clustrix_table_oid, uint index,
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
auto_commit_closed();
uchar *data = clustrix_net.net.read_pos;
*rowdata_length = safe_net_field_length_ll(&data, packet_length);
*rowdata = (uchar *)my_malloc(*rowdata_length, MYF(MY_WME));
......@@ -467,13 +486,14 @@ class clustrix_connection_cursor {
DBUG_RETURN(0);
}
int load_rows_impl()
int load_rows_impl(bool *stmt_completed)
{
DBUG_ENTER("clustrix_connection_cursor::load_rows_impl");
int error_code = 0;
ulong packet_length = cli_safe_read(clustrix_net);
if (packet_length == packet_error) {
error_code = mysql_errno(clustrix_net);
*stmt_completed = TRUE;
if (error_code == HA_ERR_END_OF_FILE) {
// We have read all rows for query.
eof_reached = TRUE;
......@@ -492,7 +512,7 @@ class clustrix_connection_cursor {
if ((error_code = cache_row(rowdata, rowdata_length)))
DBUG_RETURN(error_code);
DBUG_RETURN(load_rows_impl());
DBUG_RETURN(load_rows_impl(stmt_completed));
}
public:
......@@ -521,20 +541,22 @@ class clustrix_connection_cursor {
DBUG_VOID_RETURN;
}
int load_rows()
int load_rows(bool *stmt_completed)
{
DBUG_ENTER("clustrix_connection_cursor::load_rows");
current_row = 0;
last_row = 0;
DBUG_RETURN(load_rows_impl());
DBUG_RETURN(load_rows_impl(stmt_completed));
}
int initialize()
int initialize(bool *stmt_completed)
{
DBUG_ENTER("clustrix_connection_cursor::initialize");
ulong packet_length = cli_safe_read(clustrix_net);
if (packet_length == packet_error)
if (packet_length == packet_error) {
*stmt_completed = TRUE;
DBUG_RETURN(mysql_errno(clustrix_net));
}
unsigned char *pos = clustrix_net->net.read_pos;
scan_refid = safe_net_field_length_ll(&pos, packet_length);
......@@ -544,7 +566,7 @@ class clustrix_connection_cursor {
if (!rows)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_RETURN(load_rows());
DBUG_RETURN(load_rows(stmt_completed));
}
uchar *retrieve_row(ulong *rowdata_length)
......@@ -564,6 +586,7 @@ class clustrix_connection_cursor {
};
int allocate_clustrix_connection_cursor(MYSQL *clustrix_net, ulong buffer_size,
bool *stmt_completed,
clustrix_connection_cursor **scan)
{
DBUG_ENTER("allocate_clustrix_connection_cursor");
......@@ -571,7 +594,7 @@ int allocate_clustrix_connection_cursor(MYSQL *clustrix_net, ulong buffer_size,
if (!*scan)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_RETURN((*scan)->initialize());
DBUG_RETURN((*scan)->initialize(stmt_completed));
}
int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
......@@ -603,7 +626,12 @@ int clustrix_connection::scan_table(ulonglong clustrix_table_oid, uint index,
if ((error_code = send_command()))
return error_code;
return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
bool stmt_completed = FALSE;
error_code = allocate_clustrix_connection_cursor(&clustrix_net, row_req,
&stmt_completed, scan);
if (stmt_completed)
auto_commit_closed();
return error_code;
}
/**
......@@ -655,7 +683,12 @@ int clustrix_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
if ((error_code = send_command()))
return error_code;
return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
bool stmt_completed = FALSE;
error_code = allocate_clustrix_connection_cursor(&clustrix_net, row_req,
&stmt_completed, scan);
if (stmt_completed)
auto_commit_closed();
return error_code;
}
/**
......@@ -734,7 +767,12 @@ int clustrix_connection::scan_from_key(ulonglong clustrix_table_oid, uint index,
if ((error_code = send_command()))
return error_code;
return allocate_clustrix_connection_cursor(&clustrix_net, row_req, scan);
bool stmt_completed = FALSE;
error_code = allocate_clustrix_connection_cursor(&clustrix_net, row_req,
&stmt_completed, scan);
if (stmt_completed)
auto_commit_closed();
return error_code;
}
int clustrix_connection::scan_next(clustrix_connection_cursor *scan,
......@@ -762,7 +800,11 @@ int clustrix_connection::scan_next(clustrix_connection_cursor *scan,
if ((error_code = send_command()))
return error_code;
if ((error_code = scan->load_rows()))
bool stmt_completed = FALSE;
error_code = scan->load_rows(&stmt_completed);
if (stmt_completed)
auto_commit_closed();
if (error_code)
return error_code;
*rowdata = scan->retrieve_row(rowdata_length);
......@@ -792,9 +834,8 @@ int clustrix_connection::scan_end(clustrix_connection_cursor *scan)
if ((error_code = send_command()))
return error_code;
ulong packet_length = cli_safe_read(&clustrix_net);
if (packet_length == packet_error)
return mysql_errno(&clustrix_net);
if ((error_code = read_query_response()))
return error_code;
return 0;
}
......
......@@ -76,6 +76,8 @@ class clustrix_connection
bool begin_trans();
bool commit_trans();
bool rollback_trans();
void auto_commit_next();
void auto_commit_closed();
inline bool has_trans()
{
return has_transaction;
......
......@@ -933,11 +933,12 @@ int ha_clustrixdb::external_lock(THD *thd, int lock_type)
clustrix_connection *trx = get_trx(thd, &error_code);
if (lock_type != F_UNLCK) {
trx->begin_trans();
trx->begin_stmt_trans();
trans_register_ha(thd, FALSE, clustrixdb_hton);
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) {
trx->begin_stmt_trans();
trans_register_ha(thd, TRUE, clustrixdb_hton);
}
}
return 0;
......
......@@ -145,6 +145,9 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
if (!trx)
goto err;
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
num_null_bytes, field_metadata,
field_metadata_size,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment