Commit a2b2bbd9 authored by tomas@whalegate.ndb.mysql.com's avatar tomas@whalegate.ndb.mysql.com

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.1-new-ndb

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-single-user
parents 277a5a50 be3342cf
......@@ -20,7 +20,12 @@
#include <TransporterCallback.hpp>
#include <RefConvert.hpp>
#ifdef ERROR_INSERT
Uint32 MAX_RECEIVED_SIGNALS = 1024;
#else
#define MAX_RECEIVED_SIGNALS 1024
#endif
Uint32
TransporterRegistry::unpack(Uint32 * readPtr,
Uint32 sizeOfData,
......
......@@ -102,6 +102,10 @@ private:
virtual void updateReceiveDataPtr(Uint32 bytesRead);
virtual Uint32 get_free_buffer() const;
inline bool hasReceiveData () const {
return receiveBuffer.sizeOfData > 0;
}
protected:
/**
* Setup client/server and perform connect/accept
......
......@@ -739,16 +739,13 @@ TransporterRegistry::poll_SHM(Uint32 timeOutMillis)
Uint32
TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
{
bool hasdata = false;
if (false && nTCPTransporters == 0)
{
tcpReadSelectReply = 0;
return 0;
}
struct timeval timeout;
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
NDB_SOCKET_TYPE maxSocketValue = -1;
// Needed for TCP/IP connections
......@@ -771,8 +768,15 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
// Put the connected transporters in the socket read-set
FD_SET(socket, &tcpReadset);
}
hasdata |= t->hasReceiveData();
}
timeOutMillis = hasdata ? 0 : timeOutMillis;
struct timeval timeout;
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000) * 1000;
// The highest socket value plus one
maxSocketValue++;
......@@ -787,7 +791,7 @@ TransporterRegistry::poll_TCP(Uint32 timeOutMillis)
}
#endif
return tcpReadSelectReply;
return tcpReadSelectReply || hasdata;
}
#endif
......@@ -796,26 +800,26 @@ void
TransporterRegistry::performReceive()
{
#ifdef NDB_TCP_TRANSPORTER
if(tcpReadSelectReply > 0)
for (int i=0; i<nTCPTransporters; i++)
{
for (int i=0; i<nTCPTransporters; i++)
{
checkJobBuffer();
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket();
if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset))
checkJobBuffer();
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket();
if(is_connected(nodeId)){
if(t->isConnected())
{
if (FD_ISSET(socket, &tcpReadset))
{
const int receiveSize = t->doReceive();
if(receiveSize > 0)
{
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
transporter_recv_from(callbackObj, nodeId);
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
t->updateReceiveDataPtr(szUsed);
}
t->doReceive();
}
if (t->hasReceiveData())
{
Uint32 * ptr;
Uint32 sz = t->getReceiveData(&ptr);
Uint32 szUsed = unpack(ptr, sz, nodeId, ioStates[nodeId]);
t->updateReceiveDataPtr(szUsed);
}
}
}
......
......@@ -133,6 +133,7 @@ Cmvmi::~Cmvmi()
#ifdef ERROR_INSERT
NodeBitmask c_error_9000_nodes_mask;
extern Uint32 MAX_RECEIVED_SIGNALS;
#endif
void Cmvmi::execNDB_TAMPER(Signal* signal)
......@@ -162,6 +163,22 @@ void Cmvmi::execNDB_TAMPER(Signal* signal)
kill(getpid(), SIGABRT);
}
#endif
#ifdef ERROR_INSERT
if (signal->theData[0] == 9003)
{
if (MAX_RECEIVED_SIGNALS < 1024)
{
MAX_RECEIVED_SIGNALS = 1024;
}
else
{
MAX_RECEIVED_SIGNALS = 1 + (rand() % 128);
}
ndbout_c("MAX_RECEIVED_SIGNALS: %d", MAX_RECEIVED_SIGNALS);
CLEAR_ERROR_INSERT_VALUE;
}
#endif
}//execNDB_TAMPER()
void Cmvmi::execSET_LOGLEVELORD(Signal* signal)
......
......@@ -5200,9 +5200,9 @@ void Dbacc::execEXPANDCHECK2(Signal* signal)
{
jamEntry();
if(refToBlock(signal->getSendersBlockRef()) == DBLQH){
if(refToBlock(signal->getSendersBlockRef()) == DBLQH)
{
jam();
reenable_expand_after_redo_log_exection_complete(signal);
return;
}
......
......@@ -1588,6 +1588,36 @@ int runTestExecuteAsynch(NDBT_Context* ctx, NDBT_Step* step){
template class Vector<NdbScanOperation*>;
int
runBug28443(NDBT_Context* ctx, NDBT_Step* step)
{
int result = NDBT_OK;
int records = ctx->getNumRecords();
NdbRestarter restarter;
restarter.insertErrorInAllNodes(9003);
for (Uint32 i = 0; i<ctx->getNumLoops(); i++)
{
HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records, 2048) != 0)
{
result = NDBT_FAILED;
goto done;
}
if (runClearTable(ctx, step) != 0)
{
result = NDBT_FAILED;
goto done;
}
}
done:
restarter.insertErrorInAllNodes(9003);
return result;
}
NDBT_TESTSUITE(testNdbApi);
TESTCASE("MaxNdb",
......@@ -1689,6 +1719,10 @@ TESTCASE("ExecuteAsynch",
"Check that executeAsync() works (BUG#27495)\n"){
INITIALIZER(runTestExecuteAsynch);
}
TESTCASE("Bug28443",
""){
INITIALIZER(runBug28443);
}
NDBT_TESTSUITE_END(testNdbApi);
int main(int argc, const char** argv){
......
......@@ -704,6 +704,10 @@ max-time: 500
cmd: testNdbApi
args: -n ExecuteAsynch T1
max-time: 1000
cmd: testNdbApi
args: -n BugBug28443
#max-time: 500
#cmd: testInterpreter
#args: T1
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment