Commit 158e6745 authored by unknown's avatar unknown

renamed ha_recovery_logging to ha_enable_transaction

added tests to alter table for "large" alter tables and truncates in ndbcluster
added debug printout in restart() in ndbcluster
added flag THD::transaction.on to enable/disable transaction


mysql-test/r/ndb_alter_table.result:
  added tests to alter table for "large" alter tables and truncates
mysql-test/t/ndb_alter_table.test:
  added tests to alter table for "large" alter tables and truncates
ndb/src/ndbapi/NdbConnection.cpp:
  added debug printout in restart()
sql/ha_ndbcluster.cc:
  added support for large alter table and truncate
sql/handler.cc:
  renamed ha_recovery_logging to ha_enable_transaction
sql/handler.h:
  renamed ha_recovery_logging to ha_enable_transaction
sql/sql_class.cc:
  added flag THD::transaction.on to enable/disable transaction
sql/sql_class.h:
  added flag THD::transaction.on to enable/disable transaction
sql/sql_delete.cc:
  added disable transaction for mysql_truncate
sql/sql_table.cc:
  renamed ha_recovery_logging to ha_enable_transaction
parent 9b01ebdf
...@@ -72,3 +72,14 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8 ...@@ -72,3 +72,14 @@ col6 col1 col3 fourth col4 col4_5 col5 col7 col8
1 101 3 4 5 PENDING 0000-00-00 00:00:00 1 101 3 4 5 PENDING 0000-00-00 00:00:00
2 102 4 3 5 99 PENDING EXTRA 2004-01-01 00:00:00 2 102 4 3 5 99 PENDING EXTRA 2004-01-01 00:00:00
drop table t1; drop table t1;
DROP TABLE IF EXISTS t2;
create table t2 (a int NOT NULL PRIMARY KEY) engine=myisam;
alter table t2 engine=ndbcluster;
select count(*) from t2;
count(*)
15001
truncate table t2;
select count(*) from t2;
count(*)
0
drop table t2;
...@@ -48,4 +48,21 @@ show table status; ...@@ -48,4 +48,21 @@ show table status;
select * from t1 order by col1; select * from t1 order by col1;
drop table t1; drop table t1;
--disable_warnings
DROP TABLE IF EXISTS t2;
--enable_warnings
create table t2 (a int NOT NULL PRIMARY KEY) engine=myisam;
let $1=15001;
disable_query_log;
while ($1)
{
eval insert into t2 values($1);
dec $1;
}
enable_query_log;
alter table t2 engine=ndbcluster;
select count(*) from t2;
truncate table t2;
select count(*) from t2;
drop table t2;
...@@ -191,6 +191,7 @@ NdbConnection::setErrorCode(int anErrorCode) ...@@ -191,6 +191,7 @@ NdbConnection::setErrorCode(int anErrorCode)
int int
NdbConnection::restart(){ NdbConnection::restart(){
DBUG_ENTER("NdbConnection::restart");
if(theCompletionStatus == CompletedSuccess){ if(theCompletionStatus == CompletedSuccess){
releaseCompletedOperations(); releaseCompletedOperations();
Uint64 tTransid = theNdb->theFirstTransId; Uint64 tTransid = theNdb->theFirstTransId;
...@@ -201,9 +202,10 @@ NdbConnection::restart(){ ...@@ -201,9 +202,10 @@ NdbConnection::restart(){
theNdb->theFirstTransId = tTransid + 1; theNdb->theFirstTransId = tTransid + 1;
} }
theCompletionStatus = NotCompleted; theCompletionStatus = NotCompleted;
return 0; DBUG_RETURN(0);
} }
return -1; DBUG_PRINT("error",("theCompletionStatus != CompletedSuccess"));
DBUG_RETURN(-1);
} }
/***************************************************************************** /*****************************************************************************
......
...@@ -20,7 +20,6 @@ ...@@ -20,7 +20,6 @@
NDB Cluster NDB Cluster
*/ */
#ifdef __GNUC__ #ifdef __GNUC__
#pragma implementation // gcc: Class implementation #pragma implementation // gcc: Class implementation
#endif #endif
...@@ -1033,7 +1032,7 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1033,7 +1032,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
if (ops_pending && blobs_pending) if (ops_pending && blobs_pending)
{ {
if (trans->execute(NoCommit) != 0) if (trans->execute(NoCommit) != 0)
DBUG_RETURN(ndb_err(trans)); DBUG_RETURN(ndb_err(trans));
ops_pending= 0; ops_pending= 0;
blobs_pending= false; blobs_pending= false;
} }
...@@ -1058,8 +1057,17 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1058,8 +1057,17 @@ inline int ha_ndbcluster::next_result(byte *buf)
be sent to NDB be sent to NDB
*/ */
DBUG_PRINT("info", ("ops_pending: %d", ops_pending)); DBUG_PRINT("info", ("ops_pending: %d", ops_pending));
if (ops_pending && (trans->execute(NoCommit) != 0)) if (current_thd->transaction.on)
DBUG_RETURN(ndb_err(trans)); {
if (ops_pending && (trans->execute(NoCommit) != 0))
DBUG_RETURN(ndb_err(trans));
}
else
{
if (ops_pending && (trans->execute(Commit) != 0))
DBUG_RETURN(ndb_err(trans));
trans->restart();
}
ops_pending= 0; ops_pending= 0;
contact_ndb= (check == 2); contact_ndb= (check == 2);
...@@ -1310,7 +1318,6 @@ int ha_ndbcluster::full_table_scan(byte *buf) ...@@ -1310,7 +1318,6 @@ int ha_ndbcluster::full_table_scan(byte *buf)
DBUG_RETURN(define_read_attrs(buf, op)); DBUG_RETURN(define_read_attrs(buf, op));
} }
inline inline
int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
{ {
...@@ -1361,7 +1368,6 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op) ...@@ -1361,7 +1368,6 @@ int ha_ndbcluster::define_read_attrs(byte* buf, NdbOperation* op)
/* /*
Insert one record into NDB Insert one record into NDB
*/ */
int ha_ndbcluster::write_row(byte *record) int ha_ndbcluster::write_row(byte *record)
{ {
bool has_auto_increment; bool has_auto_increment;
...@@ -1427,15 +1433,43 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1427,15 +1433,43 @@ int ha_ndbcluster::write_row(byte *record)
((rows_inserted % bulk_insert_rows) == 0) || ((rows_inserted % bulk_insert_rows) == 0) ||
uses_blob_value(false) != 0) uses_blob_value(false) != 0)
{ {
THD *thd= current_thd;
// Send rows to NDB // Send rows to NDB
DBUG_PRINT("info", ("Sending inserts to NDB, "\ DBUG_PRINT("info", ("Sending inserts to NDB, "\
"rows_inserted:%d, bulk_insert_rows: %d", "rows_inserted:%d, bulk_insert_rows: %d",
(int)rows_inserted, (int)bulk_insert_rows)); (int)rows_inserted, (int)bulk_insert_rows));
bulk_insert_not_flushed= false; bulk_insert_not_flushed= false;
if (trans->execute(NoCommit) != 0) if (thd->transaction.on) {
if (trans->execute(NoCommit) != 0)
{
skip_auto_increment= true;
DBUG_RETURN(ndb_err(trans));
}
}
else
{ {
skip_auto_increment= true; if (trans->execute(Commit) != 0)
DBUG_RETURN(ndb_err(trans)); {
skip_auto_increment= true;
DBUG_RETURN(ndb_err(trans));
}
#if 0 // this is what we want to use but it is not functional
trans->restart();
#else
m_ndb->closeTransaction(m_active_trans);
m_active_trans= m_ndb->startTransaction();
if (thd->transaction.all.ndb_tid)
thd->transaction.all.ndb_tid= m_active_trans;
else
thd->transaction.stmt.ndb_tid= m_active_trans;
if (m_active_trans == NULL)
{
skip_auto_increment= true;
ERR_RETURN(m_ndb->getNdbError());
}
trans= m_active_trans;
#endif
} }
} }
if ((has_auto_increment) && (skip_auto_increment)) if ((has_auto_increment) && (skip_auto_increment))
...@@ -2494,10 +2528,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -2494,10 +2528,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction(); trans= m_ndb->startTransaction();
if (trans == NULL) if (trans == NULL)
{
thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError()); ERR_RETURN(m_ndb->getNdbError());
}
thd->transaction.stmt.ndb_tid= trans; thd->transaction.stmt.ndb_tid= trans;
} }
else else
...@@ -2510,10 +2541,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -2510,10 +2541,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
trans= m_ndb->startTransaction(); trans= m_ndb->startTransaction();
if (trans == NULL) if (trans == NULL)
{
thd->transaction.ndb_lock_count--; // We didn't get the lock
ERR_RETURN(m_ndb->getNdbError()); ERR_RETURN(m_ndb->getNdbError());
}
/* /*
If this is the start of a LOCK TABLE, a table look If this is the start of a LOCK TABLE, a table look
...@@ -3128,6 +3156,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3128,6 +3156,7 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_active_trans(NULL), m_active_trans(NULL),
m_active_cursor(NULL), m_active_cursor(NULL),
m_ndb(NULL), m_ndb(NULL),
m_share(0),
m_table(NULL), m_table(NULL),
m_table_flags(HA_REC_NOT_IN_SEQ | m_table_flags(HA_REC_NOT_IN_SEQ |
HA_NULL_IN_KEY | HA_NULL_IN_KEY |
...@@ -3177,6 +3206,8 @@ ha_ndbcluster::~ha_ndbcluster() ...@@ -3177,6 +3206,8 @@ ha_ndbcluster::~ha_ndbcluster()
{ {
DBUG_ENTER("~ha_ndbcluster"); DBUG_ENTER("~ha_ndbcluster");
if (m_share)
free_share(m_share);
release_metadata(); release_metadata();
my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR)); my_free(blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
blobs_buffer= 0; blobs_buffer= 0;
...@@ -3219,8 +3250,10 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) ...@@ -3219,8 +3250,10 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
set_dbname(name); set_dbname(name);
set_tabname(name); set_tabname(name);
if (check_ndb_connection()) if (check_ndb_connection()) {
free_share(m_share); m_share= 0;
DBUG_RETURN(HA_ERR_NO_CONNECTION); DBUG_RETURN(HA_ERR_NO_CONNECTION);
}
DBUG_RETURN(get_metadata(name)); DBUG_RETURN(get_metadata(name));
} }
...@@ -3234,7 +3267,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked) ...@@ -3234,7 +3267,7 @@ int ha_ndbcluster::open(const char *name, int mode, uint test_if_locked)
int ha_ndbcluster::close(void) int ha_ndbcluster::close(void)
{ {
DBUG_ENTER("close"); DBUG_ENTER("close");
free_share(m_share); free_share(m_share); m_share= 0;
release_metadata(); release_metadata();
m_ndb= NULL; m_ndb= NULL;
DBUG_RETURN(0); DBUG_RETURN(0);
......
...@@ -1199,14 +1199,15 @@ int handler::rename_table(const char * from, const char * to) ...@@ -1199,14 +1199,15 @@ int handler::rename_table(const char * from, const char * to)
} }
/* /*
Tell the handler to turn on or off logging to the handler's recovery log Tell the handler to turn on or off transaction in the handler
*/ */
int ha_recovery_logging(THD *thd, bool on) int ha_enable_transaction(THD *thd, bool on)
{ {
int error=0; int error=0;
DBUG_ENTER("ha_recovery_logging"); DBUG_ENTER("ha_enable_transaction");
thd->transaction.on= on;
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
...@@ -550,7 +550,7 @@ int ha_savepoint(THD *thd, char *savepoint_name); ...@@ -550,7 +550,7 @@ int ha_savepoint(THD *thd, char *savepoint_name);
int ha_autocommit_or_rollback(THD *thd, int error); int ha_autocommit_or_rollback(THD *thd, int error);
void ha_set_spin_retries(uint retries); void ha_set_spin_retries(uint retries);
bool ha_flush_logs(void); bool ha_flush_logs(void);
int ha_recovery_logging(THD *thd, bool on); int ha_enable_transaction(THD *thd, bool on);
int ha_change_key_cache(KEY_CACHE *old_key_cache, int ha_change_key_cache(KEY_CACHE *old_key_cache,
KEY_CACHE *new_key_cache); KEY_CACHE *new_key_cache);
int ha_discover(const char* dbname, const char* name, int ha_discover(const char* dbname, const char* name,
......
...@@ -302,6 +302,7 @@ void THD::init(void) ...@@ -302,6 +302,7 @@ void THD::init(void)
void THD::init_for_queries() void THD::init_for_queries()
{ {
ha_enable_transaction(this,TRUE);
init_sql_alloc(&mem_root, init_sql_alloc(&mem_root,
variables.query_alloc_block_size, variables.query_alloc_block_size,
variables.query_prealloc_size); variables.query_prealloc_size);
......
...@@ -768,6 +768,7 @@ class THD :public ilink, ...@@ -768,6 +768,7 @@ class THD :public ilink,
#ifdef HAVE_NDBCLUSTER_DB #ifdef HAVE_NDBCLUSTER_DB
void* ndb; void* ndb;
#endif #endif
bool on;
/* /*
Tables changed in transaction (that must be invalidated in query cache). Tables changed in transaction (that must be invalidated in query cache).
List contain only transactional tables, that not invalidated in query List contain only transactional tables, that not invalidated in query
......
...@@ -648,8 +648,11 @@ int mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok) ...@@ -648,8 +648,11 @@ int mysql_truncate(THD *thd, TABLE_LIST *table_list, bool dont_send_ok)
{ {
/* Probably InnoDB table */ /* Probably InnoDB table */
table_list->lock_type= TL_WRITE; table_list->lock_type= TL_WRITE;
DBUG_RETURN(mysql_delete(thd, table_list, (COND*) 0, (SQL_LIST*) 0, ha_enable_transaction(thd, FALSE);
HA_POS_ERROR, 0)); error= mysql_delete(thd, table_list, (COND*) 0, (SQL_LIST*) 0,
HA_POS_ERROR, 0);
ha_enable_transaction(thd, TRUE);
DBUG_RETURN(error);
} }
if (lock_and_wait_for_table_name(thd, table_list)) if (lock_and_wait_for_table_name(thd, table_list))
DBUG_RETURN(-1); DBUG_RETURN(-1);
......
...@@ -3343,7 +3343,7 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -3343,7 +3343,7 @@ copy_data_between_tables(TABLE *from,TABLE *to,
Turn off recovery logging since rollback of an alter table is to Turn off recovery logging since rollback of an alter table is to
delete the new table so there is no need to log the changes to it. delete the new table so there is no need to log the changes to it.
*/ */
error= ha_recovery_logging(thd,FALSE); error= ha_enable_transaction(thd,FALSE);
if (error) if (error)
{ {
error= 1; error= 1;
...@@ -3405,7 +3405,7 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -3405,7 +3405,7 @@ copy_data_between_tables(TABLE *from,TABLE *to,
} }
to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); to->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
ha_recovery_logging(thd,TRUE); ha_enable_transaction(thd,TRUE);
/* /*
Ensure that the new table is saved properly to disk so that we Ensure that the new table is saved properly to disk so that we
can do a rename can do a rename
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment