Commit 18a9ac2e authored by unknown's avatar unknown

Merge mysql.com:/users/lthalmann/bkroot/mysql-4.1-rpl

into  mysql.com:/users/lthalmann/bk/MERGE/mysql-4.1-merge
parents e135f26e 1020f902
drop table if exists t1, t2;
create table t1 (a int key) engine=ndbcluster;
begin;
insert into t1 values (1);
insert into t1 values (2);
ERROR HY000: Got temporary error 4025 'Node failure caused abort of transaction' from ndbcluster
commit;
ERROR HY000: Got error 4350 'Transaction already aborted' from ndbcluster
drop table t1;
create table t2 (a int, b int, primary key(a,b)) engine=ndbcluster;
insert into t2 values (1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1),(10,1);
select * from t2 order by a limit 3;
a b
1 1
2 1
3 1
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
a
1
2
3
select * from t2 order by a limit 3;
ERROR HY000: Can't lock file (errno: 241)
select * from t2 order by a limit 3;
a
1
2
3
show tables;
Tables_in_test
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
a
1
2
3
select * from t2 order by a limit 3;
a
1
2
3
drop table t2;
DROP TABLE IF EXISTS t1;
create table t1(a int) engine=myisam;
select * into outfile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int) engine=ndb;
load data local infile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
select count(*) from t1;
count(*)
10000
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (2), (2), (3);
select * into outfile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
load data local infile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
select * from t1 order by a;
a
1
2
3
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (1), (2), (3);
select * into outfile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
load data local infile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
select * from t1 order by a;
a
1
2
3
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (2), (3), (3);
select * into outfile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
load data local infile 'MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
select * from t1 order by a;
a
1
2
3
drop table t1;
-- source include/have_ndb.inc
-- source include/have_multi_ndb.inc
-- source include/not_embedded.inc
--disable_warnings
drop table if exists t1, t2;
--enable_warnings
#
# Transaction ongoing while cluster is restarted
#
--connection server1
create table t1 (a int key) engine=ndbcluster;
begin;
insert into t1 values (1);
--exec $NDB_MGM --no-defaults -e "all restart" >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
--error 1297
insert into t1 values (2);
--error 1296
commit;
drop table t1;
#
# Stale cache after restart -i
#
--connection server1
create table t2 (a int, b int, primary key(a,b)) engine=ndbcluster;
insert into t2 values (1,1),(2,1),(3,1),(4,1),(5,1),(6,1),(7,1),(8,1),(9,1),(10,1);
select * from t2 order by a limit 3;
--exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
--connection server2
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
# server 1 should have a stale cache, and in this case wrong frm, transaction must be retried
--connection server1
--error 1015
select * from t2 order by a limit 3;
select * from t2 order by a limit 3;
--exec $NDB_MGM --no-defaults -e "all restart -i" >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_waiter --no-defaults >> $NDB_TOOLS_OUTPUT
--connection server1
show tables;
create table t2 (a int key) engine=ndbcluster;
insert into t2 values (1),(2),(3),(4),(5),(6),(7),(8),(9),(10);
select * from t2 order by a limit 3;
# server 2 should have a stale cache, but with right frm, transaction need not be retried
--connection server2
select * from t2 order by a limit 3;
drop table t2;
# End of 4.1 tests
-- source include/have_ndb.inc
-- source include/not_embedded.inc
--disable_warnings
DROP TABLE IF EXISTS t1;
--enable_warnings
create table t1(a int) engine=myisam;
let $1=10000;
disable_query_log;
set SQL_LOG_BIN=0;
while ($1)
{
insert into t1 values(1);
dec $1;
}
set SQL_LOG_BIN=1;
enable_query_log;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval select * into outfile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
#This will generate a 20KB file, now test LOAD DATA LOCAL
drop table t1;
create table t1(a int) engine=ndb;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval load data local infile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
select count(*) from t1;
system rm $MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile ;
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (2), (2), (3);
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval select * into outfile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval load data local infile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
system rm $MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile;
select * from t1 order by a;
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (1), (2), (3);
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval select * into outfile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval load data local infile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
system rm $MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile;
select * from t1 order by a;
drop table t1;
create table t1(a int) engine=myisam;
insert into t1 values (1), (2), (3), (3);
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval select * into outfile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' from t1;
drop table t1;
create table t1(a int primary key) engine=ndb;
--replace_result $MYSQLTEST_VARDIR MYSQLTEST_VARDIR
eval load data local infile '$MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile' into table t1;
system rm $MYSQLTEST_VARDIR/master-data/ndb_loaddatalocal.select_outfile;
select * from t1 order by a;
drop table t1;
# End of 4.1 tests
...@@ -65,6 +65,7 @@ ...@@ -65,6 +65,7 @@
#define CFG_DB_BACKUP_DATA_BUFFER_MEM 134 #define CFG_DB_BACKUP_DATA_BUFFER_MEM 134
#define CFG_DB_BACKUP_LOG_BUFFER_MEM 135 #define CFG_DB_BACKUP_LOG_BUFFER_MEM 135
#define CFG_DB_BACKUP_WRITE_SIZE 136 #define CFG_DB_BACKUP_WRITE_SIZE 136
#define CFG_DB_BACKUP_MAX_WRITE_SIZE 139
#define CFG_LOG_DESTINATION 147 #define CFG_LOG_DESTINATION 147
......
...@@ -83,6 +83,7 @@ public: ...@@ -83,6 +83,7 @@ public:
void set_optimized_node_selection(int val); void set_optimized_node_selection(int val);
unsigned no_db_nodes(); unsigned no_db_nodes();
unsigned get_connect_count() const;
#endif #endif
private: private:
......
...@@ -74,7 +74,7 @@ public: ...@@ -74,7 +74,7 @@ public:
/** /**
* Constructor / Destructor * Constructor / Destructor
*/ */
SocketServer(int maxSessions = 32); SocketServer(unsigned maxSessions = ~(unsigned)0);
~SocketServer(); ~SocketServer();
/** /**
......
...@@ -100,10 +100,10 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) { ...@@ -100,10 +100,10 @@ Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
} }
{ {
struct sockaddr addr; struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr); SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen); int r= getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; m_connect_address= (&addr)->sin_addr;
} }
bool res = connect_server_impl(sockfd); bool res = connect_server_impl(sockfd);
...@@ -173,10 +173,10 @@ Transporter::connect_client() { ...@@ -173,10 +173,10 @@ Transporter::connect_client() {
} }
{ {
struct sockaddr addr; struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr); SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r= getpeername(sockfd, &addr, &addrlen); int r= getpeername(sockfd, (struct sockaddr*)&addr, &addrlen);
m_connect_address= ((struct sockaddr_in *)&addr)->sin_addr; m_connect_address= (&addr)->sin_addr;
} }
bool res = connect_client_impl(sockfd); bool res = connect_client_impl(sockfd);
......
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
#define DEBUG(x) ndbout << x << endl; #define DEBUG(x) ndbout << x << endl;
SocketServer::SocketServer(int maxSessions) : SocketServer::SocketServer(unsigned maxSessions) :
m_sessions(10), m_sessions(10),
m_services(5) m_services(5)
{ {
...@@ -124,7 +124,7 @@ SocketServer::setup(SocketServer::Service * service, ...@@ -124,7 +124,7 @@ SocketServer::setup(SocketServer::Service * service,
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if (listen(sock, m_maxSessions) == -1){ if (listen(sock, m_maxSessions > 32 ? 32 : m_maxSessions) == -1){
DBUG_PRINT("error",("listen() - %d - %s", DBUG_PRINT("error",("listen() - %d - %s",
errno, strerror(errno))); errno, strerror(errno)));
NDB_CLOSE_SOCKET(sock); NDB_CLOSE_SOCKET(sock);
......
...@@ -66,15 +66,16 @@ Backup::Backup(const Configuration & conf) : ...@@ -66,15 +66,16 @@ Backup::Backup(const Configuration & conf) :
Uint32 szDataBuf = (2 * 1024 * 1024); Uint32 szDataBuf = (2 * 1024 * 1024);
Uint32 szLogBuf = (2 * 1024 * 1024); Uint32 szLogBuf = (2 * 1024 * 1024);
Uint32 szWrite = 32768; Uint32 szWrite = 32768, maxWriteSize = (256 * 1024);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf); ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_DATA_BUFFER_MEM, &szDataBuf);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf); ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_LOG_BUFFER_MEM, &szLogBuf);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite); ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_WRITE_SIZE, &szWrite);
ndb_mgm_get_int_parameter(p, CFG_DB_BACKUP_MAX_WRITE_SIZE, &maxWriteSize);
c_defaults.m_logBufferSize = szLogBuf; c_defaults.m_logBufferSize = szLogBuf;
c_defaults.m_dataBufferSize = szDataBuf; c_defaults.m_dataBufferSize = szDataBuf;
c_defaults.m_minWriteSize = szWrite; c_defaults.m_minWriteSize = szWrite;
c_defaults.m_maxWriteSize = szWrite; c_defaults.m_maxWriteSize = maxWriteSize;
{ // Init all tables { // Init all tables
ArrayList<Table> tables(c_tablePool); ArrayList<Table> tables(c_tablePool);
......
...@@ -226,6 +226,7 @@ void Cmvmi::execEVENT_REP(Signal* signal) ...@@ -226,6 +226,7 @@ void Cmvmi::execEVENT_REP(Signal* signal)
void void
Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){
EventSubscribeReq * subReq = (EventSubscribeReq *)&signal->theData[0]; EventSubscribeReq * subReq = (EventSubscribeReq *)&signal->theData[0];
Uint32 senderRef = signal->getSendersBlockRef();
SubscriberPtr ptr; SubscriberPtr ptr;
jamEntry(); jamEntry();
DBUG_ENTER("Cmvmi::execEVENT_SUBSCRIBE_REQ"); DBUG_ENTER("Cmvmi::execEVENT_SUBSCRIBE_REQ");
...@@ -243,7 +244,7 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){ ...@@ -243,7 +244,7 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){
* Create a new one * Create a new one
*/ */
if(subscribers.seize(ptr) == false){ if(subscribers.seize(ptr) == false){
sendSignal(subReq->blockRef, GSN_EVENT_SUBSCRIBE_REF, signal, 1, JBB); sendSignal(senderRef, GSN_EVENT_SUBSCRIBE_REF, signal, 1, JBB);
return; return;
} }
ptr.p->logLevel.clear(); ptr.p->logLevel.clear();
...@@ -270,7 +271,7 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){ ...@@ -270,7 +271,7 @@ Cmvmi::execEVENT_SUBSCRIBE_REQ(Signal * signal){
} }
signal->theData[0] = ptr.i; signal->theData[0] = ptr.i;
sendSignal(ptr.p->blockRef, GSN_EVENT_SUBSCRIBE_CONF, signal, 1, JBB); sendSignal(senderRef, GSN_EVENT_SUBSCRIBE_CONF, signal, 1, JBB);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -638,10 +638,12 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -638,10 +638,12 @@ ndb_mgm_get_status(NdbMgmHandle handle)
Vector<BaseString> split; Vector<BaseString> split;
tmp.split(split, ":"); tmp.split(split, ":");
if(split.size() != 2){ if(split.size() != 2){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
if(!(split[0].trim() == "nodes")){ if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
...@@ -690,6 +692,7 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -690,6 +692,7 @@ ndb_mgm_get_status(NdbMgmHandle handle)
if(i+1 != noOfNodes){ if(i+1 != noOfNodes){
free(state); free(state);
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, "Node count mismatch");
return NULL; return NULL;
} }
......
...@@ -1191,7 +1191,19 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1191,7 +1191,19 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false, false,
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
"32K", "32K",
"0", "2K",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_DB_BACKUP_MAX_WRITE_SIZE,
"BackupMaxWriteSize",
DB_TOKEN,
"Max size of filesystem writes made by backup (in bytes)",
ConfigInfo::CI_USED,
false,
ConfigInfo::CI_INT,
"256K",
"2K",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_INT_RNIL) },
/*************************************************************************** /***************************************************************************
......
...@@ -119,40 +119,49 @@ MgmtSrvr::logLevelThreadRun() ...@@ -119,40 +119,49 @@ MgmtSrvr::logLevelThreadRun()
/** /**
* Handle started nodes * Handle started nodes
*/ */
m_started_nodes.lock();
if (m_started_nodes.size() > 0)
{
// calculate max log level
EventSubscribeReq req; EventSubscribeReq req;
req = m_event_listner[0].m_logLevel; {
LogLevel tmp;
m_event_listner.lock();
for(int i = m_event_listner.m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_event_listner[i].m_logLevel);
m_event_listner.unlock();
req = tmp;
}
req.blockRef = _ownReference; req.blockRef = _ownReference;
while (m_started_nodes.size() > 0)
SetLogLevelOrd ord; {
m_started_nodes.lock();
while(m_started_nodes.size() > 0){
Uint32 node = m_started_nodes[0]; Uint32 node = m_started_nodes[0];
m_started_nodes.erase(0, false); m_started_nodes.erase(0, false);
m_started_nodes.unlock(); m_started_nodes.unlock();
setEventReportingLevelImpl(node, req); setEventReportingLevelImpl(node, req);
SetLogLevelOrd ord;
ord = m_nodeLogLevel[node]; ord = m_nodeLogLevel[node];
setNodeLogLevelImpl(node, ord); setNodeLogLevelImpl(node, ord);
m_started_nodes.lock(); m_started_nodes.lock();
} }
}
m_started_nodes.unlock(); m_started_nodes.unlock();
m_log_level_requests.lock(); m_log_level_requests.lock();
while(m_log_level_requests.size() > 0){ while (m_log_level_requests.size() > 0)
req = m_log_level_requests[0]; {
EventSubscribeReq req = m_log_level_requests[0];
m_log_level_requests.erase(0, false); m_log_level_requests.erase(0, false);
m_log_level_requests.unlock(); m_log_level_requests.unlock();
LogLevel tmp;
tmp = req;
if(req.blockRef == 0){ if(req.blockRef == 0){
req.blockRef = _ownReference; req.blockRef = _ownReference;
setEventReportingLevelImpl(0, req); setEventReportingLevelImpl(0, req);
} else { } else {
SetLogLevelOrd ord;
ord = req; ord = req;
setNodeLogLevelImpl(req.blockRef, ord); setNodeLogLevelImpl(req.blockRef, ord);
} }
...@@ -1274,7 +1283,8 @@ int ...@@ -1274,7 +1283,8 @@ int
MgmtSrvr::setEventReportingLevelImpl(int nodeId, MgmtSrvr::setEventReportingLevelImpl(int nodeId,
const EventSubscribeReq& ll) const EventSubscribeReq& ll)
{ {
INIT_SIGNAL_SENDER(ss,nodeId); SignalSender ss(theFacade);
ss.lock();
SimpleSignal ssig; SimpleSignal ssig;
EventSubscribeReq * dst = EventSubscribeReq * dst =
...@@ -1283,41 +1293,54 @@ MgmtSrvr::setEventReportingLevelImpl(int nodeId, ...@@ -1283,41 +1293,54 @@ MgmtSrvr::setEventReportingLevelImpl(int nodeId,
EventSubscribeReq::SignalLength); EventSubscribeReq::SignalLength);
*dst = ll; *dst = ll;
send(ss,ssig,nodeId,NODE_TYPE_DB); NodeBitmask nodes;
nodes.clear();
Uint32 max = (nodeId == 0) ? (nodeId = 1, MAX_NDB_NODES) : nodeId;
for(; nodeId <= max; nodeId++)
{
if (nodeTypes[nodeId] != NODE_TYPE_DB)
continue;
if (okToSendTo(nodeId, true))
continue;
if (ss.sendSignal(nodeId, &ssig) == SEND_OK)
{
nodes.set(nodeId);
}
}
#if 0 int error = 0;
while (1) while (!nodes.isclear())
{ {
SimpleSignal *signal = ss.waitFor(); SimpleSignal *signal = ss.waitFor();
int gsn = signal->readSignalNumber(); int gsn = signal->readSignalNumber();
nodeId = refToNode(signal->header.theSendersBlockRef);
switch (gsn) { switch (gsn) {
case GSN_EVENT_SUBSCRIBE_CONF:{ case GSN_EVENT_SUBSCRIBE_CONF:{
nodes.clear(nodeId);
break; break;
} }
case GSN_EVENT_SUBSCRIBE_REF:{ case GSN_EVENT_SUBSCRIBE_REF:{
return SEND_OR_RECEIVE_FAILED; nodes.clear(nodeId);
error = 1;
break;
} }
case GSN_NF_COMPLETEREP:{ case GSN_NF_COMPLETEREP:{
const NFCompleteRep * const rep = const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr()); CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
if (rep->failedNodeId == nodeId) nodes.clear(rep->failedNodeId);
return SEND_OR_RECEIVE_FAILED;
break; break;
} }
case GSN_NODE_FAILREP:{ case GSN_NODE_FAILREP:{
const NodeFailRep * const rep = // ignore, NF_COMPLETEREP will arrive later
CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
if (NodeBitmask::get(rep->theNodes,nodeId))
return SEND_OR_RECEIVE_FAILED;
break; break;
} }
default: default:
report_unknown_signal(signal); report_unknown_signal(signal);
return SEND_OR_RECEIVE_FAILED; return SEND_OR_RECEIVE_FAILED;
} }
} }
#endif if (error)
return SEND_OR_RECEIVE_FAILED;
return 0; return 0;
} }
...@@ -1337,19 +1360,6 @@ MgmtSrvr::setNodeLogLevelImpl(int nodeId, const SetLogLevelOrd & ll) ...@@ -1337,19 +1360,6 @@ MgmtSrvr::setNodeLogLevelImpl(int nodeId, const SetLogLevelOrd & ll)
return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED; return ss.sendSignal(nodeId, &ssig) == SEND_OK ? 0 : SEND_OR_RECEIVE_FAILED;
} }
int
MgmtSrvr::send(SignalSender &ss, SimpleSignal &ssig, Uint32 node, Uint32 node_type){
Uint32 max = (node == 0) ? MAX_NODES : node + 1;
for(; node < max; node++){
while(nodeTypes[node] != (int)node_type && node < max) node++;
if(nodeTypes[node] != (int)node_type)
break;
ss.sendSignal(node, &ssig);
}
return 0;
}
//**************************************************************************** //****************************************************************************
//**************************************************************************** //****************************************************************************
...@@ -2107,6 +2117,7 @@ int ...@@ -2107,6 +2117,7 @@ int
MgmtSrvr::abortBackup(Uint32 backupId) MgmtSrvr::abortBackup(Uint32 backupId)
{ {
SignalSender ss(theFacade); SignalSender ss(theFacade);
ss.lock(); // lock will be released on exit
bool next; bool next;
NodeId nodeId = 0; NodeId nodeId = 0;
......
...@@ -472,8 +472,6 @@ public: ...@@ -472,8 +472,6 @@ public:
private: private:
//************************************************************************** //**************************************************************************
int send(SignalSender &ss, SimpleSignal &ssig, Uint32 node, Uint32 node_type);
int sendSTOP_REQ(NodeId nodeId, int sendSTOP_REQ(NodeId nodeId,
NodeBitmask &stoppedNodes, NodeBitmask &stoppedNodes,
Uint32 singleUserNodeId, Uint32 singleUserNodeId,
......
...@@ -427,9 +427,9 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -427,9 +427,9 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
return; return;
} }
struct sockaddr addr; struct sockaddr_in addr;
SOCKET_SIZE_TYPE addrlen= sizeof(addr); SOCKET_SIZE_TYPE addrlen= sizeof(addr);
int r = getpeername(m_socket, &addr, &addrlen); int r = getpeername(m_socket, (struct sockaddr*)&addr, &addrlen);
if (r != 0 ) { if (r != 0 ) {
m_output->println(cmd); m_output->println(cmd);
m_output->println("result: getpeername(%d) failed, err= %d", m_socket, r); m_output->println("result: getpeername(%d) failed, err= %d", m_socket, r);
...@@ -441,7 +441,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -441,7 +441,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string; BaseString error_string;
if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
&addr, &addrlen, error_string)){ (struct sockaddr*)&addr, &addrlen, error_string)){
const char *alias; const char *alias;
const char *str; const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
...@@ -763,9 +763,8 @@ MgmApiSession::setClusterLogLevel(Parser<MgmApiSession>::Context &, ...@@ -763,9 +763,8 @@ MgmApiSession::setClusterLogLevel(Parser<MgmApiSession>::Context &,
m_mgmsrv.m_event_listner.unlock(); m_mgmsrv.m_event_listner.unlock();
{ {
LogLevel ll; LogLevel tmp;
ll.setLogLevel(category,level); m_mgmsrv.m_event_listner.update_max_log_level(tmp);
m_mgmsrv.m_event_listner.update_max_log_level(ll);
} }
m_output->println(reply); m_output->println(reply);
...@@ -1248,21 +1247,23 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId) ...@@ -1248,21 +1247,23 @@ Ndb_mgmd_event_service::log(int eventType, const Uint32* theData, NodeId nodeId)
void void
Ndb_mgmd_event_service::update_max_log_level(const LogLevel &log_level) Ndb_mgmd_event_service::update_max_log_level(const LogLevel &log_level)
{ {
LogLevel tmp= m_logLevel; LogLevel tmp = log_level;
tmp.set_max(log_level); m_clients.lock();
for(int i = m_clients.size() - 1; i >= 0; i--)
tmp.set_max(m_clients[i].m_logLevel);
m_clients.unlock();
update_log_level(tmp); update_log_level(tmp);
} }
void void
Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp) Ndb_mgmd_event_service::update_log_level(const LogLevel &tmp)
{ {
if(!(tmp == m_logLevel)){
m_logLevel = tmp; m_logLevel = tmp;
EventSubscribeReq req; EventSubscribeReq req;
req = tmp; req = tmp;
// send update to all nodes
req.blockRef = 0; req.blockRef = 0;
m_mgmsrv->m_log_level_requests.push_back(req); m_mgmsrv->m_log_level_requests.push_back(req);
}
} }
void void
......
...@@ -70,6 +70,7 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade): ...@@ -70,6 +70,7 @@ ClusterMgr::ClusterMgr(TransporterFacade & _facade):
noOfAliveNodes= 0; noOfAliveNodes= 0;
noOfConnectedNodes= 0; noOfConnectedNodes= 0;
theClusterMgrThread= 0; theClusterMgrThread= 0;
m_connect_count = 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -456,6 +457,10 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){ ...@@ -456,6 +457,10 @@ ClusterMgr::reportNodeFailed(NodeId nodeId){
theNode.nfCompleteRep = false; theNode.nfCompleteRep = false;
if(noOfAliveNodes == 0){ if(noOfAliveNodes == 0){
theFacade.m_globalDictCache.lock();
theFacade.m_globalDictCache.invalidate_all();
theFacade.m_globalDictCache.unlock();
m_connect_count ++;
NFCompleteRep rep; NFCompleteRep rep;
for(Uint32 i = 1; i<MAX_NODES; i++){ for(Uint32 i = 1; i<MAX_NODES; i++){
if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){ if(theNodes[i].defined && theNodes[i].nfCompleteRep == false){
......
...@@ -78,6 +78,7 @@ public: ...@@ -78,6 +78,7 @@ public:
const Node & getNodeInfo(NodeId) const; const Node & getNodeInfo(NodeId) const;
Uint32 getNoOfConnectedNodes() const; Uint32 getNoOfConnectedNodes() const;
Uint32 m_connect_count;
private: private:
Uint32 noOfAliveNodes; Uint32 noOfAliveNodes;
......
...@@ -255,6 +255,42 @@ GlobalDictCache::drop(NdbTableImpl * tab) ...@@ -255,6 +255,42 @@ GlobalDictCache::drop(NdbTableImpl * tab)
abort(); abort();
} }
unsigned
GlobalDictCache::get_size()
{
NdbElement_t<Vector<TableVersion> > * curr = m_tableHash.getNext(0);
int sz = 0;
while(curr != 0){
sz += curr->theData->size();
curr = m_tableHash.getNext(curr);
}
return sz;
}
void
GlobalDictCache::invalidate_all()
{
DBUG_ENTER("GlobalDictCache::invalidate_all");
NdbElement_t<Vector<TableVersion> > * curr = m_tableHash.getNext(0);
while(curr != 0){
Vector<TableVersion> * vers = curr->theData;
if (vers->size())
{
TableVersion * ver = & vers->back();
ver->m_impl->m_status = NdbDictionary::Object::Invalid;
ver->m_status = DROPPED;
if (ver->m_refCount == 0)
{
delete ver->m_impl;
vers->erase(vers->size() - 1);
}
}
curr = m_tableHash.getNext(curr);
}
DBUG_VOID_RETURN;
}
void void
GlobalDictCache::release(NdbTableImpl * tab){ GlobalDictCache::release(NdbTableImpl * tab){
unsigned i; unsigned i;
......
...@@ -71,6 +71,9 @@ public: ...@@ -71,6 +71,9 @@ public:
void alter_table_rep(const char * name, void alter_table_rep(const char * name,
Uint32 tableId, Uint32 tableVersion, bool altered); Uint32 tableId, Uint32 tableVersion, bool altered);
unsigned get_size();
void invalidate_all();
public: public:
enum Status { enum Status {
OK = 0, OK = 0,
......
...@@ -75,7 +75,9 @@ SignalSender::SignalSender(TransporterFacade *facade) ...@@ -75,7 +75,9 @@ SignalSender::SignalSender(TransporterFacade *facade)
{ {
m_cond = NdbCondition_Create(); m_cond = NdbCondition_Create();
theFacade = facade; theFacade = facade;
lock();
m_blockNo = theFacade->open(this, execSignal, execNodeStatus); m_blockNo = theFacade->open(this, execSignal, execNodeStatus);
unlock();
assert(m_blockNo > 0); assert(m_blockNo > 0);
} }
......
...@@ -264,6 +264,12 @@ TransporterFacade::unlock_mutex() ...@@ -264,6 +264,12 @@ TransporterFacade::unlock_mutex()
#include "ClusterMgr.hpp" #include "ClusterMgr.hpp"
inline
unsigned Ndb_cluster_connection_impl::get_connect_count() const
{
return TransporterFacade::instance()->theClusterMgr->m_connect_count;
}
inline inline
bool bool
TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size) TransporterFacade::check_send_size(Uint32 node_id, Uint32 send_size)
......
...@@ -236,6 +236,12 @@ Ndb_cluster_connection::wait_until_ready(int timeout, ...@@ -236,6 +236,12 @@ Ndb_cluster_connection::wait_until_ready(int timeout,
} while (1); } while (1);
} }
unsigned Ndb_cluster_connection::get_connect_count() const
{
return m_impl.get_connect_count();
}
/* /*
......
...@@ -49,6 +49,7 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection ...@@ -49,6 +49,7 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection
void init_get_next_node(Ndb_cluster_connection_node_iter &iter); void init_get_next_node(Ndb_cluster_connection_node_iter &iter);
Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter); Uint32 get_next_node(Ndb_cluster_connection_node_iter &iter);
inline unsigned get_connect_count() const;
private: private:
friend class Ndb; friend class Ndb;
friend class NdbImpl; friend class NdbImpl;
......
...@@ -129,6 +129,12 @@ getStatus(){ ...@@ -129,6 +129,12 @@ getStatus(){
ndbout << "status==NULL, retries="<<retries<<endl; ndbout << "status==NULL, retries="<<retries<<endl;
MGMERR(handle); MGMERR(handle);
retries++; retries++;
ndb_mgm_disconnect(handle);
if (ndb_mgm_connect(handle,0,0,1)) {
MGMERR(handle);
g_err << "Reconnect failed" << endl;
break;
}
continue; continue;
} }
int count = status->no_of_nodes; int count = status->no_of_nodes;
......
...@@ -1836,6 +1836,11 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -1836,6 +1836,11 @@ int ha_ndbcluster::write_row(byte *record)
if(m_ignore_dup_key && table->primary_key != MAX_KEY) if(m_ignore_dup_key && table->primary_key != MAX_KEY)
{ {
/*
compare if expression with that in start_bulk_insert()
start_bulk_insert will set parameters to ensure that each
write_row is committed individually
*/
int peek_res= peek_row(record); int peek_res= peek_row(record);
if (!peek_res) if (!peek_res)
...@@ -2996,6 +3001,19 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows) ...@@ -2996,6 +3001,19 @@ void ha_ndbcluster::start_bulk_insert(ha_rows rows)
DBUG_PRINT("enter", ("rows: %d", (int)rows)); DBUG_PRINT("enter", ("rows: %d", (int)rows));
m_rows_inserted= (ha_rows) 0; m_rows_inserted= (ha_rows) 0;
if (m_ignore_dup_key && table->primary_key != MAX_KEY)
{
/*
compare if expression with that in write_row
we have a situation where peek_row() will be called
so we cannot batch
*/
DBUG_PRINT("info", ("Batching turned off as duplicate key is "
"ignored by using peek_row"));
m_rows_to_insert= 1;
m_bulk_insert_rows= 1;
DBUG_VOID_RETURN;
}
if (rows == (ha_rows) 0) if (rows == (ha_rows) 0)
{ {
/* We don't know how many will be inserted, guess */ /* We don't know how many will be inserted, guess */
...@@ -3306,8 +3324,23 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type) ...@@ -3306,8 +3324,23 @@ int ha_ndbcluster::external_lock(THD *thd, int lock_type)
{ {
m_table= (void *)tab; m_table= (void *)tab;
m_table_version = tab->getObjectVersion(); m_table_version = tab->getObjectVersion();
if (!(my_errno= build_index_list(ndb, table, ILBP_OPEN))) if ((my_errno= build_index_list(ndb, table, ILBP_OPEN)))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
const void *data, *pack_data;
uint length, pack_length;
if (readfrm(table->path, &data, &length) ||
packfrm(data, length, &pack_data, &pack_length) ||
pack_length != tab->getFrmLength() ||
memcmp(pack_data, tab->getFrmData(), pack_length))
{
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
NdbError err= ndb->getNdbError(NDB_INVALID_SCHEMA_OBJECT);
DBUG_RETURN(ndb_to_mysql_error(&err));
}
my_free((char*)data, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*)pack_data, MYF(MY_ALLOW_ZERO_PTR));
} }
m_table_info= tab_info; m_table_info= tab_info;
} }
...@@ -4021,20 +4054,30 @@ int ha_ndbcluster::delete_table(const char *name) ...@@ -4021,20 +4054,30 @@ int ha_ndbcluster::delete_table(const char *name)
int ha_ndbcluster::drop_table() int ha_ndbcluster::drop_table()
{ {
THD *thd= current_thd;
Ndb *ndb= get_ndb(); Ndb *ndb= get_ndb();
NdbDictionary::Dictionary *dict= ndb->getDictionary(); NdbDictionary::Dictionary *dict= ndb->getDictionary();
DBUG_ENTER("drop_table"); DBUG_ENTER("drop_table");
DBUG_PRINT("enter", ("Deleting %s", m_tabname)); DBUG_PRINT("enter", ("Deleting %s", m_tabname));
if (dict->dropTable(m_tabname)) while (dict->dropTable(m_tabname))
{ {
const NdbError err= dict->getNdbError(); const NdbError err= dict->getNdbError();
if (err.code == 709) switch (err.status)
; // 709: No such table existed {
else case NdbError::TemporaryError:
if (!thd->killed)
continue; // retry indefinitly
break;
default:
break;
}
if (err.code != 709) // 709: No such table existed
ERR_RETURN(dict->getNdbError()); ERR_RETURN(dict->getNdbError());
break;
} }
release_metadata(); release_metadata();
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -4438,14 +4481,24 @@ int ndbcluster_drop_database(const char *path) ...@@ -4438,14 +4481,24 @@ int ndbcluster_drop_database(const char *path)
List_iterator_fast<char> it(drop_list); List_iterator_fast<char> it(drop_list);
while ((tabname=it++)) while ((tabname=it++))
{ {
if (dict->dropTable(tabname)) while (dict->dropTable(tabname))
{ {
const NdbError err= dict->getNdbError(); const NdbError err= dict->getNdbError();
if (err.code != 709) switch (err.status)
{
case NdbError::TemporaryError:
if (!thd->killed)
continue; // retry indefinitly
break;
default:
break;
}
if (err.code != 709) // 709: No such table existed
{ {
ERR_PRINT(err); ERR_PRINT(err);
ret= ndb_to_mysql_error(&err); ret= ndb_to_mysql_error(&err);
} }
break;
} }
} }
DBUG_RETURN(ret); DBUG_RETURN(ret);
......
...@@ -644,6 +644,7 @@ fi ...@@ -644,6 +644,7 @@ fi
%attr(755, root, root) %{_bindir}/ndb_desc %attr(755, root, root) %{_bindir}/ndb_desc
%attr(755, root, root) %{_bindir}/ndb_show_tables %attr(755, root, root) %{_bindir}/ndb_show_tables
%attr(755, root, root) %{_bindir}/ndb_test_platform %attr(755, root, root) %{_bindir}/ndb_test_platform
%attr(755, root, root) %{_bindir}/ndb_config
%files ndb-extra %files ndb-extra
%defattr(-,root,root,0755) %defattr(-,root,root,0755)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment