Commit 6195a016 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Remove st_clustrixdb_trx.

parent 84cfedff
...@@ -154,10 +154,14 @@ int clustrix_connection::read_query_response() ...@@ -154,10 +154,14 @@ int clustrix_connection::read_query_response()
int clustrix_connection::begin_trans() int clustrix_connection::begin_trans()
{ {
if (has_transaction)
return 0;
const char *stmt = "BEGIN TRANSACTION"; const char *stmt = "BEGIN TRANSACTION";
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code) if (error_code)
return mysql_errno(&clustrix_net); return mysql_errno(&clustrix_net);
has_transaction = TRUE;
return error_code; return error_code;
} }
...@@ -167,6 +171,8 @@ int clustrix_connection::commit_trans() ...@@ -167,6 +171,8 @@ int clustrix_connection::commit_trans()
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code) if (error_code)
return mysql_errno(&clustrix_net); return mysql_errno(&clustrix_net);
has_transaction = FALSE;
has_statement_trans = FALSE;
return error_code; return error_code;
} }
...@@ -176,6 +182,41 @@ int clustrix_connection::rollback_trans() ...@@ -176,6 +182,41 @@ int clustrix_connection::rollback_trans()
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt)); int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code) if (error_code)
return mysql_errno(&clustrix_net); return mysql_errno(&clustrix_net);
has_transaction = FALSE;
has_statement_trans = FALSE;
return error_code;
}
int clustrix_connection::begin_stmt_trans()
{
if (has_statement_trans)
return 0;
const char *stmt = "SAVEPOINT STMT_TRANS";
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code)
return mysql_errno(&clustrix_net);
has_statement_trans = TRUE;
return error_code;
}
int clustrix_connection::commit_stmt_trans()
{
const char *stmt = "RELEASE SAVEPOINT STMT_TRANS";
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code)
return mysql_errno(&clustrix_net);
has_statement_trans = FALSE;
return error_code;
}
int clustrix_connection::rollback_stmt_trans()
{
const char *stmt = "ROLLBACK TO STMT_TRANS";
int error_code = mysql_real_query(&clustrix_net, stmt, strlen(stmt));
if (error_code)
return mysql_errno(&clustrix_net);
has_statement_trans = FALSE;
return error_code; return error_code;
} }
...@@ -330,7 +371,7 @@ int clustrix_connection::scan_init(ulonglong clustrix_table_oid, uint index, ...@@ -330,7 +371,7 @@ int clustrix_connection::scan_init(ulonglong clustrix_table_oid, uint index,
* Sends a command over mysql protocol connection to initiate an * Sends a command over mysql protocol connection to initiate an
* arbitrary query using a query text. * arbitrary query using a query text.
* Uses field types, field metadata and nullability to explicitly * Uses field types, field metadata and nullability to explicitly
* cast result to expected data type. Exploits RBR TABLE_MAP_EVENT * cast result to expected data type. Exploits RBR TABLE_MAP_EVENT
* format + sends SQL text. * format + sends SQL text.
* @args * @args
* stmt& Query text to send * stmt& Query text to send
...@@ -356,7 +397,7 @@ int clustrix_connection::scan_query_init(String &stmt, uchar *fieldtype, ...@@ -356,7 +397,7 @@ int clustrix_connection::scan_query_init(String &stmt, uchar *fieldtype,
if ((error_code = add_command_operand_str(fieldtype, fields))) if ((error_code = add_command_operand_str(fieldtype, fields)))
return error_code; return error_code;
if ((error_code = add_command_operand_str(field_metadata, field_metadata_size))) if ((error_code = add_command_operand_str(field_metadata, field_metadata_size)))
return error_code; return error_code;
......
...@@ -36,12 +36,17 @@ class clustrix_connection ...@@ -36,12 +36,17 @@ class clustrix_connection
uchar *reply_buffer; uchar *reply_buffer;
size_t reply_length; size_t reply_length;
bool has_transaction;
bool has_statement_trans;
public: public:
ulonglong last_insert_id; ulonglong last_insert_id;
clustrix_connection() clustrix_connection()
: command_buffer(NULL), command_buffer_length(0), command_length(0) : command_buffer(NULL), command_buffer_length(0), command_length(0)
{ {
memset(&clustrix_net, 0, sizeof(MYSQL)); memset(&clustrix_net, 0, sizeof(MYSQL));
has_statement_trans = FALSE;
has_transaction = FALSE;
} }
~clustrix_connection() ~clustrix_connection()
...@@ -64,6 +69,18 @@ class clustrix_connection ...@@ -64,6 +69,18 @@ class clustrix_connection
int begin_trans(); int begin_trans();
int commit_trans(); int commit_trans();
int rollback_trans(); int rollback_trans();
inline bool has_trans()
{
return has_transaction;
}
int begin_stmt_trans();
int commit_stmt_trans();
int rollback_stmt_trans();
inline bool has_stmt_trans()
{
return has_statement_trans;
}
int create_table(String &stmt); int create_table(String &stmt);
int delete_table(String &stmt); int delete_table(String &stmt);
......
...@@ -118,61 +118,6 @@ bool derived_handler_setting(THD* thd) ...@@ -118,61 +118,6 @@ bool derived_handler_setting(THD* thd)
return ( thd == NULL ) ? false : THDVAR(thd, derived_handler); return ( thd == NULL ) ? false : THDVAR(thd, derived_handler);
} }
/****************************************************************************
** Class ha_clustrixdb_trx
****************************************************************************/
st_clustrixdb_trx::st_clustrixdb_trx(THD *trx_thd)
{
thd = trx_thd;
clustrix_net = NULL;
//query_id = 0;
//mem_root = NULL;
has_transaction = FALSE;
}
st_clustrixdb_trx::~st_clustrixdb_trx()
{
if (clustrix_net)
delete clustrix_net;
}
int st_clustrixdb_trx::net_init()
{
if (!this->clustrix_net)
{
this->clustrix_net = new clustrix_connection();
int error_code = this->clustrix_net->connect();
if (error_code)
return error_code;
}
return 0;
}
int st_clustrixdb_trx::begin_trans()
{
// XXX: What were these for?
//if (thd->transaction.stmt.trans_did_ddl() ||
// thd->transaction.stmt.modified_non_trans_table)
if (!has_transaction) {
int error_code = this->clustrix_net->begin_trans();
if (error_code)
return error_code;
/* Register for commit/rollback on the transaction */
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, FALSE, clustrixdb_hton);
else
trans_register_ha(thd, TRUE, clustrixdb_hton);
has_transaction = TRUE;
}
return 0;
}
/**************************************************************************** /****************************************************************************
** Utility functions ** Utility functions
****************************************************************************/ ****************************************************************************/
...@@ -205,18 +150,19 @@ void decode_objectname(char *buf, const char *path, size_t buf_size) ...@@ -205,18 +150,19 @@ void decode_objectname(char *buf, const char *path, size_t buf_size)
buf[new_path_len] = '\0'; buf[new_path_len] = '\0';
} }
st_clustrixdb_trx *get_trx(THD *thd, int *error_code) clustrix_connection *get_trx(THD *thd, int *error_code)
{ {
*error_code = 0; *error_code = 0;
st_clustrixdb_trx *trx; clustrix_connection *trx;
if (!(trx = (st_clustrixdb_trx *)thd_get_ha_data(thd, clustrixdb_hton))) if (!(trx = (clustrix_connection *)thd_get_ha_data(thd, clustrixdb_hton)))
{ {
if (!(trx = new st_clustrixdb_trx(thd))) { if (!(trx = new clustrix_connection())) {
*error_code = HA_ERR_OUT_OF_MEM; *error_code = HA_ERR_OUT_OF_MEM;
return NULL; return NULL;
} }
if ((*error_code = trx->net_init())) { *error_code = trx->connect();
if (*error_code) {
delete trx; delete trx;
return NULL; return NULL;
} }
...@@ -251,7 +197,7 @@ int ha_clustrixdb::create(const char *name, TABLE *form, HA_CREATE_INFO *info) ...@@ -251,7 +197,7 @@ int ha_clustrixdb::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
{ {
int error_code; int error_code;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -287,10 +233,10 @@ int ha_clustrixdb::create(const char *name, TABLE *form, HA_CREATE_INFO *info) ...@@ -287,10 +233,10 @@ int ha_clustrixdb::create(const char *name, TABLE *form, HA_CREATE_INFO *info)
createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `"); createdb_stmt.append("CREATE DATABASE IF NOT EXISTS `");
createdb_stmt.append(form->s->db.str, form->s->db.length); createdb_stmt.append(form->s->db.str, form->s->db.length);
createdb_stmt.append("`"); createdb_stmt.append("`");
trx->clustrix_net->create_table(createdb_stmt); trx->create_table(createdb_stmt);
} }
error_code = trx->clustrix_net->create_table(create_table_stmt); error_code = trx->create_table(create_table_stmt);
return error_code; return error_code;
} }
...@@ -298,7 +244,7 @@ int ha_clustrixdb::delete_table(const char *path) ...@@ -298,7 +244,7 @@ int ha_clustrixdb::delete_table(const char *path)
{ {
int error_code; int error_code;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -309,7 +255,7 @@ int ha_clustrixdb::delete_table(const char *path) ...@@ -309,7 +255,7 @@ int ha_clustrixdb::delete_table(const char *path)
char decoded_tbname[FN_REFLEN]; char decoded_tbname[FN_REFLEN];
decode_objectname(decoded_tbname, dbname_end + 1, FN_REFLEN); decode_objectname(decoded_tbname, dbname_end + 1, FN_REFLEN);
String delete_cmd; String delete_cmd;
delete_cmd.append("DROP TABLE `"); delete_cmd.append("DROP TABLE `");
delete_cmd.append(path + 2, dbname_end - path - 2); delete_cmd.append(path + 2, dbname_end - path - 2);
...@@ -318,14 +264,14 @@ int ha_clustrixdb::delete_table(const char *path) ...@@ -318,14 +264,14 @@ int ha_clustrixdb::delete_table(const char *path)
delete_cmd.append("`"); delete_cmd.append("`");
return trx->clustrix_net->delete_table(delete_cmd); return trx->delete_table(delete_cmd);
} }
int ha_clustrixdb::rename_table(const char* from, const char* to) int ha_clustrixdb::rename_table(const char* from, const char* to)
{ {
int error_code; int error_code;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -343,7 +289,7 @@ int ha_clustrixdb::rename_table(const char* from, const char* to) ...@@ -343,7 +289,7 @@ int ha_clustrixdb::rename_table(const char* from, const char* to)
char decoded_to_tbname[FN_REFLEN]; char decoded_to_tbname[FN_REFLEN];
decode_objectname(decoded_to_tbname, to_dbname_end + 1, FN_REFLEN); decode_objectname(decoded_to_tbname, to_dbname_end + 1, FN_REFLEN);
String rename_cmd; String rename_cmd;
rename_cmd.append("RENAME TABLE `"); rename_cmd.append("RENAME TABLE `");
rename_cmd.append(from + 2, from_dbname_end - from - 2); rename_cmd.append(from + 2, from_dbname_end - from - 2);
...@@ -355,7 +301,7 @@ int ha_clustrixdb::rename_table(const char* from, const char* to) ...@@ -355,7 +301,7 @@ int ha_clustrixdb::rename_table(const char* from, const char* to)
rename_cmd.append(decoded_to_tbname); rename_cmd.append(decoded_to_tbname);
rename_cmd.append("`;"); rename_cmd.append("`;");
return trx->clustrix_net->rename_table(rename_cmd); return trx->rename_table(rename_cmd);
} }
...@@ -398,11 +344,11 @@ int ha_clustrixdb::write_row(uchar *buf) ...@@ -398,11 +344,11 @@ int ha_clustrixdb::write_row(uchar *buf)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
assert(trx->has_transaction); assert(trx->has_stmt_trans());
/* Convert the row format to binlog (packed) format */ /* Convert the row format to binlog (packed) format */
uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table)); uchar *packed_new_row = (uchar*) my_alloca(estimate_row_size(table));
...@@ -410,12 +356,12 @@ int ha_clustrixdb::write_row(uchar *buf) ...@@ -410,12 +356,12 @@ int ha_clustrixdb::write_row(uchar *buf)
/* XXX: Clustrix may needs to return HA_ERR_AUTOINC_ERANGE if we hit that /* XXX: Clustrix may needs to return HA_ERR_AUTOINC_ERANGE if we hit that
error. */ error. */
if ((error_code = trx->clustrix_net->write_row(clustrix_table_oid, if ((error_code = trx->write_row(clustrix_table_oid,
packed_new_row, packed_size))) packed_new_row, packed_size)))
goto err; goto err;
if (table->next_number_field) if (table->next_number_field)
insert_id_for_cur_row = trx->clustrix_net->last_insert_id; insert_id_for_cur_row = trx->last_insert_id;
err: err:
if (packed_size) if (packed_size)
...@@ -428,11 +374,11 @@ int ha_clustrixdb::update_row(const uchar *old_data, const uchar *new_data) ...@@ -428,11 +374,11 @@ int ha_clustrixdb::update_row(const uchar *old_data, const uchar *new_data)
{ {
int error_code; int error_code;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
assert(trx->has_transaction); assert(trx->has_stmt_trans());
size_t row_size = estimate_row_size(table); size_t row_size = estimate_row_size(table);
uchar *packed_new_row = (uchar*) my_alloca(row_size); uchar *packed_new_row = (uchar*) my_alloca(row_size);
...@@ -456,19 +402,19 @@ int ha_clustrixdb::delete_row(const uchar *buf) ...@@ -456,19 +402,19 @@ int ha_clustrixdb::delete_row(const uchar *buf)
{ {
int error_code; int error_code;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
assert(trx->has_transaction); assert(trx->has_stmt_trans());
// The estimate should consider only key fields widths. // The estimate should consider only key fields widths.
size_t packed_key_len; size_t packed_key_len;
uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table)); uchar *packed_key = (uchar*) my_alloca(estimate_row_size(table));
build_key_packed_row(table->s->primary_key, packed_key, &packed_key_len); build_key_packed_row(table->s->primary_key, packed_key, &packed_key_len);
if ((error_code = trx->clustrix_net->key_delete(clustrix_table_oid, if ((error_code = trx->key_delete(clustrix_table_oid,
packed_key, packed_key_len))) packed_key, packed_key_len)))
goto err; goto err;
err: err:
...@@ -565,7 +511,7 @@ int ha_clustrixdb::index_init(uint idx, bool sorted) ...@@ -565,7 +511,7 @@ int ha_clustrixdb::index_init(uint idx, bool sorted)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -583,7 +529,7 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len, ...@@ -583,7 +529,7 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
DBUG_ENTER("ha_clustrixdb::index_read"); DBUG_ENTER("ha_clustrixdb::index_read");
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
...@@ -596,10 +542,9 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len, ...@@ -596,10 +542,9 @@ int ha_clustrixdb::index_read(uchar * buf, const uchar * key, uint key_len,
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->clustrix_net->key_read(clustrix_table_oid, if ((error_code = trx->key_read(clustrix_table_oid, active_index,
active_index, table->read_set, table->read_set, packed_key, packed_key_len,
packed_key, packed_key_len, &rowdata, &rowdata_length)))
&rowdata, &rowdata_length)))
goto err; goto err;
uchar const *current_row_end; uchar const *current_row_end;
...@@ -620,7 +565,7 @@ int ha_clustrixdb::index_first(uchar *buf) ...@@ -620,7 +565,7 @@ int ha_clustrixdb::index_first(uchar *buf)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -640,11 +585,9 @@ int ha_clustrixdb::index_first(uchar *buf) ...@@ -640,11 +585,9 @@ int ha_clustrixdb::index_first(uchar *buf)
bitmap_set_all(&scan_fields); bitmap_set_all(&scan_fields);
#endif #endif
if ((error_code = trx->clustrix_net->scan_init(clustrix_table_oid, if ((error_code = trx->scan_init(clustrix_table_oid, active_index,
active_index, clustrix_connection::SORT_NONE, &scan_fields,
clustrix_connection::SORT_NONE, &scan_refid)))
&scan_fields,
&scan_refid)))
return error_code; return error_code;
...@@ -655,7 +598,7 @@ int ha_clustrixdb::index_last(uchar *buf) ...@@ -655,7 +598,7 @@ int ha_clustrixdb::index_last(uchar *buf)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -675,11 +618,9 @@ int ha_clustrixdb::index_last(uchar *buf) ...@@ -675,11 +618,9 @@ int ha_clustrixdb::index_last(uchar *buf)
bitmap_set_all(&scan_fields); bitmap_set_all(&scan_fields);
#endif #endif
if ((error_code = trx->clustrix_net->scan_init(clustrix_table_oid, if ((error_code = trx->scan_init(clustrix_table_oid, active_index,
active_index, clustrix_connection::SORT_NONE, &scan_fields,
clustrix_connection::SORT_NONE, &scan_refid)))
&scan_fields,
&scan_refid)))
return error_code; return error_code;
...@@ -721,7 +662,7 @@ int ha_clustrixdb::rnd_init(bool scan) ...@@ -721,7 +662,7 @@ int ha_clustrixdb::rnd_init(bool scan)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -744,11 +685,9 @@ int ha_clustrixdb::rnd_init(bool scan) ...@@ -744,11 +685,9 @@ int ha_clustrixdb::rnd_init(bool scan)
bitmap_set_all(&scan_fields); bitmap_set_all(&scan_fields);
#endif #endif
if ((error_code = trx->clustrix_net->scan_init(clustrix_table_oid, if ((error_code = trx->scan_init(clustrix_table_oid, 0,
0, clustrix_connection::SORT_NONE, &scan_fields,
clustrix_connection::SORT_NONE, &scan_refid)))
&scan_fields,
&scan_refid)))
return error_code; return error_code;
return 0; return 0;
...@@ -758,7 +697,7 @@ int ha_clustrixdb::rnd_next(uchar *buf) ...@@ -758,7 +697,7 @@ int ha_clustrixdb::rnd_next(uchar *buf)
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -767,8 +706,7 @@ int ha_clustrixdb::rnd_next(uchar *buf) ...@@ -767,8 +706,7 @@ int ha_clustrixdb::rnd_next(uchar *buf)
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->clustrix_net->scan_next(scan_refid, &rowdata, if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length)))
&rowdata_length)))
return error_code; return error_code;
if (has_hidden_key) { if (has_hidden_key) {
...@@ -797,7 +735,7 @@ int ha_clustrixdb::rnd_pos(uchar * buf, uchar *pos) ...@@ -797,7 +735,7 @@ int ha_clustrixdb::rnd_pos(uchar * buf, uchar *pos)
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
...@@ -818,10 +756,9 @@ int ha_clustrixdb::rnd_pos(uchar * buf, uchar *pos) ...@@ -818,10 +756,9 @@ int ha_clustrixdb::rnd_pos(uchar * buf, uchar *pos)
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->clustrix_net->key_read(clustrix_table_oid, 0, if ((error_code = trx->key_read(clustrix_table_oid, 0, table->read_set,
table->read_set, packed_key, packed_key_len,
packed_key, packed_key_len, &rowdata, &rowdata_length)))
&rowdata, &rowdata_length)))
goto err; goto err;
uchar const *current_row_end; uchar const *current_row_end;
...@@ -843,12 +780,12 @@ int ha_clustrixdb::rnd_end() ...@@ -843,12 +780,12 @@ int ha_clustrixdb::rnd_end()
{ {
int error_code = 0; int error_code = 0;
THD *thd = ha_thd(); THD *thd = ha_thd();
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
my_bitmap_free(&scan_fields); my_bitmap_free(&scan_fields);
if (scan_refid && (error_code = trx->clustrix_net->scan_end(scan_refid))) if (scan_refid && (error_code = trx->scan_end(scan_refid)))
return error_code; return error_code;
scan_refid = 0; scan_refid = 0;
...@@ -885,9 +822,14 @@ THR_LOCK_DATA **ha_clustrixdb::store_lock(THD *thd, ...@@ -885,9 +822,14 @@ THR_LOCK_DATA **ha_clustrixdb::store_lock(THD *thd,
int ha_clustrixdb::external_lock(THD *thd, int lock_type) int ha_clustrixdb::external_lock(THD *thd, int lock_type)
{ {
int error_code; int error_code;
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK) {
trx->begin_trans(); trx->begin_trans();
trx->begin_stmt_trans();
trans_register_ha(thd, FALSE, clustrixdb_hton);
if (thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, clustrixdb_hton);
}
//if (lock_type != F_UNLCK) //if (lock_type != F_UNLCK)
//DBUG_ASSERT(trx && trx == get_trx(thd, &error_code)); //DBUG_ASSERT(trx && trx == get_trx(thd, &error_code));
...@@ -985,17 +927,14 @@ void ha_clustrixdb::build_key_packed_row(uint index, uchar *packed_key, ...@@ -985,17 +927,14 @@ void ha_clustrixdb::build_key_packed_row(uint index, uchar *packed_key,
static int clustrixdb_commit(handlerton *hton, THD *thd, bool all) static int clustrixdb_commit(handlerton *hton, THD *thd, bool all)
{ {
int error_code = 0; int error_code = 0;
st_clustrixdb_trx* trx = (st_clustrixdb_trx *) thd_get_ha_data(thd, hton); clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx); assert(trx);
if (trx->has_transaction) if (trx->has_stmt_trans() && ((error_code = trx->commit_stmt_trans())))
{ return error_code;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
{ if (all && trx->has_trans())
error_code = trx->clustrix_net->commit_trans(); error_code = trx->commit_trans();
trx->has_transaction = FALSE;
}
}
return error_code; return error_code;
} }
...@@ -1003,17 +942,14 @@ static int clustrixdb_commit(handlerton *hton, THD *thd, bool all) ...@@ -1003,17 +942,14 @@ static int clustrixdb_commit(handlerton *hton, THD *thd, bool all)
static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all) static int clustrixdb_rollback(handlerton *hton, THD *thd, bool all)
{ {
int error_code = 0; int error_code = 0;
st_clustrixdb_trx* trx = (st_clustrixdb_trx *) thd_get_ha_data(thd, hton); clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
assert(trx); assert(trx);
if (trx->has_transaction) if (trx->has_stmt_trans() && ((error_code = trx->rollback_stmt_trans())))
{ return error_code;
if (all || !thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
{ if (all || trx->has_trans())
error_code = trx->clustrix_net->rollback_trans(); error_code = trx->rollback_trans();
trx->has_transaction = FALSE;
}
}
return error_code; return error_code;
} }
...@@ -1026,11 +962,11 @@ static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table, ...@@ -1026,11 +962,11 @@ static handler* clustrixdb_create_handler(handlerton *hton, TABLE_SHARE *table,
static int clustrixdb_close_connection(handlerton* hton, THD* thd) static int clustrixdb_close_connection(handlerton* hton, THD* thd)
{ {
st_clustrixdb_trx* trx = (st_clustrixdb_trx *) thd_get_ha_data(thd, hton); clustrix_connection* trx = (clustrix_connection *) thd_get_ha_data(thd, hton);
if (!trx) if (!trx)
return 0; /* Transaction is not started */ return 0; /* Transaction is not started */
if (trx->has_transaction) if (trx->has_stmt_trans())
clustrixdb_rollback(clustrixdb_hton, thd, TRUE); clustrixdb_rollback(clustrixdb_hton, thd, TRUE);
delete trx; delete trx;
......
...@@ -21,27 +21,9 @@ Copyright (c) 2019, MariaDB Corporation. ...@@ -21,27 +21,9 @@ Copyright (c) 2019, MariaDB Corporation.
#include "../../sql/rpl_record.h" #include "../../sql/rpl_record.h"
size_t estimate_row_size(TABLE *table); size_t estimate_row_size(TABLE *table);
class st_clustrixdb_trx; clustrix_connection *get_trx(THD *thd, int *error_code);
st_clustrixdb_trx *get_trx(THD *thd, int *error_code);
bool get_enable_sh(THD* thd); bool get_enable_sh(THD* thd);
class ha_clustrixdb;
class st_clustrixdb_trx
{
public:
THD *thd;
clustrix_connection *clustrix_net;
//query_id_t query_id;
//MEM_ROOT mem_root; /* Memory allocated for the executing transaction */
bool has_transaction;
st_clustrixdb_trx(THD* trx_thd);
~st_clustrixdb_trx();
int net_init();
int begin_trans();
};
class ha_clustrixdb : public handler class ha_clustrixdb : public handler
{ {
private: private:
......
...@@ -99,7 +99,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -99,7 +99,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
int error_code = 0; int error_code = 0;
int field_metadata_size = 0; int field_metadata_size = 0;
ulonglong scan_refid = 0; ulonglong scan_refid = 0;
st_clustrixdb_trx *trx = 0; clustrix_connection *trx = NULL;
// We presume this number is equal to types.elements in get_field_types // We presume this number is equal to types.elements in get_field_types
uint items_number = select_lex->get_item_list()->elements; uint items_number = select_lex->get_item_list()->elements;
...@@ -125,7 +125,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -125,7 +125,7 @@ create_clustrixdb_select_handler(THD* thd, SELECT_LEX* select_lex)
if (!trx) if (!trx)
goto err; goto err;
if ((error_code = trx->clustrix_net->scan_query_init(query, fieldtype, items_number, if ((error_code = trx->scan_query_init(query, fieldtype, items_number,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) { null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) {
goto err; goto err;
} }
...@@ -169,12 +169,12 @@ ha_clustrixdb_select_handler::ha_clustrixdb_select_handler( ...@@ -169,12 +169,12 @@ ha_clustrixdb_select_handler::ha_clustrixdb_select_handler(
ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler() ha_clustrixdb_select_handler::~ha_clustrixdb_select_handler()
{ {
int error_code; int error_code;
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) { if (!trx) {
// TBD Log this // TBD Log this
} }
if (trx && scan_refid) if (trx && scan_refid)
trx->clustrix_net->scan_end(scan_refid); trx->scan_end(scan_refid);
// If the ::init_scan has been executed // If the ::init_scan has been executed
if (table__) if (table__)
...@@ -217,7 +217,7 @@ int ha_clustrixdb_select_handler::init_scan() ...@@ -217,7 +217,7 @@ int ha_clustrixdb_select_handler::init_scan()
int ha_clustrixdb_select_handler::next_row() int ha_clustrixdb_select_handler::next_row()
{ {
int error_code = 0; int error_code = 0;
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -225,8 +225,7 @@ int ha_clustrixdb_select_handler::next_row() ...@@ -225,8 +225,7 @@ int ha_clustrixdb_select_handler::next_row()
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->clustrix_net->scan_next(scan_refid, &rowdata, if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length)))
&rowdata_length)))
return error_code; return error_code;
uchar const *current_row_end; uchar const *current_row_end;
...@@ -284,7 +283,7 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived) ...@@ -284,7 +283,7 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived)
int error_code = 0; int error_code = 0;
int field_metadata_size = 0; int field_metadata_size = 0;
ulonglong scan_refid = 0; ulonglong scan_refid = 0;
st_clustrixdb_trx *trx = 0; clustrix_connection *trx = NULL;
// We presume this number is equal to types.elements in get_field_types // We presume this number is equal to types.elements in get_field_types
uint items_number = select_lex->get_item_list()->elements; uint items_number = select_lex->get_item_list()->elements;
...@@ -310,7 +309,7 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived) ...@@ -310,7 +309,7 @@ create_clustrixdb_derived_handler(THD* thd, TABLE_LIST *derived)
if (!trx) if (!trx)
goto err; goto err;
if ((error_code = trx->clustrix_net->scan_query_init(query, fieldtype, items_number, if ((error_code = trx->scan_query_init(query, fieldtype, items_number,
null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) { null_bits, num_null_bytes, field_metadata, field_metadata_size, &scan_refid))) {
goto err; goto err;
} }
...@@ -354,12 +353,12 @@ ha_clustrixdb_derived_handler::ha_clustrixdb_derived_handler( ...@@ -354,12 +353,12 @@ ha_clustrixdb_derived_handler::ha_clustrixdb_derived_handler(
ha_clustrixdb_derived_handler::~ha_clustrixdb_derived_handler() ha_clustrixdb_derived_handler::~ha_clustrixdb_derived_handler()
{ {
int error_code; int error_code;
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) { if (!trx) {
// TBD Log this. // TBD Log this.
} }
if (trx && scan_refid) if (trx && scan_refid)
trx->clustrix_net->scan_end(scan_refid); trx->scan_end(scan_refid);
// If the ::init_scan has been executed // If the ::init_scan has been executed
if (table__) if (table__)
...@@ -402,7 +401,7 @@ int ha_clustrixdb_derived_handler::init_scan() ...@@ -402,7 +401,7 @@ int ha_clustrixdb_derived_handler::init_scan()
int ha_clustrixdb_derived_handler::next_row() int ha_clustrixdb_derived_handler::next_row()
{ {
int error_code = 0; int error_code = 0;
st_clustrixdb_trx *trx = get_trx(thd, &error_code); clustrix_connection *trx = get_trx(thd, &error_code);
if (!trx) if (!trx)
return error_code; return error_code;
...@@ -410,8 +409,7 @@ int ha_clustrixdb_derived_handler::next_row() ...@@ -410,8 +409,7 @@ int ha_clustrixdb_derived_handler::next_row()
uchar *rowdata; uchar *rowdata;
ulong rowdata_length; ulong rowdata_length;
if ((error_code = trx->clustrix_net->scan_next(scan_refid, &rowdata, if ((error_code = trx->scan_next(scan_refid, &rowdata, &rowdata_length)))
&rowdata_length)))
return error_code; return error_code;
uchar const *current_row_end; uchar const *current_row_end;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment