Commit db16cae5 authored by unknown's avatar unknown

BUG#13987 Cluster: Loss of data nodes can cause high CPU usage from ndb_mgmd

smaller patch for 5.0.

complete patch going to 5.1 due to more intrusiveness for 'list sessions' etc


ndb/include/mgmapi/mgmapi.h:
  add internal get_fd to use in test
ndb/include/util/InputStream.hpp:
  - add this weird startover member to SocketInputStream
      - this helps work out if we've read a newline yet and should start inserting
           into buffer from the start
ndb/src/common/util/InputStream.cpp:
  remove evil, add more.
  
  keep track internally we've retrieved a newline yet (m_startover)
ndb/src/common/util/Parser.cpp:
  change way detecting of NoLine
  
  remove some trailing whitespace that was uglying the place up a bit
ndb/src/common/util/socket_io.cpp:
  Always retrieve data from the OS so that we instantly get EOF on disconnect
  and don't end up spinning looking for a newline.
ndb/src/mgmapi/mgmapi.cpp:
  add internal ndb_mgm_get_fd() for internal testing
  
  handle 'node status' a bit better
ndb/test/ndbapi/testMgm.cpp:
  Add test for MgmApiSession disconnection (mgmd at 100%)
  
  not fully automated due to smaller patch for 5.0
  
  will be complete in 5.1
parent 81526834
...@@ -1071,6 +1071,19 @@ extern "C" { ...@@ -1071,6 +1071,19 @@ extern "C" {
*/ */
int ndb_mgm_end_session(NdbMgmHandle handle); int ndb_mgm_end_session(NdbMgmHandle handle);
/**
* ndb_mgm_get_fd
*
* get the file descriptor of the handle.
* INTERNAL ONLY.
* USE FOR TESTING. OTHER USES ARE NOT A GOOD IDEA.
*
* @param handle NDB management handle
* @return handle->socket
*
*/
int ndb_mgm_get_fd(NdbMgmHandle handle);
/** /**
* Get the node id of the mgm server we're connected to * Get the node id of the mgm server we're connected to
*/ */
......
...@@ -40,6 +40,7 @@ extern FileInputStream Stdin; ...@@ -40,6 +40,7 @@ extern FileInputStream Stdin;
class SocketInputStream : public InputStream { class SocketInputStream : public InputStream {
NDB_SOCKET_TYPE m_socket; NDB_SOCKET_TYPE m_socket;
unsigned m_timeout; unsigned m_timeout;
bool m_startover;
public: public:
SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000); SocketInputStream(NDB_SOCKET_TYPE socket, unsigned readTimeout = 1000);
char* gets(char * buf, int bufLen); char* gets(char * buf, int bufLen);
......
...@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){ ...@@ -36,26 +36,35 @@ FileInputStream::gets(char * buf, int bufLen){
SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket, SocketInputStream::SocketInputStream(NDB_SOCKET_TYPE socket,
unsigned readTimeout) unsigned readTimeout)
: m_socket(socket) { : m_socket(socket) {
m_timeout = readTimeout; m_startover= true;
m_timeout = readTimeout;
} }
char* char*
SocketInputStream::gets(char * buf, int bufLen) { SocketInputStream::gets(char * buf, int bufLen) {
buf[0] = 77;
assert(bufLen >= 2); assert(bufLen >= 2);
int res = readln_socket(m_socket, m_timeout, buf, bufLen - 1); int offset= 0;
if(m_startover)
{
buf[0]= '\0';
m_startover= false;
}
else
offset= strlen(buf);
int res = readln_socket(m_socket, m_timeout, buf+offset, bufLen-offset);
if(res == 0)
{
buf[0]=0;
return buf;
}
m_startover= true;
if(res == -1) if(res == -1)
return 0; return 0;
if(res == 0 && buf[0] == 77){ // select return 0
buf[0] = 0;
} else if(res == 0 && buf[0] == 0){ // only newline
buf[0] = '\n';
buf[1] = 0;
} else {
int len = strlen(buf);
buf[len + 1] = '\0';
buf[len] = '\n';
}
return buf; return buf;
} }
...@@ -148,21 +148,26 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -148,21 +148,26 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
bool ownStop = false; bool ownStop = false;
if(stop == 0) if(stop == 0)
stop = &ownStop; stop = &ownStop;
ctx->m_aliasUsed.clear(); ctx->m_aliasUsed.clear();
const unsigned sz = sizeof(ctx->m_tokenBuffer); const unsigned sz = sizeof(ctx->m_tokenBuffer);
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
if(Eof(ctx->m_currentToken)){ if(Eof(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::Eof; ctx->m_status = Parser<Dummy>::Eof;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(ctx->m_currentToken[0] == 0){ int last= strlen(ctx->m_currentToken);
if(last>0)
last--;
if(ctx->m_currentToken[last] !='\n'){
ctx->m_status = Parser<Dummy>::NoLine; ctx->m_status = Parser<Dummy>::NoLine;
ctx->m_tokenBuffer[0]= '\0';
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(Empty(ctx->m_currentToken)){ if(Empty(ctx->m_currentToken)){
ctx->m_status = Parser<Dummy>::EmptyLine; ctx->m_status = Parser<Dummy>::EmptyLine;
DBUG_RETURN(false); DBUG_RETURN(false);
...@@ -174,14 +179,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -174,14 +179,14 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
ctx->m_status = Parser<Dummy>::UnknownCommand; ctx->m_status = Parser<Dummy>::UnknownCommand;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
Properties * p = new Properties(); Properties * p = new Properties();
bool invalidArgument = false; bool invalidArgument = false;
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
while((! * stop) && while((! * stop) &&
!Eof(ctx->m_currentToken) && !Eof(ctx->m_currentToken) &&
!Empty(ctx->m_currentToken)){ !Empty(ctx->m_currentToken)){
if(ctx->m_currentToken[0] != 0){ if(ctx->m_currentToken[0] != 0){
trim(ctx->m_currentToken); trim(ctx->m_currentToken);
...@@ -193,7 +198,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -193,7 +198,7 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
} }
ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz); ctx->m_currentToken = input.gets(ctx->m_tokenBuffer, sz);
} }
if(invalidArgument){ if(invalidArgument){
char buf[sz]; char buf[sz];
char * tmp; char * tmp;
...@@ -204,13 +209,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -204,13 +209,13 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
} }
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(* stop){ if(* stop){
delete p; delete p;
ctx->m_status = Parser<Dummy>::ExternalStop; ctx->m_status = Parser<Dummy>::ExternalStop;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if(!checkMandatory(ctx, p)){ if(!checkMandatory(ctx, p)){
ctx->m_status = Parser<Dummy>::MissingMandatoryArgument; ctx->m_status = Parser<Dummy>::MissingMandatoryArgument;
delete p; delete p;
...@@ -226,9 +231,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst, ...@@ -226,9 +231,9 @@ ParserImpl::run(Context * ctx, const class Properties ** pDst,
tmp.put("name", alias->name); tmp.put("name", alias->name);
tmp.put("realName", alias->realName); tmp.put("realName", alias->realName);
p->put("$ALIAS", i, &tmp); p->put("$ALIAS", i, &tmp);
} }
p->put("$ALIAS", ctx->m_aliasUsed.size()); p->put("$ALIAS", ctx->m_aliasUsed.size());
ctx->m_status = Parser<Dummy>::Ok; ctx->m_status = Parser<Dummy>::Ok;
* pDst = p; * pDst = p;
DBUG_RETURN(true); DBUG_RETURN(true);
......
...@@ -75,7 +75,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -75,7 +75,6 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
return -1; return -1;
} }
buf[0] = 0;
const int t = recv(socket, buf, buflen, MSG_PEEK); const int t = recv(socket, buf, buflen, MSG_PEEK);
if(t < 1) if(t < 1)
...@@ -87,27 +86,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis, ...@@ -87,27 +86,28 @@ readln_socket(NDB_SOCKET_TYPE socket, int timeout_millis,
for(int i=0; i< t;i++) for(int i=0; i< t;i++)
{ {
if(buf[i] == '\n'){ if(buf[i] == '\n'){
recv(socket, buf, i+1, 0); int r= recv(socket, buf, i+1, 0);
buf[i] = 0; buf[i+1]= 0;
if(r < 1) {
fcntl(socket, F_SETFL, sock_flags);
return -1;
}
if(i > 0 && buf[i-1] == '\r'){ if(i > 0 && buf[i-1] == '\r'){
i--; buf[i-1] = '\n';
buf[i] = 0; buf[i]= '\0';
} }
fcntl(socket, F_SETFL, sock_flags); fcntl(socket, F_SETFL, sock_flags);
return t; return r;
} }
} }
if(t == (buflen - 1)){ int r= recv(socket, buf, t, 0);
recv(socket, buf, t, 0); if(r>=0)
buf[t] = 0; buf[r] = 0;
fcntl(socket, F_SETFL, sock_flags); fcntl(socket, F_SETFL, sock_flags);
return buflen; return r;
}
return 0;
} }
extern "C" extern "C"
......
...@@ -502,6 +502,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, ...@@ -502,6 +502,18 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
* Only used for low level testing
* Never to be used by end user.
* Or anybody who doesn't know exactly what they're doing.
*/
extern "C"
int
ndb_mgm_get_fd(NdbMgmHandle handle)
{
return handle->socket;
}
/** /**
* Disconnect from a mgm server * Disconnect from a mgm server
*/ */
...@@ -692,22 +704,16 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -692,22 +704,16 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected"); SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL; return NULL;
} }
if(buf[strlen(buf)-1] == '\n') if(strcmp("node status\n", buf) != 0) {
buf[strlen(buf)-1] = '\0';
if(strcmp("node status", buf) != 0) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
if(!in.gets(buf, sizeof(buf))) if(!in.gets(buf, sizeof(buf)))
{ {
SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected"); SET_ERROR(handle, NDB_MGM_ILLEGAL_SERVER_REPLY, "Probably disconnected");
return NULL; return NULL;
} }
if(buf[strlen(buf)-1] == '\n')
buf[strlen(buf)-1] = '\0';
BaseString tmp(buf); BaseString tmp(buf);
Vector<BaseString> split; Vector<BaseString> split;
tmp.split(split, ":"); tmp.split(split, ":");
...@@ -715,7 +721,7 @@ ndb_mgm_get_status(NdbMgmHandle handle) ...@@ -715,7 +721,7 @@ ndb_mgm_get_status(NdbMgmHandle handle)
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
} }
if(!(split[0].trim() == "nodes")){ if(!(split[0].trim() == "nodes")){
SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf); SET_ERROR(handle, NDB_MGM_ILLEGAL_NODE_STATUS, buf);
return NULL; return NULL;
...@@ -2280,7 +2286,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){ ...@@ -2280,7 +2286,6 @@ ndb_mgm_check_connection(NdbMgmHandle handle){
SocketOutputStream out(handle->socket); SocketOutputStream out(handle->socket);
SocketInputStream in(handle->socket, handle->read_timeout); SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32]; char buf[32];
if (out.println("check connection")) if (out.println("check connection"))
goto ndb_mgm_check_connection_error; goto ndb_mgm_check_connection_error;
...@@ -2490,7 +2495,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle) ...@@ -2490,7 +2495,6 @@ int ndb_mgm_end_session(NdbMgmHandle handle)
SocketInputStream in(handle->socket, handle->read_timeout); SocketInputStream in(handle->socket, handle->read_timeout);
char buf[32]; char buf[32];
in.gets(buf, sizeof(buf)); in.gets(buf, sizeof(buf));
DBUG_RETURN(0); DBUG_RETURN(0);
......
...@@ -21,6 +21,8 @@ ...@@ -21,6 +21,8 @@
#include <NdbRestarter.hpp> #include <NdbRestarter.hpp>
#include <Vector.hpp> #include <Vector.hpp>
#include <random.h> #include <random.h>
#include <mgmapi.h>
#include <mgmapi_debug.h>
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){
...@@ -167,6 +169,26 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -167,6 +169,26 @@ int runTestSingleUserMode(NDBT_Context* ctx, NDBT_Step* step){
return result; return result;
} }
int runTestApiSession(NDBT_Context* ctx, NDBT_Step* step)
{
char *mgm= ctx->getRemoteMgm();
NdbMgmHandle h;
h= ndb_mgm_create_handle();
ndb_mgm_set_connectstring(h, mgm);
ndb_mgm_connect(h,0,0,0);
int s= ndb_mgm_get_fd(h);
write(s,"get",3);
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
/** NOTE: WE CANNOT REALLY TEST ANYTHING in 5.0
*
* a more conservative patch for 5.0, full get and list
* sessions in 5.1.
*
* This is kept so that we can at least manually test easily
*/
}
NDBT_TESTSUITE(testMgm); NDBT_TESTSUITE(testMgm);
...@@ -175,6 +197,11 @@ TESTCASE("SingleUserMode", ...@@ -175,6 +197,11 @@ TESTCASE("SingleUserMode",
INITIALIZER(runTestSingleUserMode); INITIALIZER(runTestSingleUserMode);
FINALIZER(runClearTable); FINALIZER(runClearTable);
} }
TESTCASE("ApiSessionFailure",
"Test failures in MGMAPI session"){
INITIALIZER(runTestApiSession);
}
NDBT_TESTSUITE_END(testMgm); NDBT_TESTSUITE_END(testMgm);
int main(int argc, const char** argv){ int main(int argc, const char** argv){
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment