Commit e98f34ad authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1

into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1


sql/ha_ndbcluster.cc:
  Auto merged
sql/ha_ndbcluster.h:
  Auto merged
sql/set_var.cc:
  Auto merged
parents 977d17e0 b815d7e5
...@@ -113,7 +113,7 @@ public: ...@@ -113,7 +113,7 @@ public:
* Reset bounds and put operation in list that will be * Reset bounds and put operation in list that will be
* sent on next execute * sent on next execute
*/ */
int reset_bounds(); int reset_bounds(bool forceSend = false);
bool getSorted() const { return m_ordered; } bool getSorted() const { return m_ordered; }
private: private:
...@@ -127,8 +127,8 @@ private: ...@@ -127,8 +127,8 @@ private:
virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*); virtual NdbRecAttr* getValue_impl(const NdbColumnImpl*, char*);
void fix_get_values(); void fix_get_values();
int next_result_ordered(bool fetchAllowed); int next_result_ordered(bool fetchAllowed, bool forceSend = false);
int send_next_scan_ordered(Uint32 idx); int send_next_scan_ordered(Uint32 idx, bool forceSend = false);
int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*); int compare(Uint32 key, Uint32 cols, const NdbReceiver*, const NdbReceiver*);
Uint32 m_sort_columns; Uint32 m_sort_columns;
......
...@@ -89,17 +89,17 @@ public: ...@@ -89,17 +89,17 @@ public:
* - 1: if there are no more tuples to scan. * - 1: if there are no more tuples to scan.
* - 2: if there are no more cached records in NdbApi * - 2: if there are no more cached records in NdbApi
*/ */
int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false);
/** /**
* Close result set (scan) * Close result set (scan)
*/ */
void close(); void close(bool forceSend = false);
/** /**
* Restart * Restart
*/ */
int restart(); int restart(bool forceSend = false);
/** /**
* Transfer scan operation to an updating transaction. Use this function * Transfer scan operation to an updating transaction. Use this function
......
...@@ -90,11 +90,11 @@ protected: ...@@ -90,11 +90,11 @@ protected:
NdbScanOperation(Ndb* aNdb); NdbScanOperation(Ndb* aNdb);
virtual ~NdbScanOperation(); virtual ~NdbScanOperation();
int nextResult(bool fetchAllowed = true); int nextResult(bool fetchAllowed = true, bool forceSend = false);
virtual void release(); virtual void release();
void closeScan(); void closeScan(bool forceSend = false);
int close_impl(class TransporterFacade*); int close_impl(class TransporterFacade*, bool forceSend = false);
// Overloaded methods from NdbCursorOperation // Overloaded methods from NdbCursorOperation
int executeCursor(int ProcessorId); int executeCursor(int ProcessorId);
...@@ -103,6 +103,7 @@ protected: ...@@ -103,6 +103,7 @@ protected:
int init(const NdbTableImpl* tab, NdbConnection* myConnection); int init(const NdbTableImpl* tab, NdbConnection* myConnection);
int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId); int prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId);
int doSend(int ProcessorId); int doSend(int ProcessorId);
void checkForceSend(bool forceSend);
virtual void setErrorCode(int aErrorCode); virtual void setErrorCode(int aErrorCode);
virtual void setErrorCodeAbort(int aErrorCode); virtual void setErrorCodeAbort(int aErrorCode);
...@@ -138,7 +139,7 @@ protected: ...@@ -138,7 +139,7 @@ protected:
Uint32 m_sent_receivers_count; // NOTE needs mutex to access Uint32 m_sent_receivers_count; // NOTE needs mutex to access
NdbReceiver** m_sent_receivers; // receive thread puts them here NdbReceiver** m_sent_receivers; // receive thread puts them here
int send_next_scan(Uint32 cnt, bool close); int send_next_scan(Uint32 cnt, bool close, bool forceSend = false);
void receiver_delivered(NdbReceiver*); void receiver_delivered(NdbReceiver*);
void receiver_completed(NdbReceiver*); void receiver_completed(NdbReceiver*);
void execCLOSE_SCAN_REP(); void execCLOSE_SCAN_REP();
...@@ -148,7 +149,7 @@ protected: ...@@ -148,7 +149,7 @@ protected:
Uint32 m_ordered; Uint32 m_ordered;
int restart(); int restart(bool forceSend = false);
}; };
inline inline
......
...@@ -32,10 +32,13 @@ ...@@ -32,10 +32,13 @@
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "connect-string", 'c', \ { "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \ "Set connect string for connecting to ndb_mgmd. " \
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \ "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and config file", \ "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#else #else
...@@ -46,11 +49,14 @@ ...@@ -46,11 +49,14 @@
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "version", 'V', "Output version information and exit.", 0, 0, 0, \ { "version", 'V', "Output version information and exit.", 0, 0, 0, \
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \ GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, \
{ "connect-string", 'c', \ { "ndb-connectstring", 'c', \
"Set connect string for connecting to ndb_mgmd. " \ "Set connect string for connecting to ndb_mgmd. " \
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " \ "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". " \
"Overides specifying entries in NDB_CONNECTSTRING and config file", \ "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg", \
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \ (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, \
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },\
{ "connect-string", 'c', "same as --ndb-connectstring",\
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,\
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 } GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }
#endif #endif
......
...@@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -108,7 +108,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
bool bool
Configuration::init(int argc, char** argv) Configuration::init(int argc, char** argv)
{ {
const char *load_default_groups[]= { "ndbd",0 }; const char *load_default_groups[]= { "mysql_cluster","ndbd",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
......
...@@ -138,7 +138,7 @@ int main(int argc, char** argv){ ...@@ -138,7 +138,7 @@ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *_host = 0; const char *_host = 0;
int _port = 0; int _port = 0;
const char *load_default_groups[]= { "ndb_mgm",0 }; const char *load_default_groups[]= { "mysql_cluster","ndb_mgm",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
......
...@@ -110,10 +110,14 @@ static struct my_option my_long_options[] = ...@@ -110,10 +110,14 @@ static struct my_option my_long_options[] =
0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "version", 'V', "Output version information and exit.", 0, 0, 0, { "version", 'V', "Output version information and exit.", 0, 0, 0,
GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 }, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0 },
{ "connect-string", 1023, { "ndb-connectstring", 1023,
"Set connect string for connecting to ndb_mgmd. " "Set connect string for connecting to ndb_mgmd. "
"<constr>=\"host=<hostname:port>[;nodeid=<id>]\". " "Syntax: \"[nodeid=<id>;][host=]<hostname>[:<port>]\". "
"Overides specifying entries in NDB_CONNECTSTRING and config file", "Overides specifying entries in NDB_CONNECTSTRING and Ndb.cfg",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "connect-string", 1023,
"same as --ndb-connectstring.",
(gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0,
GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ "config-file", 'f', "Specify cluster configuration file", { "config-file", 'f', "Specify cluster configuration file",
...@@ -196,7 +200,7 @@ int main(int argc, char** argv) ...@@ -196,7 +200,7 @@ int main(int argc, char** argv)
global_mgmt_server_check = 1; global_mgmt_server_check = 1;
glob.config_filename= "config.ini"; glob.config_filename= "config.ini";
const char *load_default_groups[]= { "ndb_mgmd",0 }; const char *load_default_groups[]= { "mysql_cluster","ndb_mgmd",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
......
...@@ -44,10 +44,10 @@ void NdbResultSet::init() ...@@ -44,10 +44,10 @@ void NdbResultSet::init()
{ {
} }
int NdbResultSet::nextResult(bool fetchAllowed) int NdbResultSet::nextResult(bool fetchAllowed, bool forceSend)
{ {
int res; int res;
if ((res = m_operation->nextResult(fetchAllowed)) == 0) { if ((res = m_operation->nextResult(fetchAllowed, forceSend)) == 0) {
// handle blobs // handle blobs
NdbBlob* tBlob = m_operation->theBlobList; NdbBlob* tBlob = m_operation->theBlobList;
while (tBlob != 0) { while (tBlob != 0) {
...@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed) ...@@ -67,9 +67,9 @@ int NdbResultSet::nextResult(bool fetchAllowed)
return res; return res;
} }
void NdbResultSet::close() void NdbResultSet::close(bool forceSend)
{ {
m_operation->closeScan(); m_operation->closeScan(forceSend);
} }
NdbOperation* NdbOperation*
...@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){ ...@@ -98,6 +98,6 @@ NdbResultSet::deleteTuple(NdbConnection * takeOverTrans){
} }
int int
NdbResultSet::restart(){ NdbResultSet::restart(bool forceSend){
return m_operation->restart(); return m_operation->restart(forceSend);
} }
...@@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){ ...@@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){
#define DEBUG_NEXT_RESULT 0 #define DEBUG_NEXT_RESULT 0
int NdbScanOperation::nextResult(bool fetchAllowed) int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend)
{ {
if(m_ordered) if(m_ordered)
return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed,
forceSend);
/** /**
* Check current receiver * Check current receiver
...@@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) ...@@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false,
forceSend) == 0){
idx = m_current_api_receiver; idx = m_current_api_receiver;
last = m_api_receivers_count; last = m_api_receivers_count;
...@@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) ...@@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed)
} }
int int
NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag,
bool forceSend){
if(cnt > 0 || stopScanFlag){ if(cnt > 0 || stopScanFlag){
NdbApiSignal tSignal(theNdb->theMyRef); NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ); tSignal.setSignal(GSN_SCAN_NEXTREQ);
...@@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ ...@@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
ret = tp->sendSignal(&tSignal, nodeId); ret = tp->sendSignal(&tSignal, nodeId);
} }
if (!ret) checkForceSend(forceSend);
m_sent_receivers_count = last + cnt + stopScanFlag; m_sent_receivers_count = last + cnt + stopScanFlag;
m_api_receivers_count -= cnt; m_api_receivers_count -= cnt;
m_current_api_receiver = 0; m_current_api_receiver = 0;
...@@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ ...@@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){
return 0; return 0;
} }
void NdbScanOperation::checkForceSend(bool forceSend)
{
if (forceSend) {
TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber);
} else {
TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber);
}//if
}
int int
NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId)
{ {
...@@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId) ...@@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId)
return 0; return 0;
} }
void NdbScanOperation::closeScan() void NdbScanOperation::closeScan(bool forceSend)
{ {
if(m_transConnection){ if(m_transConnection){
if(DEBUG_NEXT_RESULT) if(DEBUG_NEXT_RESULT)
...@@ -657,7 +671,7 @@ void NdbScanOperation::closeScan() ...@@ -657,7 +671,7 @@ void NdbScanOperation::closeScan()
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
close_impl(tp); close_impl(tp, forceSend);
} while(0); } while(0);
...@@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, ...@@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols,
} }
int int
NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
bool forceSend){
Uint32 u_idx = 0, u_last = 0; Uint32 u_idx = 0, u_last = 0;
Uint32 s_idx = m_current_api_receiver; // first sorted Uint32 s_idx = m_current_api_receiver; // first sorted
...@@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ if(seq == tp->getNodeSequence(nodeId) &&
!send_next_scan_ordered(s_idx, forceSend)){
Uint32 tmp = m_sent_receivers_count; Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver; s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){ while(m_sent_receivers_count > 0 && !theError.code){
...@@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ ...@@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){
} }
int int
NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){
if(idx == theParallelism) if(idx == theParallelism)
return 0; return 0;
...@@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ ...@@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
TransporterFacade * tp = TransporterFacade::instance(); TransporterFacade * tp = TransporterFacade::instance();
tSignal.setLength(4+1); tSignal.setLength(4+1);
return tp->sendSignal(&tSignal, nodeId); int ret= tp->sendSignal(&tSignal, nodeId);
if (!ret) checkForceSend(forceSend);
return ret;
} }
int int
NdbScanOperation::close_impl(TransporterFacade* tp){ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
Uint32 seq = theNdbCon->theNodeSequence; Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode; Uint32 nodeId = theNdbCon->theDBnode;
...@@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ ...@@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){
if(m_api_receivers_count+m_conf_receivers_count){ if(m_api_receivers_count+m_conf_receivers_count){
// Send close scan // Send close scan
if(send_next_scan(0, true) == -1){ // Close scan if(send_next_scan(0, true, forceSend) == -1){ // Close scan
theNdbCon->theReleaseOnClose = true; theNdbCon->theReleaseOnClose = true;
return -1; return -1;
} }
...@@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ ...@@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){
} }
int int
NdbScanOperation::restart() NdbScanOperation::restart(bool forceSend)
{ {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
...@@ -1529,7 +1547,7 @@ NdbScanOperation::restart() ...@@ -1529,7 +1547,7 @@ NdbScanOperation::restart()
{ {
int res; int res;
if((res= close_impl(tp))) if((res= close_impl(tp, forceSend)))
{ {
return res; return res;
} }
...@@ -1548,13 +1566,13 @@ NdbScanOperation::restart() ...@@ -1548,13 +1566,13 @@ NdbScanOperation::restart()
} }
int int
NdbIndexScanOperation::reset_bounds(){ NdbIndexScanOperation::reset_bounds(bool forceSend){
int res; int res;
{ {
TransporterFacade* tp = TransporterFacade::instance(); TransporterFacade* tp = TransporterFacade::instance();
Guard guard(tp->theMutexPtr); Guard guard(tp->theMutexPtr);
res= close_impl(tp); res= close_impl(tp, forceSend);
} }
if(!res) if(!res)
......
...@@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -67,7 +67,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -64,7 +64,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -220,7 +220,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -220,7 +220,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char* _tabname; const char* _tabname;
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -143,7 +143,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -143,7 +143,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
bool bool
readArguments(int *pargc, char*** pargv) readArguments(int *pargc, char*** pargv)
{ {
const char *load_default_groups[]= { "ndb_tools","ndb_restore",0 }; const char *load_default_groups[]= { "mysql_cluster","ndb_restore",0 };
load_defaults("my",load_default_groups,pargc,pargv); load_defaults("my",load_default_groups,pargc,pargv);
if (handle_options(pargc, pargv, my_long_options, get_one_option)) if (handle_options(pargc, pargv, my_long_options, get_one_option))
{ {
......
...@@ -105,7 +105,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -105,7 +105,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
const char* _tabname; const char* _tabname;
int ho_error; int ho_error;
......
...@@ -83,7 +83,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -83,7 +83,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
int ho_error; int ho_error;
if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option))) if ((ho_error=handle_options(&argc, &argv, my_long_options, get_one_option)))
......
...@@ -75,7 +75,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)), ...@@ -75,7 +75,7 @@ get_one_option(int optid, const struct my_option *opt __attribute__((unused)),
int main(int argc, char** argv){ int main(int argc, char** argv){
NDB_INIT(argv[0]); NDB_INIT(argv[0]);
const char *load_default_groups[]= { "ndb_tools",0 }; const char *load_default_groups[]= { "mysql_cluster",0 };
load_defaults("my",load_default_groups,&argc,&argv); load_defaults("my",load_default_groups,&argc,&argv);
const char* _hostName = NULL; const char* _hostName = NULL;
int ho_error; int ho_error;
......
...@@ -1268,7 +1268,7 @@ inline int ha_ndbcluster::next_result(byte *buf) ...@@ -1268,7 +1268,7 @@ inline int ha_ndbcluster::next_result(byte *buf)
m_ops_pending= 0; m_ops_pending= 0;
m_blobs_pending= FALSE; m_blobs_pending= FALSE;
} }
check= cursor->nextResult(contact_ndb); check= cursor->nextResult(contact_ndb, m_force_send);
if (check == 0) if (check == 0)
{ {
// One more record found // One more record found
...@@ -1561,7 +1561,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key, ...@@ -1561,7 +1561,7 @@ int ha_ndbcluster::ordered_index_scan(const key_range *start_key,
DBUG_ASSERT(op->getSorted() == sorted); DBUG_ASSERT(op->getSorted() == sorted);
DBUG_ASSERT(op->getLockMode() == DBUG_ASSERT(op->getLockMode() ==
(NdbOperation::LockMode)get_ndb_lock_type(m_lock.type)); (NdbOperation::LockMode)get_ndb_lock_type(m_lock.type));
if(op->reset_bounds()) if(op->reset_bounds(m_force_send))
DBUG_RETURN(ndb_err(m_active_trans)); DBUG_RETURN(ndb_err(m_active_trans));
} }
...@@ -2388,7 +2388,7 @@ int ha_ndbcluster::index_last(byte *buf) ...@@ -2388,7 +2388,7 @@ int ha_ndbcluster::index_last(byte *buf)
int res; int res;
if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){ if((res= ordered_index_scan(0, 0, TRUE, buf)) == 0){
NdbResultSet *cursor= m_active_cursor; NdbResultSet *cursor= m_active_cursor;
while((res= cursor->nextResult(TRUE)) == 0); while((res= cursor->nextResult(TRUE, m_force_send)) == 0);
if(res == 1){ if(res == 1){
unpack_record(buf); unpack_record(buf);
table->status= 0; table->status= 0;
...@@ -2474,7 +2474,7 @@ int ha_ndbcluster::rnd_init(bool scan) ...@@ -2474,7 +2474,7 @@ int ha_ndbcluster::rnd_init(bool scan)
{ {
if (!scan) if (!scan)
DBUG_RETURN(1); DBUG_RETURN(1);
int res= cursor->restart(); int res= cursor->restart(m_force_send);
DBUG_ASSERT(res == 0); DBUG_ASSERT(res == 0);
} }
index_init(table->primary_key); index_init(table->primary_key);
...@@ -2505,7 +2505,7 @@ int ha_ndbcluster::close_scan() ...@@ -2505,7 +2505,7 @@ int ha_ndbcluster::close_scan()
m_ops_pending= 0; m_ops_pending= 0;
} }
cursor->close(); cursor->close(m_force_send);
m_active_cursor= NULL; m_active_cursor= NULL;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -3025,6 +3025,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3025,6 +3025,7 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
m_transaction_on= FALSE; m_transaction_on= FALSE;
else else
m_transaction_on= thd->variables.ndb_use_transactions; m_transaction_on= thd->variables.ndb_use_transactions;
m_use_local_query_cache= thd->variables.ndb_use_local_query_cache;
m_active_trans= thd->transaction.all.ndb_tid ? m_active_trans= thd->transaction.all.ndb_tid ?
(NdbConnection*)thd->transaction.all.ndb_tid: (NdbConnection*)thd->transaction.all.ndb_tid:
...@@ -3749,7 +3750,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg): ...@@ -3749,7 +3750,8 @@ ha_ndbcluster::ha_ndbcluster(TABLE *table_arg):
m_ha_not_exact_count(FALSE), m_ha_not_exact_count(FALSE),
m_force_send(TRUE), m_force_send(TRUE),
m_autoincrement_prefetch(32), m_autoincrement_prefetch(32),
m_transaction_on(TRUE) m_transaction_on(TRUE),
m_use_local_query_cache(FALSE)
{ {
int i; int i;
...@@ -4436,7 +4438,7 @@ bool ha_ndbcluster::low_byte_first() const ...@@ -4436,7 +4438,7 @@ bool ha_ndbcluster::low_byte_first() const
} }
bool ha_ndbcluster::has_transactions() bool ha_ndbcluster::has_transactions()
{ {
return TRUE; return m_transaction_on;
} }
const char* ha_ndbcluster::index_type(uint key_number) const char* ha_ndbcluster::index_type(uint key_number)
{ {
...@@ -4453,6 +4455,9 @@ const char* ha_ndbcluster::index_type(uint key_number) ...@@ -4453,6 +4455,9 @@ const char* ha_ndbcluster::index_type(uint key_number)
} }
uint8 ha_ndbcluster::table_cache_type() uint8 ha_ndbcluster::table_cache_type()
{ {
if (m_use_local_query_cache)
return HA_CACHE_TBL_TRANSACT;
else
return HA_CACHE_TBL_NOCACHE; return HA_CACHE_TBL_NOCACHE;
} }
...@@ -4621,10 +4626,9 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4621,10 +4626,9 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{ {
DBUG_ENTER("ndb_get_table_statistics"); DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table)); DBUG_PRINT("enter", ("table: %s", table));
NdbConnection* pTrans= ndb->startTransaction();
do do
{ {
NdbConnection* pTrans= ndb->startTransaction();
if (pTrans == NULL) if (pTrans == NULL)
break; break;
...@@ -4644,13 +4648,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4644,13 +4648,13 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows); pOp->getValue(NdbDictionary::Column::ROW_COUNT, (char*)&rows);
pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits); pOp->getValue(NdbDictionary::Column::COMMIT_COUNT, (char*)&commits);
check= pTrans->execute(NoCommit); check= pTrans->execute(NoCommit, AbortOnError, TRUE);
if (check == -1) if (check == -1)
break; break;
Uint64 sum_rows= 0; Uint64 sum_rows= 0;
Uint64 sum_commits= 0; Uint64 sum_commits= 0;
while((check= rs->nextResult(TRUE)) == 0) while((check= rs->nextResult(TRUE, TRUE)) == 0)
{ {
sum_rows+= rows; sum_rows+= rows;
sum_commits+= commits; sum_commits+= commits;
...@@ -4659,6 +4663,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4659,6 +4663,8 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
if (check == -1) if (check == -1)
break; break;
rs->close(TRUE);
ndb->closeTransaction(pTrans); ndb->closeTransaction(pTrans);
if(row_count) if(row_count)
* row_count= sum_rows; * row_count= sum_rows;
...@@ -4668,6 +4674,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -4668,6 +4674,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
DBUG_RETURN(0); DBUG_RETURN(0);
} while(0); } while(0);
ndb->closeTransaction(pTrans);
DBUG_PRINT("exit", ("failed")); DBUG_PRINT("exit", ("failed"));
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
......
...@@ -239,10 +239,12 @@ class ha_ndbcluster: public handler ...@@ -239,10 +239,12 @@ class ha_ndbcluster: public handler
char *m_blobs_buffer; char *m_blobs_buffer;
uint32 m_blobs_buffer_size; uint32 m_blobs_buffer_size;
uint m_dupkey; uint m_dupkey;
// set from thread variables at external lock
bool m_ha_not_exact_count; bool m_ha_not_exact_count;
bool m_force_send; bool m_force_send;
ha_rows m_autoincrement_prefetch; ha_rows m_autoincrement_prefetch;
bool m_transaction_on; bool m_transaction_on;
bool m_use_local_query_cache;
void set_rec_per_key(); void set_rec_per_key();
void records_update(); void records_update();
......
...@@ -2227,7 +2227,11 @@ extern "C" pthread_handler_decl(handle_shutdown,arg) ...@@ -2227,7 +2227,11 @@ extern "C" pthread_handler_decl(handle_shutdown,arg)
#endif #endif
const char *load_default_groups[]= { "mysqld","server",MYSQL_BASE_VERSION,0,0}; const char *load_default_groups[]= {
#ifdef HAVE_NDBCLUSTER_DB
"mysql_cluster",
#endif
"mysqld","server",MYSQL_BASE_VERSION,0,0};
bool open_log(MYSQL_LOG *log, const char *hostname, bool open_log(MYSQL_LOG *log, const char *hostname,
const char *opt_name, const char *extension, const char *opt_name, const char *extension,
...@@ -3950,6 +3954,7 @@ enum options_mysqld ...@@ -3950,6 +3954,7 @@ enum options_mysqld
OPT_INNODB, OPT_ISAM, OPT_INNODB, OPT_ISAM,
OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT, OPT_NDBCLUSTER, OPT_NDB_CONNECTSTRING, OPT_NDB_USE_EXACT_COUNT,
OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ, OPT_NDB_FORCE_SEND, OPT_NDB_AUTOINCREMENT_PREFETCH_SZ,
OPT_NDB_USE_LOCAL_QUERY_CACHE,
OPT_SKIP_SAFEMALLOC, OPT_SKIP_SAFEMALLOC,
OPT_TEMP_POOL, OPT_TX_ISOLATION, OPT_TEMP_POOL, OPT_TX_ISOLATION,
OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS, OPT_SKIP_STACK_TRACE, OPT_SKIP_SYMLINKS,
...@@ -4409,6 +4414,12 @@ Disable with --skip-ndbcluster (will save memory).", ...@@ -4409,6 +4414,12 @@ Disable with --skip-ndbcluster (will save memory).",
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
(gptr*) &global_system_variables.ndb_use_exact_count, (gptr*) &global_system_variables.ndb_use_exact_count,
0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0}, 0, GET_BOOL, OPT_ARG, 1, 0, 0, 0, 0, 0},
{"ndb_use_local_query_cache", OPT_NDB_USE_LOCAL_QUERY_CACHE,
"Use local query cache, note that this cache will _not_ "
"be invalidated if data is updated through other mysql servers",
(gptr*) &global_system_variables.ndb_use_local_query_cache,
(gptr*) &global_system_variables.ndb_use_local_query_cache,
0, GET_BOOL, OPT_ARG, 0, 0, 0, 0, 0, 0},
#endif #endif
{"new", 'n', "Use very new possible 'unsafe' functions.", {"new", 'n', "Use very new possible 'unsafe' functions.",
(gptr*) &global_system_variables.new_mode, (gptr*) &global_system_variables.new_mode,
......
...@@ -371,6 +371,9 @@ sys_var_thd_bool ...@@ -371,6 +371,9 @@ sys_var_thd_bool
sys_ndb_use_exact_count("ndb_use_exact_count", sys_ndb_use_exact_count("ndb_use_exact_count",
&SV::ndb_use_exact_count); &SV::ndb_use_exact_count);
sys_var_thd_bool sys_var_thd_bool
sys_ndb_use_local_query_cache("ndb_use_local_query_cache",
&SV::ndb_use_local_query_cache);
sys_var_thd_bool
sys_ndb_use_transactions("ndb_use_transactions", sys_ndb_use_transactions("ndb_use_transactions",
&SV::ndb_use_transactions); &SV::ndb_use_transactions);
// ndb server global variable settings // ndb server global variable settings
...@@ -634,6 +637,7 @@ sys_var *sys_variables[]= ...@@ -634,6 +637,7 @@ sys_var *sys_variables[]=
&sys_ndb_autoincrement_prefetch_sz, &sys_ndb_autoincrement_prefetch_sz,
&sys_ndb_force_send, &sys_ndb_force_send,
&sys_ndb_use_exact_count, &sys_ndb_use_exact_count,
&sys_ndb_use_local_query_cache,
&sys_ndb_use_transactions, &sys_ndb_use_transactions,
#endif #endif
&sys_unique_checks, &sys_unique_checks,
...@@ -801,6 +805,8 @@ struct show_var_st init_vars[]= { ...@@ -801,6 +805,8 @@ struct show_var_st init_vars[]= {
(char*) &sys_ndb_autoincrement_prefetch_sz, SHOW_SYS}, (char*) &sys_ndb_autoincrement_prefetch_sz, SHOW_SYS},
{sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS}, {sys_ndb_force_send.name, (char*) &sys_ndb_force_send, SHOW_SYS},
{sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS}, {sys_ndb_use_exact_count.name,(char*) &sys_ndb_use_exact_count, SHOW_SYS},
{sys_ndb_use_local_query_cache.name,
(char*) &sys_ndb_use_local_query_cache, SHOW_SYS},
{sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS}, {sys_ndb_use_transactions.name,(char*) &sys_ndb_use_transactions, SHOW_SYS},
#endif #endif
{sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS}, {sys_net_buffer_length.name,(char*) &sys_net_buffer_length, SHOW_SYS},
......
...@@ -403,6 +403,7 @@ struct system_variables ...@@ -403,6 +403,7 @@ struct system_variables
ulong ndb_autoincrement_prefetch_sz; ulong ndb_autoincrement_prefetch_sz;
my_bool ndb_force_send; my_bool ndb_force_send;
my_bool ndb_use_exact_count; my_bool ndb_use_exact_count;
my_bool ndb_use_local_query_cache;
my_bool ndb_use_transactions; my_bool ndb_use_transactions;
#endif /* HAVE_NDBCLUSTER_DB */ #endif /* HAVE_NDBCLUSTER_DB */
my_bool old_passwords; my_bool old_passwords;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment