Commit 4575534d authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0

into  poseidon.ndb.mysql.com:/home/tomas/mysql-5.0
parents 9bc05c49 6108803a
...@@ -232,6 +232,12 @@ extern "C" { ...@@ -232,6 +232,12 @@ extern "C" {
/** Could not connect to socker */ /** Could not connect to socker */
NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET = 1011,
/* Alloc node id failures */
/** Generic error, retry may succeed */
NDB_MGM_ALLOCID_ERROR = 1101,
/** Non retriable error */
NDB_MGM_ALLOCID_CONFIG_MISMATCH = 1102,
/* Service errors - Start/Stop Node or System */ /* Service errors - Start/Stop Node or System */
/** Start failed */ /** Start failed */
NDB_MGM_START_FAILED = 2001, NDB_MGM_START_FAILED = 2001,
...@@ -999,7 +1005,7 @@ extern "C" { ...@@ -999,7 +1005,7 @@ extern "C" {
void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *); void ndb_mgm_destroy_configuration(struct ndb_mgm_configuration *);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle, int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version, int nodetype); unsigned version, int nodetype, int log_event);
/** /**
* End Session * End Session
......
...@@ -349,12 +349,14 @@ ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds) ...@@ -349,12 +349,14 @@ ConfigRetriever::allocNodeId(int no_retries, int retry_delay_in_seconds)
if(!ndb_mgm_connect(m_handle, 0, 0, 0)) if(!ndb_mgm_connect(m_handle, 0, 0, 0))
goto next; goto next;
res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type); res= ndb_mgm_alloc_nodeid(m_handle, m_version, m_node_type,
no_retries == 0 /* only log last retry */);
if(res >= 0) if(res >= 0)
return _ownNodeId= (Uint32)res; return _ownNodeId= (Uint32)res;
next: next:
if (no_retries == 0) int error = ndb_mgm_get_latest_error(m_handle);
if (no_retries == 0 || error == NDB_MGM_ALLOCID_CONFIG_MISMATCH)
break; break;
no_retries--; no_retries--;
NdbSleep_SecSleep(retry_delay_in_seconds); NdbSleep_SecSleep(retry_delay_in_seconds);
......
...@@ -286,7 +286,8 @@ Configuration::fetch_configuration(){ ...@@ -286,7 +286,8 @@ Configuration::fetch_configuration(){
if (globalData.ownId) if (globalData.ownId)
cr.setNodeId(globalData.ownId); cr.setNodeId(globalData.ownId);
globalData.ownId = cr.allocNodeId(2 /*retry*/,3 /*delay*/); globalData.ownId = cr.allocNodeId(globalData.ownId ? 10 : 2 /*retry*/,
3 /*delay*/);
if(globalData.ownId == 0){ if(globalData.ownId == 0){
ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG, ERROR_SET(fatal, NDBD_EXIT_INVALID_CONFIG,
......
...@@ -1868,7 +1868,8 @@ const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz ...@@ -1868,7 +1868,8 @@ const char *ndb_mgm_get_connectstring(NdbMgmHandle handle, char *buf, int buf_sz
extern "C" extern "C"
int int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype) ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype,
int log_event)
{ {
CHECK_HANDLE(handle, 0); CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0); CHECK_CONNECTED(handle, 0);
...@@ -1888,9 +1889,11 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype) ...@@ -1888,9 +1889,11 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
args.put("endian", (endian_check.c[sizeof(long)-1])?"big":"little"); args.put("endian", (endian_check.c[sizeof(long)-1])?"big":"little");
if (handle->m_name) if (handle->m_name)
args.put("name", handle->m_name); args.put("name", handle->m_name);
args.put("log_event", log_event);
const ParserRow<ParserDummy> reply[]= { const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""), MGM_CMD("get nodeid reply", NULL, ""),
MGM_ARG("error_code", Int, Optional, "Error code"),
MGM_ARG("nodeid", Int, Optional, "Error message"), MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"), MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END() MGM_END()
...@@ -1903,14 +1906,16 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype) ...@@ -1903,14 +1906,16 @@ ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, int nodetype)
nodeid= -1; nodeid= -1;
do { do {
const char * buf; const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){ if (!prop->get("result", &buf) || strcmp(buf, "Ok") != 0)
{
const char *hostname= ndb_mgm_get_connected_host(handle); const char *hostname= ndb_mgm_get_connected_host(handle);
unsigned port= ndb_mgm_get_connected_port(handle); unsigned port= ndb_mgm_get_connected_port(handle);
BaseString err; BaseString err;
Uint32 error_code= NDB_MGM_ALLOCID_ERROR;
err.assfmt("Could not alloc node id at %s port %d: %s", err.assfmt("Could not alloc node id at %s port %d: %s",
hostname, port, buf); hostname, port, buf);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, prop->get("error_code", &error_code);
err.c_str()); setError(handle, error_code, __LINE__, err.c_str());
break; break;
} }
Uint32 _nodeid; Uint32 _nodeid;
......
...@@ -507,9 +507,10 @@ MgmtSrvr::MgmtSrvr(SocketServer *socket_server, ...@@ -507,9 +507,10 @@ MgmtSrvr::MgmtSrvr(SocketServer *socket_server,
if (_ownNodeId == 0) // we did not get node id from other server if (_ownNodeId == 0) // we did not get node id from other server
{ {
NodeId tmp= m_config_retriever->get_configuration_nodeid(); NodeId tmp= m_config_retriever->get_configuration_nodeid();
int error_code;
if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM, if (!alloc_node_id(&tmp, NDB_MGM_NODE_TYPE_MGM,
0, 0, error_string)){ 0, 0, error_code, error_string)){
ndbout << "Unable to obtain requested nodeid: " ndbout << "Unable to obtain requested nodeid: "
<< error_string.c_str() << endl; << error_string.c_str() << endl;
require(false); require(false);
...@@ -1118,31 +1119,16 @@ int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids, ...@@ -1118,31 +1119,16 @@ int MgmtSrvr::sendSTOP_REQ(const Vector<NodeId> &node_ids,
const NFCompleteRep * const rep = const NFCompleteRep * const rep =
CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr()); CAST_CONSTPTR(NFCompleteRep, signal->getDataPtr());
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout_c("Node %d fail completed", rep->failedNodeId); ndbout_c("sendSTOP_REQ Node %d fail completed", rep->failedNodeId);
#endif #endif
nodes.clear(rep->failedNodeId); // clear the failed node
if (singleUserNodeId == 0)
stoppedNodes.set(rep->failedNodeId);
break; break;
} }
case GSN_NODE_FAILREP:{ case GSN_NODE_FAILREP:{
const NodeFailRep * const rep = const NodeFailRep * const rep =
CAST_CONSTPTR(NodeFailRep, signal->getDataPtr()); CAST_CONSTPTR(NodeFailRep, signal->getDataPtr());
NodeBitmask failedNodes;
failedNodes.assign(NodeBitmask::Size, rep->theNodes);
#ifdef VM_TRACE
{
ndbout << "Failed nodes:";
for (unsigned i = 0; i < 32*NodeBitmask::Size; i++)
if(failedNodes.get(i))
ndbout << " " << i;
ndbout << endl;
}
#endif
failedNodes.bitAND(nodes);
if (!failedNodes.isclear())
{
nodes.bitANDC(failedNodes); // clear the failed nodes
if (singleUserNodeId == 0)
stoppedNodes.bitOR(failedNodes);
}
break; break;
} }
default: default:
...@@ -1263,11 +1249,47 @@ int MgmtSrvr::restartNodes(const Vector<NodeId> &node_ids, ...@@ -1263,11 +1249,47 @@ int MgmtSrvr::restartNodes(const Vector<NodeId> &node_ids,
abort, abort,
false, false,
true, true,
nostart, true,
initialStart); initialStart);
if (ret)
return ret;
if (stopCount) if (stopCount)
*stopCount = nodes.count(); *stopCount = nodes.count();
return ret;
// start up the nodes again
int waitTime = 12000;
NDB_TICKS maxTime = NdbTick_CurrentMillisecond() + waitTime;
for (unsigned i = 0; i < node_ids.size(); i++)
{
NodeId nodeId= node_ids[i];
enum ndb_mgm_node_status s;
s = NDB_MGM_NODE_STATUS_NO_CONTACT;
#ifdef VM_TRACE
ndbout_c("Waiting for %d not started", nodeId);
#endif
while (s != NDB_MGM_NODE_STATUS_NOT_STARTED && waitTime > 0)
{
Uint32 startPhase = 0, version = 0, dynamicId = 0, nodeGroup = 0;
Uint32 connectCount = 0;
bool system;
const char *address;
status(nodeId, &s, &version, &startPhase,
&system, &dynamicId, &nodeGroup, &connectCount, &address);
NdbSleep_MilliSleep(100);
waitTime = (maxTime - NdbTick_CurrentMillisecond());
}
}
if (nostart)
return 0;
for (unsigned i = 0; i < node_ids.size(); i++)
{
int result = start(node_ids[i]);
}
return 0;
} }
/* /*
...@@ -1918,7 +1940,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -1918,7 +1940,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
enum ndb_mgm_node_type type, enum ndb_mgm_node_type type,
struct sockaddr *client_addr, struct sockaddr *client_addr,
SOCKET_SIZE_TYPE *client_addr_len, SOCKET_SIZE_TYPE *client_addr_len,
BaseString &error_string) int &error_code, BaseString &error_string,
int log_event)
{ {
DBUG_ENTER("MgmtSrvr::alloc_node_id"); DBUG_ENTER("MgmtSrvr::alloc_node_id");
DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d", DBUG_PRINT("enter", ("nodeid=%d, type=%d, client_addr=%d",
...@@ -1927,6 +1950,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -1927,6 +1950,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if (*nodeId == 0) { if (*nodeId == 0) {
error_string.appfmt("no-nodeid-checks set in management server.\n" error_string.appfmt("no-nodeid-checks set in management server.\n"
"node id must be set explicitly in connectstring"); "node id must be set explicitly in connectstring");
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
DBUG_RETURN(true); DBUG_RETURN(true);
...@@ -1951,8 +1975,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -1951,8 +1975,10 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
if(NdbMutex_Lock(m_configMutex)) if(NdbMutex_Lock(m_configMutex))
{ {
// should not happen
error_string.appfmt("unable to lock configuration mutex"); error_string.appfmt("unable to lock configuration mutex");
return false; error_code = NDB_MGM_ALLOCID_ERROR;
DBUG_RETURN(false);
} }
ndb_mgm_configuration_iterator ndb_mgm_configuration_iterator
iter(* _config->m_configValues, CFG_SECTION_NODE); iter(* _config->m_configValues, CFG_SECTION_NODE);
...@@ -2023,6 +2049,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2023,6 +2049,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
"or specifying unique host names in config file.", "or specifying unique host names in config file.",
id_found, tmp); id_found, tmp);
NdbMutex_Unlock(m_configMutex); NdbMutex_Unlock(m_configMutex);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
if (config_hostname == 0) { if (config_hostname == 0) {
...@@ -2031,6 +2058,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2031,6 +2058,7 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
"or specifying unique host names in config file,\n" "or specifying unique host names in config file,\n"
"or specifying just one mgmt server in config file.", "or specifying just one mgmt server in config file.",
tmp); tmp);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
DBUG_RETURN(false); DBUG_RETURN(false);
} }
id_found= tmp; // mgmt server matched, check for more matches id_found= tmp; // mgmt server matched, check for more matches
...@@ -2072,7 +2100,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2072,7 +2100,8 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
char tmp_str[128]; char tmp_str[128];
m_reserved_nodes.getText(tmp_str); m_reserved_nodes.getText(tmp_str);
g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, m_reserved_nodes %s.", g_eventLogger.info("Mgmt server state: nodeid %d reserved for ip %s, "
"m_reserved_nodes %s.",
id_found, get_connect_address(id_found), tmp_str); id_found, get_connect_address(id_found), tmp_str);
DBUG_RETURN(true); DBUG_RETURN(true);
} }
...@@ -2093,26 +2122,48 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2093,26 +2122,48 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
type_c_string.assfmt("%s(%s)", alias, str); type_c_string.assfmt("%s(%s)", alias, str);
} }
if (*nodeId == 0) { if (*nodeId == 0)
{
if (found_matching_id) if (found_matching_id)
{
if (found_matching_type) if (found_matching_type)
{
if (found_free_node) if (found_free_node)
{
error_string.appfmt("Connection done from wrong host ip %s.", error_string.appfmt("Connection done from wrong host ip %s.",
(client_addr)? (client_addr)?
inet_ntoa(((struct sockaddr_in *) inet_ntoa(((struct sockaddr_in *)
(client_addr))->sin_addr):""); (client_addr))->sin_addr):"");
error_code = NDB_MGM_ALLOCID_ERROR;
}
else else
{
error_string.appfmt("No free node id found for %s.", error_string.appfmt("No free node id found for %s.",
type_string.c_str()); type_string.c_str());
error_code = NDB_MGM_ALLOCID_ERROR;
}
}
else else
{
error_string.appfmt("No %s node defined in config file.", error_string.appfmt("No %s node defined in config file.",
type_string.c_str()); type_string.c_str());
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else else
{
error_string.append("No nodes defined in config file."); error_string.append("No nodes defined in config file.");
} else { error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else
{
if (found_matching_id) if (found_matching_id)
{
if (found_matching_type) if (found_matching_type)
if (found_free_node) { {
if (found_free_node)
{
// have to split these into two since inet_ntoa overwrites itself // have to split these into two since inet_ntoa overwrites itself
error_string.appfmt("Connection with id %d done from wrong host ip %s,", error_string.appfmt("Connection with id %d done from wrong host ip %s,",
*nodeId, inet_ntoa(((struct sockaddr_in *) *nodeId, inet_ntoa(((struct sockaddr_in *)
...@@ -2120,27 +2171,44 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId, ...@@ -2120,27 +2171,44 @@ MgmtSrvr::alloc_node_id(NodeId * nodeId,
error_string.appfmt(" expected %s(%s).", config_hostname, error_string.appfmt(" expected %s(%s).", config_hostname,
r_config_addr ? r_config_addr ?
"lookup failed" : inet_ntoa(config_addr)); "lookup failed" : inet_ntoa(config_addr));
} else error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
else
{
error_string.appfmt("Id %d already allocated by another node.", error_string.appfmt("Id %d already allocated by another node.",
*nodeId); *nodeId);
error_code = NDB_MGM_ALLOCID_ERROR;
}
}
else else
{
error_string.appfmt("Id %d configured as %s, connect attempted as %s.", error_string.appfmt("Id %d configured as %s, connect attempted as %s.",
*nodeId, type_c_string.c_str(), *nodeId, type_c_string.c_str(),
type_string.c_str()); type_string.c_str());
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
}
else else
{
error_string.appfmt("No node defined with id=%d in config file.", error_string.appfmt("No node defined with id=%d in config file.",
*nodeId); *nodeId);
error_code = NDB_MGM_ALLOCID_CONFIG_MISMATCH;
}
} }
g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s. " if (log_event || error_code == NDB_MGM_ALLOCID_CONFIG_MISMATCH)
"Returned error string \"%s\"", {
g_eventLogger.warning("Allocate nodeid (%d) failed. Connection from ip %s."
" Returned error string \"%s\"",
*nodeId, *nodeId,
client_addr != 0 ? inet_ntoa(((struct sockaddr_in *)(client_addr))->sin_addr) : "<none>", client_addr != 0
? inet_ntoa(((struct sockaddr_in *)
(client_addr))->sin_addr)
: "<none>",
error_string.c_str()); error_string.c_str());
NodeBitmask connected_nodes2; NodeBitmask connected_nodes2;
get_connected_nodes(connected_nodes2); get_connected_nodes(connected_nodes2);
{
BaseString tmp_connected, tmp_not_connected; BaseString tmp_connected, tmp_not_connected;
for(Uint32 i = 0; i < MAX_NODES; i++) for(Uint32 i = 0; i < MAX_NODES; i++)
{ {
...@@ -2378,6 +2446,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted) ...@@ -2378,6 +2446,8 @@ MgmtSrvr::repCommand(Uint32* repReqId, Uint32 request, bool waitCompleted)
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m) MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m) : m_mgmsrv(m)
{ {
m_reserved_nodes.clear();
m_alloc_timeout= 0;
} }
MgmtSrvr::Allocated_resources::~Allocated_resources() MgmtSrvr::Allocated_resources::~Allocated_resources()
...@@ -2396,9 +2466,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources() ...@@ -2396,9 +2466,22 @@ MgmtSrvr::Allocated_resources::~Allocated_resources()
} }
void void
MgmtSrvr::Allocated_resources::reserve_node(NodeId id) MgmtSrvr::Allocated_resources::reserve_node(NodeId id, NDB_TICKS timeout)
{ {
m_reserved_nodes.set(id); m_reserved_nodes.set(id);
m_alloc_timeout= NdbTick_CurrentMillisecond() + timeout;
}
bool
MgmtSrvr::Allocated_resources::is_timed_out(NDB_TICKS tick)
{
if (m_alloc_timeout && tick > m_alloc_timeout)
{
g_eventLogger.info("Mgmt server state: nodeid %d timed out.",
get_nodeid());
return true;
}
return false;
} }
NodeId NodeId
......
...@@ -106,7 +106,8 @@ public: ...@@ -106,7 +106,8 @@ public:
~Allocated_resources(); ~Allocated_resources();
// methods to reserve/allocate resources which // methods to reserve/allocate resources which
// will be freed when running destructor // will be freed when running destructor
void reserve_node(NodeId id); void reserve_node(NodeId id, NDB_TICKS timeout);
bool is_timed_out(NDB_TICKS tick);
bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); } bool is_reserved(NodeId nodeId) { return m_reserved_nodes.get(nodeId); }
bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); } bool is_reserved(NodeBitmask mask) { return !mask.bitAND(m_reserved_nodes).isclear(); }
bool isclear() { return m_reserved_nodes.isclear(); } bool isclear() { return m_reserved_nodes.isclear(); }
...@@ -114,6 +115,7 @@ public: ...@@ -114,6 +115,7 @@ public:
private: private:
MgmtSrvr &m_mgmsrv; MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes; NodeBitmask m_reserved_nodes;
NDB_TICKS m_alloc_timeout;
}; };
NdbMutex *m_node_id_mutex; NdbMutex *m_node_id_mutex;
...@@ -432,8 +434,10 @@ public: ...@@ -432,8 +434,10 @@ public:
*/ */
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ; bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool alloc_node_id(NodeId * _nodeId, enum ndb_mgm_node_type type, bool alloc_node_id(NodeId * _nodeId, enum ndb_mgm_node_type type,
struct sockaddr *client_addr, SOCKET_SIZE_TYPE *client_addr_len, struct sockaddr *client_addr,
BaseString &error_string); SOCKET_SIZE_TYPE *client_addr_len,
int &error_code, BaseString &error_string,
int log_event = 1);
/** /**
* *
......
...@@ -137,6 +137,8 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -137,6 +137,8 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("public key", String, Mandatory, "Public key"), MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_ARG("endian", String, Optional, "Endianness"), MGM_ARG("endian", String, Optional, "Endianness"),
MGM_ARG("name", String, Optional, "Name of connection"), MGM_ARG("name", String, Optional, "Name of connection"),
MGM_ARG("timeout", Int, Optional, "Timeout in seconds"),
MGM_ARG("log_event", Int, Optional, "Log failure in cluster log"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""), MGM_CMD("get version", &MgmApiSession::getVersion, ""),
...@@ -259,6 +261,15 @@ ParserRow<MgmApiSession> commands[] = { ...@@ -259,6 +261,15 @@ ParserRow<MgmApiSession> commands[] = {
MGM_END() MGM_END()
}; };
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
NDB_TICKS tick;
};
MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock) MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
: SocketServer::Session(sock), m_mgmsrv(mgm) : SocketServer::Session(sock), m_mgmsrv(mgm)
{ {
...@@ -408,12 +419,15 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -408,12 +419,15 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
{ {
const char *cmd= "get nodeid reply"; const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff; Uint32 version, nodeid= 0, nodetype= 0xff;
Uint32 timeout= 20; // default seconds timeout
const char * transporter; const char * transporter;
const char * user; const char * user;
const char * password; const char * password;
const char * public_key; const char * public_key;
const char * endian= NULL; const char * endian= NULL;
const char * name= NULL; const char * name= NULL;
Uint32 log_event= 1;
bool log_event_version;
union { long l; char c[sizeof(long)]; } endian_check; union { long l; char c[sizeof(long)]; } endian_check;
args.get("version", &version); args.get("version", &version);
...@@ -425,6 +439,9 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -425,6 +439,9 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
args.get("public key", &public_key); args.get("public key", &public_key);
args.get("endian", &endian); args.get("endian", &endian);
args.get("name", &name); args.get("name", &name);
args.get("timeout", &timeout);
/* for backwards compatability keep track if client uses new protocol */
log_event_version= args.get("log_event", &log_event);
endian_check.l = 1; endian_check.l = 1;
if(endian if(endian
...@@ -464,14 +481,38 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -464,14 +481,38 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
NodeId tmp= nodeid; NodeId tmp= nodeid;
if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){ if(tmp == 0 || !m_allocated_resources->is_reserved(tmp)){
BaseString error_string; BaseString error_string;
if (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype, int error_code;
&addr, &addrlen, error_string)){ NDB_TICKS tick= 0;
/* only report error on second attempt as not to clog the cluster log */
while (!m_mgmsrv.alloc_node_id(&tmp, (enum ndb_mgm_node_type)nodetype,
&addr, &addrlen, error_code, error_string,
tick == 0 ? 0 : log_event))
{
/* NDB_MGM_ALLOCID_CONFIG_MISMATCH is a non retriable error */
if (tick == 0 && error_code != NDB_MGM_ALLOCID_CONFIG_MISMATCH)
{
// attempt to free any timed out reservations
tick= NdbTick_CurrentMillisecond();
struct PurgeStruct ps;
m_mgmsrv.get_connected_nodes(ps.free_nodes);
// invert connected_nodes to get free nodes
ps.free_nodes.bitXORC(NodeBitmask());
ps.str= 0;
ps.tick= tick;
m_mgmsrv.get_socket_server()->
foreachSession(stop_session_if_timed_out,&ps);
error_string = "";
continue;
}
const char *alias; const char *alias;
const char *str; const char *str;
alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type) alias= ndb_mgm_get_node_type_alias_string((enum ndb_mgm_node_type)
nodetype, &str); nodetype, &str);
m_output->println(cmd); m_output->println(cmd);
m_output->println("result: %s", error_string.c_str()); m_output->println("result: %s", error_string.c_str());
/* only use error_code protocol if client knows about it */
if (log_event_version)
m_output->println("error_code: %d", error_code);
m_output->println(""); m_output->println("");
return; return;
} }
...@@ -491,7 +532,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &, ...@@ -491,7 +532,7 @@ MgmApiSession::get_nodeid(Parser_t::Context &,
m_output->println("nodeid: %u", tmp); m_output->println("nodeid: %u", tmp);
m_output->println("result: Ok"); m_output->println("result: Ok");
m_output->println(""); m_output->println("");
m_allocated_resources->reserve_node(tmp); m_allocated_resources->reserve_node(tmp, timeout*1000);
if (name) if (name)
g_eventLogger.info("Node %d: %s", tmp, name); g_eventLogger.info("Node %d: %s", tmp, name);
...@@ -1480,14 +1521,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx, ...@@ -1480,14 +1521,6 @@ MgmApiSession::listen_event(Parser<MgmApiSession>::Context & ctx,
m_output->println(""); m_output->println("");
} }
struct PurgeStruct
{
NodeBitmask free_nodes;/* free nodes as reported
* by ndbd in apiRegReqConf
*/
BaseString *str;
};
void void
MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data) MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *data)
{ {
...@@ -1495,11 +1528,24 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da ...@@ -1495,11 +1528,24 @@ MgmApiSession::stop_session_if_not_connected(SocketServer::Session *_s, void *da
struct PurgeStruct &ps= *(struct PurgeStruct *)data; struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes)) if (s->m_allocated_resources->is_reserved(ps.free_nodes))
{ {
if (ps.str)
ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid()); ps.str->appfmt(" %d", s->m_allocated_resources->get_nodeid());
s->stopSession(); s->stopSession();
} }
} }
void
MgmApiSession::stop_session_if_timed_out(SocketServer::Session *_s, void *data)
{
MgmApiSession *s= (MgmApiSession *)_s;
struct PurgeStruct &ps= *(struct PurgeStruct *)data;
if (s->m_allocated_resources->is_reserved(ps.free_nodes) &&
s->m_allocated_resources->is_timed_out(ps.tick))
{
s->stopSession();
}
}
void void
MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx, MgmApiSession::purge_stale_sessions(Parser_t::Context &ctx,
const class Properties &args) const class Properties &args)
......
...@@ -30,6 +30,7 @@ ...@@ -30,6 +30,7 @@
class MgmApiSession : public SocketServer::Session class MgmApiSession : public SocketServer::Session
{ {
static void stop_session_if_timed_out(SocketServer::Session *_s, void *data);
static void stop_session_if_not_connected(SocketServer::Session *_s, void *data); static void stop_session_if_not_connected(SocketServer::Session *_s, void *data);
private: private:
typedef Parser<MgmApiSession> Parser_t; typedef Parser<MgmApiSession> Parser_t;
......
...@@ -5650,11 +5650,23 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5650,11 +5650,23 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
{ {
DBUG_ENTER("ndb_get_table_statistics"); DBUG_ENTER("ndb_get_table_statistics");
DBUG_PRINT("enter", ("table: %s", table)); DBUG_PRINT("enter", ("table: %s", table));
NdbTransaction* pTrans= ndb->startTransaction(); NdbTransaction* pTrans;
int retries= 10;
int retry_sleep= 30 * 1000; /* 30 milliseconds */
do do
{ {
pTrans= ndb->startTransaction();
if (pTrans == NULL) if (pTrans == NULL)
{
if (ndb->getNdbError().status == NdbError::TemporaryError &&
retries--)
{
my_sleep(retry_sleep);
continue;
}
break; break;
}
NdbScanOperation* pOp= pTrans->getNdbScanOperation(table); NdbScanOperation* pOp= pTrans->getNdbScanOperation(table);
if (pOp == NULL) if (pOp == NULL)
...@@ -5678,7 +5690,17 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5678,7 +5690,17 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
NdbTransaction::AbortOnError, NdbTransaction::AbortOnError,
TRUE); TRUE);
if (check == -1) if (check == -1)
{
if (pTrans->getNdbError().status == NdbError::TemporaryError &&
retries--)
{
ndb->closeTransaction(pTrans);
pTrans= 0;
my_sleep(retry_sleep);
continue;
}
break; break;
}
Uint32 count= 0; Uint32 count= 0;
Uint64 sum_rows= 0; Uint64 sum_rows= 0;
...@@ -5713,7 +5735,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table, ...@@ -5713,7 +5735,7 @@ ndb_get_table_statistics(Ndb* ndb, const char * table,
sum_mem, count)); sum_mem, count));
DBUG_RETURN(0); DBUG_RETURN(0);
} while (0); } while(1);
if (pTrans) if (pTrans)
ndb->closeTransaction(pTrans); ndb->closeTransaction(pTrans);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment