Commit 5d19f4eb authored by mysql@mc04.(none)'s avatar mysql@mc04.(none)

Restored old shared memory buffer implementation (used by SCI and SHM).

Improved Default SCI config params
Added missing SCI libraries in ndb_mgm and atrt
Added max of 1024 signals per receive on transporter (to improve
real-time bahaviour and to ensure no job buffer explosion, still
some more work left on avoiding job buffer explosion in the general
case)
parent 5e8d686b
......@@ -21,6 +21,7 @@
#include <TransporterCallback.hpp>
#include <RefConvert.hpp>
#define MAX_RECEIVED_SIGNALS 1024
Uint32
TransporterRegistry::unpack(Uint32 * readPtr,
Uint32 sizeOfData,
......@@ -30,12 +31,15 @@ TransporterRegistry::unpack(Uint32 * readPtr,
LinearSectionPtr ptr[3];
Uint32 usedData = 0;
Uint32 loop_count = 0;
if(state == NoHalt || state == HaltOutput){
while(sizeOfData >= 4 + sizeof(Protocol6)){
while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
(loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
......@@ -112,10 +116,12 @@ TransporterRegistry::unpack(Uint32 * readPtr,
} else {
/** state = HaltIO || state == HaltInput */
while(sizeOfData >= 4 + sizeof(Protocol6)){
while ((sizeOfData >= 4 + sizeof(Protocol6)) &&
(loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
......@@ -208,12 +214,13 @@ TransporterRegistry::unpack(Uint32 * readPtr,
IOState state) {
static SignalHeader signalHeader;
static LinearSectionPtr ptr[3];
Uint32 loop_count = 0;
if(state == NoHalt || state == HaltOutput){
while(readPtr < eodPtr){
while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
//Do funky stuff
......@@ -280,11 +287,11 @@ TransporterRegistry::unpack(Uint32 * readPtr,
} else {
/** state = HaltIO || state == HaltInput */
while(readPtr < eodPtr){
while ((readPtr < eodPtr) && (loop_count < MAX_RECEIVED_SIGNALS)) {
Uint32 word1 = readPtr[0];
Uint32 word2 = readPtr[1];
Uint32 word3 = readPtr[2];
loop_count++;
#if 0
if(Protocol6::getByteOrder(word1) != MY_OWN_BYTE_ORDER){
//Do funky stuff
......
......@@ -530,7 +530,6 @@ void SCI_Transporter::setupLocalSegment()
Uint32 * localReadIndex =
(Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1);
Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2);
m_localStatusFlag = (Uint32*)(localReadIndex + 3);
char * localStartOfBuf = (char*)
......@@ -538,7 +537,6 @@ void SCI_Transporter::setupLocalSegment()
* localReadIndex = 0;
* localWriteIndex = 0;
* localEndWriteIndex = 0;
const Uint32 slack = MAX_MESSAGE_SIZE;
......@@ -546,7 +544,6 @@ void SCI_Transporter::setupLocalSegment()
sizeOfBuffer,
slack,
localReadIndex,
localEndWriteIndex,
localWriteIndex);
reader->clear();
......@@ -570,7 +567,6 @@ void SCI_Transporter::setupRemoteSegment()
Uint32 * remoteReadIndex = (Uint32*)segPtr;
Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2);
m_remoteStatusFlag = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));
......@@ -579,7 +575,6 @@ void SCI_Transporter::setupRemoteSegment()
sizeOfBuffer,
slack,
remoteReadIndex,
remoteEndWriteIndex,
remoteWriteIndex);
writer->clear();
......@@ -598,7 +593,6 @@ void SCI_Transporter::setupRemoteSegment()
Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2);
m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
......@@ -613,12 +607,10 @@ void SCI_Transporter::setupRemoteSegment()
sizeOfBuffer,
slack,
remoteReadIndex2,
remoteEndWriteIndex2,
remoteWriteIndex2);
* remoteReadIndex = 0;
* remoteWriteIndex = 0;
* remoteEndWriteIndex = 0;
writer2->clear();
m_TargetSegm[1].writer=writer2;
if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
......@@ -918,14 +910,13 @@ SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
Uint32 new_curr_data_size = curr_data_size + lenBytes;
if ((new_curr_data_size >= send_buf_size) ||
if ((curr_data_size >= send_buf_size) ||
(curr_data_size >= sci_buffer_remaining)) {
/**
* The new message will not fit in the send buffer. We need to
* send the send buffer before filling it up with the new
* signal data. If current data size will spill over buffer edge
* we will also send to avoid writing larger than possible in
* buffer.
* we will also send to ensure correct operation.
*/
if (!doSend()) {
/**
......
......@@ -297,13 +297,12 @@ private:
*/
bool sendIsPossible(struct timeval * timeout);
void getReceivePtr(Uint32 ** ptr, Uint32 &size){
size = reader->getReadPtr(* ptr);
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
reader->getReadPtr(* ptr, * eod);
}
void updateReceivePtr(Uint32 size){
reader->updateReadPtr(size);
void updateReceivePtr(Uint32 *ptr){
reader->updateReadPtr(ptr);
}
/**
......
......@@ -42,13 +42,11 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
......@@ -68,12 +66,12 @@ public:
* returns ptr - where to start reading
* sz - how much can I read
*/
inline Uint32 getReadPtr(Uint32 * & ptr);
inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
/**
* Update read ptr
*/
inline void updateReadPtr(Uint32 size);
inline void updateReadPtr(Uint32 *ptr);
private:
char * const m_startOfBuffer;
......@@ -82,7 +80,6 @@ private:
Uint32 m_readIndex;
Uint32 * m_sharedReadIndex;
Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
......@@ -100,22 +97,19 @@ SHM_Reader::empty() const{
* sz - how much can I read
*/
inline
Uint32
SHM_Reader::getReadPtr(Uint32 * & ptr)
void
SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod)
{
Uint32 *eod;
Uint32 tReadIndex = m_readIndex;
Uint32 tWriteIndex = * m_sharedWriteIndex;
Uint32 tEndWriteIndex = * m_sharedEndWriteIndex;
ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
if(tReadIndex <= tWriteIndex){
eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
} else {
eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex];
eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
}
return (Uint32)((char*)eod - (char*)ptr);
}
/**
......@@ -123,10 +117,10 @@ SHM_Reader::getReadPtr(Uint32 * & ptr)
*/
inline
void
SHM_Reader::updateReadPtr(Uint32 size)
SHM_Reader::updateReadPtr(Uint32 *ptr)
{
Uint32 tReadIndex = m_readIndex;
tReadIndex += size;
Uint32 tReadIndex = ((char*)ptr) - m_startOfBuffer;
assert(tReadIndex < m_totalBufferSize);
if(tReadIndex >= m_bufferSize){
......@@ -145,13 +139,11 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
......@@ -176,7 +168,6 @@ private:
Uint32 m_writeIndex;
Uint32 * m_sharedReadIndex;
Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
......@@ -215,7 +206,6 @@ SHM_Writer::updateWritePtr(Uint32 sz){
assert(tWriteIndex < m_totalBufferSize);
if(tWriteIndex >= m_bufferSize){
* m_sharedEndWriteIndex = tWriteIndex;
tWriteIndex = 0;
}
......
......@@ -82,14 +82,12 @@ SHM_Transporter::setupBuffers(){
Uint32 * sharedReadIndex1 = base1;
Uint32 * sharedWriteIndex1 = base1 + 1;
Uint32 * sharedEndWriteIndex1 = base1 + 2;
serverStatusFlag = base1 + 4;
char * startOfBuf1 = shmBuf+sharedSize;
Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
Uint32 * sharedReadIndex2 = base2;
Uint32 * sharedWriteIndex2 = base2 + 1;
Uint32 * sharedEndWriteIndex2 = base2 + 2;
clientStatusFlag = base2 + 4;
char * startOfBuf2 = ((char *)base2)+sharedSize;
......@@ -99,23 +97,19 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer,
slack,
sharedReadIndex1,
sharedEndWriteIndex1,
sharedWriteIndex1);
writer = new SHM_Writer(startOfBuf2,
sizeOfBuffer,
slack,
sharedReadIndex2,
sharedEndWriteIndex2,
sharedWriteIndex2);
* sharedReadIndex1 = 0;
* sharedWriteIndex1 = 0;
* sharedEndWriteIndex1 = 0;
* sharedReadIndex2 = 0;
* sharedWriteIndex2 = 0;
* sharedEndWriteIndex2 = 0;
reader->clear();
writer->clear();
......@@ -148,19 +142,16 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer,
slack,
sharedReadIndex2,
sharedEndWriteIndex2,
sharedWriteIndex2);
writer = new SHM_Writer(startOfBuf1,
sizeOfBuffer,
slack,
sharedReadIndex1,
sharedEndWriteIndex1,
sharedWriteIndex1);
* sharedReadIndex2 = 0;
* sharedWriteIndex1 = 0;
* sharedEndWriteIndex1 = 0;
reader->clear();
writer->clear();
......
......@@ -61,12 +61,12 @@ public:
writer->updateWritePtr(lenBytes);
}
void getReceivePtr(Uint32 ** ptr, Uint32 sz){
sz = reader->getReadPtr(* ptr);
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
reader->getReadPtr(* ptr, * eod);
}
void updateReceivePtr(Uint32 sz){
reader->updateReadPtr(sz);
void updateReceivePtr(Uint32 * ptr){
reader->updateReadPtr(ptr);
}
protected:
......
......@@ -857,11 +857,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr;
Uint32 * readPtr, * eodPtr;
Uint32 sz = 0;
t->getReceivePtr(&readPtr, sz);
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
t->updateReceivePtr(szUsed);
t->getReceivePtr(&readPtr, &eodPtr);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(newPtr);
}
}
}
......@@ -873,11 +873,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr;
Uint32 * readPtr, * eodPtr;
Uint32 sz = 0;
t->getReceivePtr(&readPtr, sz);
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
t->updateReceivePtr(szUsed);
t->getReceivePtr(&readPtr, &eodPtr);
Uint32 *newPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(newPtr);
}
}
}
......
......@@ -141,7 +141,7 @@ int
FastScheduler::checkDoJob()
{
/*
* Joob buffer overload protetction
* Job buffer overload protetction
* If the job buffer B is filled over a certain limit start
* to execute the signals in the job buffer's
*/
......
......@@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a \
@TERMCAP_LIB@
@TERMCAP_LIB@ @NDB_SCI_LIBS@
ndb_mgm_LDFLAGS = @ndb_bin_am_ldflags@
......
......@@ -1944,7 +1944,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
"2K",
"8K",
"128",
"32K" },
......@@ -1956,7 +1956,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
"192K",
"1M",
"64K",
STR_VALUE(MAX_INT_RNIL) },
......
......@@ -16,7 +16,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/mgmclient/CpcClient.o \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
wrappersdir=$(prefix)/bin
wrappers_SCRIPTS=atrt-testBackup atrt-mysql-test-run
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment