Commit 72017f8d authored by mikael@mc04.(none)'s avatar mikael@mc04.(none)

logging_ok:

  Logging to logging@openlogging.org accepted
SCI_Transporter.hpp, SCI_Transporter.cpp:
  Major fix-up of SCI Transporter, fixed so that it works with single card, fixed wrap around, added lots of DBUG statements, merged with new transporter handling
TransporterRegistry.cpp:
  Some fixes for wrap around needed plus DBUG handling
TCP_Transporter.hpp, TCP_Transporter.cpp:
  Added DBUG statements
SHM_Transporter.hpp, SHM_Transporter.cpp:
  Fixed SHM Transporter
SHM_Buffer.hpp:
  Fixed SHM Buffer to handle wrap around properly
IPCConfig.cpp:
  Fixed up config of SCI
SocketServer.cpp:
  Added DBUG support for SocketServer threads
ConfigInfo.cpp:
  Config changes for SCI
TransporterDefinitions.hpp, mgmapi_config_parameters.h:
  SCI fixes
Makefile.am, type_ndbapitools.mk.am, type_ndbapitest.mk.am:
  Added SCI library path to Makefiles
configure.in:
  Fixed small bug with shared mem and sci together in configure
acinclude.m4:
  Added possibility of providing SCI library path in confgure
parent b7bbb217
......@@ -100,6 +100,7 @@ miguel@hegel.txg.br
miguel@light.
miguel@light.local
miguel@sartre.local
mikael@mc04.(none)
mikron@c-fb0ae253.1238-1-64736c10.cust.bredbandsbolaget.se
mikron@mikael-ronstr-ms-dator.local
mmatthew@markslaptop.
......@@ -158,6 +159,7 @@ ram@ram.(none)
ranger@regul.home.lan
rburnett@build.mysql.com
root@home.(none)
root@mc04.(none)
root@x3.internalnet
salle@banica.(none)
salle@geopard.(none)
......
......@@ -1551,16 +1551,43 @@ dnl Sets HAVE_NDBCLUSTER_DB if --with-ndbcluster is used
dnl ---------------------------------------------------------------------------
AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
AC_ARG_WITH([ndb-sci],
AC_HELP_STRING([--with-ndb-sci=DIR],
[Provide MySQL with a custom location of
sci library. Given DIR, sci library is
assumed to be in $DIR/lib and header files
in $DIR/include.]),
[mysql_sci_dir=${withval}],
[mysql_sci_dir=""])
case "$mysql_sci_dir" in
"no" )
have_ndb_sci=no
AC_MSG_RESULT([-- not including sci transporter])
;;
* )
if test -f "$mysql_sci_dir/lib/libsisci.a" -a \
-f "$mysql_sci_dir/include/sisci_api.h"; then
NDB_SCI_INCLUDES="-I$mysql_sci_dir/include"
NDB_SCI_LIBS="-L$mysql_sci_dir/lib -lsisci"
AC_MSG_RESULT([-- including sci transporter])
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
[Including Ndb Cluster DB sci transporter])
AC_SUBST(NDB_SCI_INCLUDES)
AC_SUBST(NDB_SCI_LIBS)
have_ndb_sci="yes"
AC_MSG_RESULT([found sci transporter in $mysql_sci_dir/{include, lib}])
else
AC_MSG_RESULT([could not find sci transporter in $mysql_sci_dir/{include, lib}])
fi
;;
esac
AC_ARG_WITH([ndb-shm],
[
--with-ndb-shm Include the NDB Cluster shared memory transporter],
[ndb_shm="$withval"],
[ndb_shm=no])
AC_ARG_WITH([ndb-sci],
[
--with-ndb-sci Include the NDB Cluster sci transporter],
[ndb_sci="$withval"],
[ndb_sci=no])
AC_ARG_WITH([ndb-test],
[
--with-ndb-test Include the NDB Cluster ndbapi test programs],
......@@ -1593,19 +1620,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
;;
esac
have_ndb_sci=no
case "$ndb_sci" in
yes )
AC_MSG_RESULT([-- including sci transporter])
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
[Including Ndb Cluster DB sci transporter])
have_ndb_sci="yes"
;;
* )
AC_MSG_RESULT([-- not including sci transporter])
;;
esac
have_ndb_test=no
case "$ndb_test" in
yes )
......
......@@ -3024,11 +3024,11 @@ AC_SUBST([ndb_port_base])
ndb_transporter_opt_objs=""
if test X"$have_ndb_shm" = Xyes
then
ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SHM_Transporter.lo SHM_Transporter.unix.lo"
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SHM_Transporter.lo SHM_Transporter.unix.lo"
fi
if test X"$have_ndb_sci" = Xyes
then
ndb_transporter_opt_objs="$(ndb_transporter_opt_objs) SCI_Transporter.lo"
ndb_transporter_opt_objs="$ndb_transporter_opt_objs SCI_Transporter.lo"
fi
AC_SUBST([ndb_transporter_opt_objs])
......
......@@ -3,7 +3,7 @@ LDADD += $(top_builddir)/ndb/test/src/libNDBT.a \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
-I$(top_srcdir)/ndb/include \
......
......@@ -3,7 +3,7 @@ LDADD += \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
-I$(top_srcdir)/ndb/include \
......
......@@ -117,16 +117,14 @@
#define CFG_SHM_KEY 502
#define CFG_SHM_BUFFER_MEM 503
#define CFG_SCI_ID_0 550
#define CFG_SCI_ID_1 551
#define CFG_SCI_SEND_LIMIT 552
#define CFG_SCI_BUFFER_MEM 553
#define CFG_SCI_NODE1_ADAPTERS 554
#define CFG_SCI_NODE1_ADAPTER0 555
#define CFG_SCI_NODE1_ADAPTER1 556
#define CFG_SCI_NODE2_ADAPTERS 554
#define CFG_SCI_NODE2_ADAPTER0 555
#define CFG_SCI_NODE2_ADAPTER1 556
#define CFG_SCI_HOST1_ID_0 550
#define CFG_SCI_HOST1_ID_1 551
#define CFG_SCI_HOST2_ID_0 552
#define CFG_SCI_HOST2_ID_1 553
#define CFG_SCI_HOSTNAME_1 554
#define CFG_SCI_HOSTNAME_2 555
#define CFG_SCI_SEND_LIMIT 556
#define CFG_SCI_BUFFER_MEM 557
#define CFG_OSE_HOSTNAME_1 600
#define CFG_OSE_HOSTNAME_2 601
......
......@@ -59,8 +59,6 @@ struct TCP_TransporterConfiguration {
NodeId localNodeId;
Uint32 sendBufferSize; // Size of SendBuffer of priority B
Uint32 maxReceiveSize; // Maximum no of bytes to receive
Uint32 byteOrder;
bool compression;
bool checksum;
bool signalId;
};
......@@ -72,10 +70,8 @@ struct SHM_TransporterConfiguration {
Uint32 port;
NodeId remoteNodeId;
NodeId localNodeId;
bool compression;
bool checksum;
bool signalId;
int byteOrder;
Uint32 shmKey;
Uint32 shmSize;
......@@ -89,10 +85,8 @@ struct OSE_TransporterConfiguration {
const char *localHostName;
NodeId remoteNodeId;
NodeId localNodeId;
bool compression;
bool checksum;
bool signalId;
int byteOrder;
Uint32 prioASignalSize;
Uint32 prioBSignalSize;
......@@ -103,20 +97,20 @@ struct OSE_TransporterConfiguration {
* SCI Transporter Configuration
*/
struct SCI_TransporterConfiguration {
const char *remoteHostName;
const char *localHostName;
Uint32 port;
Uint32 sendLimit; // Packet size
Uint32 bufferSize; // Buffer size
Uint32 nLocalAdapters; // 1 or 2, the number of adapters on local host
Uint32 nRemoteAdapters;
Uint32 remoteSciNodeId0; // SCInodeId for adapter 1
Uint32 remoteSciNodeId1; // SCInodeId for adapter 2
NodeId localNodeId; // Local node Id
NodeId remoteNodeId; // Remote node Id
Uint32 byteOrder;
bool compression;
bool checksum;
bool signalId;
......
......@@ -133,7 +133,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
Uint32 compression;
Uint32 checksum;
if(!tmp->get("SendSignalId", &sendSignalId)) continue;
if(!tmp->get("Compression", &compression)) continue;
if(!tmp->get("Checksum", &checksum)) continue;
const char * type;
......@@ -143,8 +142,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
SHM_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.byteOrder = 0;
conf.compression = compression;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -164,8 +161,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
SCI_TransporterConfiguration conf;
conf.localNodeId = the_ownId;
conf.remoteNodeId = (nodeId1 != the_ownId ? nodeId1 : nodeId2);
conf.byteOrder = 0;
conf.compression = compression;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -174,18 +169,16 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
if(the_ownId == nodeId1){
if(!tmp->get("Node1_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node2_NoOfAdapters", &conf.nRemoteAdapters)) continue;
if(!tmp->get("Node2_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nRemoteAdapters > 1){
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node2_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
} else {
if(!tmp->get("Node2_NoOfAdapters", &conf.nLocalAdapters)) continue;
if(!tmp->get("Node1_NoOfAdapters", &conf.nRemoteAdapters)) continue;
if(!tmp->get("Node1_Adapter", 0, &conf.remoteSciNodeId0)) continue;
if(conf.nRemoteAdapters > 1){
if(conf.nLocalAdapters > 1){
if(!tmp->get("Node1_Adapter", 1, &conf.remoteSciNodeId1)) continue;
}
}
......@@ -243,8 +236,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.byteOrder = 0;
conf.compression = compression;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -270,8 +261,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
conf.localHostName = ownHostName;
conf.remoteNodeId = remoteNodeId;
conf.localNodeId = ownNodeId;
conf.byteOrder = 0;
conf.compression = compression;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -344,6 +333,7 @@ Uint32
IPCConfig::configureTransporters(Uint32 nodeId,
const class ndb_mgm_configuration & config,
class TransporterRegistry & tr){
DBUG_ENTER("IPCConfig::configureTransporters");
Uint32 noOfTransportersCreated= 0, server_port= 0;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
......@@ -374,14 +364,13 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
server_port= tmp_server_port;
}
DBUG_PRINT("info", ("Transporter between this node %d and node %d using port %d, signalId %d, checksum %d",
nodeId, remoteNodeId, tmp_server_port, sendSignalId, checksum));
switch(type){
case CONNECTION_TYPE_SHM:{
SHM_TransporterConfiguration conf;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.byteOrder = 0;
conf.compression = 0;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -391,45 +380,60 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf.port= tmp_server_port;
if(!tr.createTransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SHM Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
} else {
noOfTransportersCreated++;
}
DBUG_PRINT("info", ("Created SHM Transporter using shmkey %d, buf size = %d",
conf.shmKey, conf.shmSize));
break;
}
case CONNECTION_TYPE_SCI:{
SCI_TransporterConfiguration conf;
const char * host1, * host2;
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.byteOrder = 0;
conf.compression = 0;
conf.checksum = checksum;
conf.signalId = sendSignalId;
conf.port= tmp_server_port;
if(iter.get(CFG_SCI_HOSTNAME_1, &host1)) break;
if(iter.get(CFG_SCI_HOSTNAME_2, &host2)) break;
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
if(iter.get(CFG_SCI_SEND_LIMIT, &conf.sendLimit)) break;
if(iter.get(CFG_SCI_BUFFER_MEM, &conf.bufferSize)) break;
if(nodeId == nodeId1){
if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nLocalAdapters)) break;
if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nRemoteAdapters)) break;
if(iter.get(CFG_SCI_NODE2_ADAPTER0, &conf.remoteSciNodeId0)) break;
if(conf.nRemoteAdapters > 1){
if(iter.get(CFG_SCI_NODE2_ADAPTER1, &conf.remoteSciNodeId1)) break;
}
if (nodeId == nodeId1) {
if(iter.get(CFG_SCI_HOST2_ID_0, &conf.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST2_ID_1, &conf.remoteSciNodeId1)) break;
} else {
if(iter.get(CFG_SCI_NODE2_ADAPTERS, &conf.nLocalAdapters)) break;
if(iter.get(CFG_SCI_NODE1_ADAPTERS, &conf.nRemoteAdapters)) break;
if(iter.get(CFG_SCI_NODE1_ADAPTER0, &conf.remoteSciNodeId0)) break;
if(conf.nRemoteAdapters > 1){
if(iter.get(CFG_SCI_NODE1_ADAPTER1, &conf.remoteSciNodeId1)) break;
}
if(iter.get(CFG_SCI_HOST1_ID_0, &conf.remoteSciNodeId0)) break;
if(iter.get(CFG_SCI_HOST1_ID_1, &conf.remoteSciNodeId1)) break;
}
if(!tr.createTransporter(&conf)){
if (conf.remoteSciNodeId1 == 0) {
conf.nLocalAdapters = 1;
} else {
conf.nLocalAdapters = 2;
}
if(!tr.createTransporter(&conf)){
DBUG_PRINT("error", ("Failed to create SCI Transporter from %d to %d",
conf.localNodeId, conf.remoteNodeId));
ndbout << "Failed to create SCI Transporter from: "
<< conf.localNodeId << " to: " << conf.remoteNodeId << endl;
} else {
DBUG_PRINT("info", ("Created SCI Transporter: Adapters = %d, remote SCI node id %d",
conf.nLocalAdapters, conf.remoteSciNodeId0));
DBUG_PRINT("info", ("Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d",
conf.localHostName, conf.remoteHostName, conf.sendLimit, conf.bufferSize));
if (conf.nLocalAdapters > 1) {
DBUG_PRINT("info", ("Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d",
conf.remoteSciNodeId1));
}
noOfTransportersCreated++;
continue;
}
......@@ -457,8 +461,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf.remoteNodeId = remoteNodeId;
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
conf.byteOrder = 0;
conf.compression = 0;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -468,6 +470,9 @@ IPCConfig::configureTransporters(Uint32 nodeId,
} else {
noOfTransportersCreated++;
}
DBUG_PRINT("info", ("Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d",
conf.sendBufferSize, conf.maxReceiveSize));
break;
case CONNECTION_TYPE_OSE:{
OSE_TransporterConfiguration conf;
......@@ -483,8 +488,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf.remoteNodeId = remoteNodeId;
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
conf.remoteHostName = (nodeId == nodeId1 ? host2 : host1);
conf.byteOrder = 0;
conf.compression = 0;
conf.checksum = checksum;
conf.signalId = sendSignalId;
......@@ -505,6 +508,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
tr.m_service_port= server_port;
return noOfTransportersCreated;
DBUG_RETURN(noOfTransportersCreated);
}
......@@ -13,7 +13,7 @@ EXTRA_libtransporter_la_SOURCES = SHM_Transporter.cpp SHM_Transporter.unix.cpp S
libtransporter_la_LIBADD = @ndb_transporter_opt_objs@
libtransporter_la_DEPENDENCIES = @ndb_transporter_opt_objs@
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter
INCLUDES_LOC = -I$(top_srcdir)/ndb/include/kernel -I$(top_srcdir)/ndb/include/transporter @NDB_SCI_INCLUDES@
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_util.mk.am
......
......@@ -24,23 +24,30 @@
#include "TransporterInternalDefinitions.hpp"
#include <TransporterCallback.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#define FLAGS 0
SCI_Transporter::SCI_Transporter(Uint32 packetSize,
#define DEBUG_TRANSPORTER
SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
Uint16 remoteSciNodeId0,
Uint16 remoteSciNodeId1,
NodeId _localNodeId,
NodeId _remoteNodeId,
int byte_order,
bool compr,
bool chksm,
bool signalId,
Uint32 reportFreq) :
Transporter(_localNodeId, _remoteNodeId, byte_order, compr, chksm, signalId)
{
Transporter(t_reg, lHostName, rHostName, r_port, _localNodeId,
_remoteNodeId, 0, false, chksm, signalId)
{
DBUG_ENTER("SCI_Transporter::SCI_Transporter");
m_PacketSize = (packetSize + 3)/4 ;
m_BufferSize = bufferSize;
m_sendBuffer.m_buffer = NULL;
......@@ -56,10 +63,6 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize,
m_initLocal=false;
m_remoteNodes= new Uint16[m_numberOfRemoteNodes];
if(m_remoteNodes == NULL) {
//DO WHAT??
}
m_swapCounter=0;
m_failCounter=0;
m_remoteNodes[0]=remoteSciNodeId0;
......@@ -94,20 +97,19 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize,
i4096=0;
i4097=0;
#endif
DBUG_VOID_RETURN;
}
void SCI_Transporter::disconnectImpl()
{
DBUG_ENTER("SCI_Transporter::disconnectImpl");
sci_error_t err;
if(m_mapped){
setDisconnect();
#ifdef DEBUG_TRANSPORTER
ndbout << "DisconnectImpl " << getConnectionStatus() << endl;
ndbout << "remote node " << remoteNodeId << endl;
#endif
DBUG_PRINT("info", ("connect status = %d, remote node = %d",
(int)getConnectionStatus(), remoteNodeId));
disconnectRemote();
disconnectLocal();
}
......@@ -124,65 +126,56 @@ void SCI_Transporter::disconnectImpl()
SCIClose(sciAdapters[i].scidesc, FLAGS, &err);
if(err != SCI_ERR_OK) {
reportError(callbackObj, localNodeId, TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
#ifdef DEBUG_TRANSPORTER
fprintf(stderr,
"\nCannot close channel to the driver. Error code 0x%x",
err);
#endif
}
report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
DBUG_PRINT("error", ("Cannot close channel to the driver. Error code 0x%x",
err));
}
}
}
m_sciinit=false;
#ifdef DEBUG_TRANSPORTER
ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl;
ndbout << "total: " << i1024+ i10242048 + i2048+i2049 << endl;
ndbout << "<1024: " << i1024 << endl;
ndbout << "1024-2047: " << i10242048 << endl;
ndbout << "==2048: " << i2048 << endl;
ndbout << "2049-4096: " << i20484096 << endl;
ndbout << "==4096: " << i4096 << endl;
ndbout << ">4096: " << i4097 << endl;
#endif
DBUG_VOID_RETURN;
}
bool SCI_Transporter::initTransporter() {
if(m_BufferSize < (2*MAX_MESSAGE_SIZE)){
m_BufferSize = 2 * MAX_MESSAGE_SIZE;
DBUG_ENTER("SCI_Transporter::initTransporter");
if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){
m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;
}
// Allocate buffers for sending
Uint32 sz = 0;
if(m_BufferSize < (m_PacketSize * 4)){
sz = m_BufferSize + MAX_MESSAGE_SIZE;
} else {
/**
* 3 packages
*/
sz = (m_PacketSize * 4) * 3 + MAX_MESSAGE_SIZE;
}
// Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
// the need to send twice when a large message comes around. Send buffer size is
// measured in words.
Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;;
m_sendBuffer.m_bufferSize = 4 * ((sz + 3) / 4);
m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_bufferSize / 4];
m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4);
m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];
m_sendBuffer.m_dataSize = 0;
DBUG_PRINT("info", ("Created SCI Send Buffer with buffer size %d and packet size %d",
m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));
if(!getLinkStatus(m_ActiveAdapterId) ||
!getLinkStatus(m_StandbyAdapterId)) {
#ifdef DEBUG_TRANSPORTER
ndbout << "The link is not fully operational. " << endl;
ndbout << "Check the cables and the switches" << endl;
#endif
(m_adapters > 1 &&
!getLinkStatus(m_StandbyAdapterId))) {
DBUG_PRINT("error", ("The link is not fully operational. Check the cables and the switches"));
//reportDisconnect(remoteNodeId, 0);
//doDisconnect();
//NDB should terminate
reportError(callbackObj, localNodeId, TE_SCI_LINK_ERROR);
return false;
report_error(TE_SCI_LINK_ERROR);
DBUG_RETURN(false);
}
return true;
DBUG_RETURN(true);
} // initTransporter()
......@@ -218,10 +211,8 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);
if(error != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout << "error querying adapter " << endl;
#endif
return false;
DBUG_PRINT("error", ("error %d querying adapter", error));
return false;
}
if(linkstatus<=0)
return false;
......@@ -231,6 +222,7 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
sci_error_t SCI_Transporter::initLocalSegment() {
DBUG_ENTER("SCI_Transporter::initLocalSegment");
Uint32 segmentSize = m_BufferSize;
Uint32 offset = 0;
sci_error_t err;
......@@ -238,16 +230,12 @@ sci_error_t SCI_Transporter::initLocalSegment() {
for(Uint32 i=0; i<m_adapters ; i++) {
SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err);
sciAdapters[i].localSciNodeId=getLocalNodeId(i);
#ifdef DEBUG_TRANSPORTER
ndbout_c("SCInode iD %d adapter %d\n",
sciAdapters[i].localSciNodeId, i);
#endif
DBUG_PRINT("info", ("SCInode iD %d adapter %d\n",
sciAdapters[i].localSciNodeId, i));
if(err != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout_c("\nCannot open an SCI virtual device. Error code 0x%x",
err);
#endif
return err;
DBUG_PRINT("error", ("Cannot open an SCI virtual device. Error code 0x%x",
err));
DBUG_RETURN(err);
}
}
}
......@@ -264,12 +252,11 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&err);
if(err != SCI_ERR_OK) {
return err;
DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));
DBUG_RETURN(err);
} else {
#ifdef DEBUG_TRANSPORTER
ndbout << "created segment id : "
<< hostSegmentId(localNodeId, remoteNodeId) << endl;
#endif
DBUG_PRINT("info", ("created segment id : %d",
hostSegmentId(localNodeId, remoteNodeId)));
}
/** Prepare the segment*/
......@@ -280,11 +267,9 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&err);
if(err != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout_c("Local Segment is not accessible by an SCI adapter.");
ndbout_c("Error code 0x%x\n", err);
#endif
return err;
DBUG_PRINT("error", ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",
err));
DBUG_RETURN(err);
}
}
......@@ -301,14 +286,10 @@ sci_error_t SCI_Transporter::initLocalSegment() {
if(err != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
fprintf(stderr, "\nCannot map area of size %d. Error code 0x%x",
segmentSize,err);
ndbout << "initLocalSegment does a disConnect" << endl;
#endif
DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x",
segmentSize,err));
doDisconnect();
return err;
DBUG_RETURN(err);
}
......@@ -320,18 +301,16 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&err);
if(err != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout_c("\nLocal Segment is not available for remote connections.");
ndbout_c("Error code 0x%x\n", err);
#endif
return err;
DBUG_PRINT("error", ("Local Segment is not available for remote connections. Error code 0x%x\n",
err));
DBUG_RETURN(err);
}
}
setupLocalSegment();
return err;
DBUG_RETURN(err);
} // initLocalSegment()
......@@ -345,7 +324,7 @@ bool SCI_Transporter::doSend() {
Uint32 retry=0;
const char * const sendPtr = (char*)m_sendBuffer.m_buffer;
const Uint32 sizeToSend = m_sendBuffer.m_dataSize;
const Uint32 sizeToSend = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes
if (sizeToSend > 0){
#ifdef DEBUG_TRANSPORTER
......@@ -363,15 +342,19 @@ bool SCI_Transporter::doSend() {
i4097++;
#endif
if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Start sequence failed" << endl;
#endif
reportError(callbackObj, remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE);
DBUG_PRINT("error", ("Start sequence failed"));
report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
return false;
}
tryagain:
tryagain:
retry++;
if (retry > 3) {
DBUG_PRINT("error", ("SCI Transfer failed"));
report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
return false;
}
Uint32 * insertPtr = (Uint32 *)
(m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend);
......@@ -390,44 +373,37 @@ bool SCI_Transporter::doSend() {
&err);
if (err != SCI_ERR_OK) {
if(err == SCI_ERR_OUT_OF_RANGE) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Data transfer : out of range error \n" << endl;
#endif
DBUG_PRINT("error", ("Data transfer : out of range error"));
goto tryagain;
}
if(err == SCI_ERR_SIZE_ALIGNMENT) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Data transfer : aligne\n" << endl;
#endif
DBUG_PRINT("error", ("Data transfer : alignment error"));
DBUG_PRINT("info", ("sendPtr 0x%x, sizeToSend = %d", sendPtr, sizeToSend));
goto tryagain;
}
if(err == SCI_ERR_OFFSET_ALIGNMENT) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Data transfer : offset alignment\n" << endl;
#endif
DBUG_PRINT("error", ("Data transfer : offset alignment"));
goto tryagain;
}
}
if(err == SCI_ERR_TRANSFER_FAILED) {
//(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock();
if(getLinkStatus(m_ActiveAdapterId)) {
retry++;
if(retry>3) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
return false;
}
goto tryagain;
}
if (m_adapters == 1) {
DBUG_PRINT("error", ("SCI Transfer failed"));
report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
return false;
}
m_failCounter++;
Uint32 temp=m_ActiveAdapterId;
switch(m_swapCounter) {
case 0:
/**swap from active (0) to standby (1)*/
if(getLinkStatus(m_StandbyAdapterId)) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Swapping from 0 to 1 " << endl;
#endif
DBUG_PRINT("error", ("Swapping from adapter 0 to 1"));
failoverShmWriter();
SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0);
m_ActiveAdapterId=m_StandbyAdapterId;
......@@ -436,26 +412,21 @@ bool SCI_Transporter::doSend() {
FLAGS,
&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
DBUG_PRINT("error", ("Unable to remove sequence"));
return false;
}
if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout << "Start sequence failed" << endl;
#endif
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE);
DBUG_PRINT("error", ("Start sequence failed"));
report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
return false;
}
m_swapCounter++;
#ifdef DEBUG_TRANSPORTER
ndbout << "failover complete.." << endl;
#endif
DBUG_PRINT("info", ("failover complete"));
goto tryagain;
} else {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
DBUG_PRINT("error", ("SCI Transfer failed"));
return false;
}
return false;
......@@ -468,20 +439,15 @@ bool SCI_Transporter::doSend() {
failoverShmWriter();
m_ActiveAdapterId=m_StandbyAdapterId;
m_StandbyAdapterId=temp;
#ifdef DEBUG_TRANSPORTER
ndbout << "Swapping from 1 to 0 " << endl;
#endif
DBUG_PRINT("info", ("Swapping from 1 to 0"));
if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
DBUG_PRINT("error", ("Unable to create sequence"));
report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
return false;
}
if(startSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout << "startSequence failed... disconnecting" << endl;
#endif
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_START_SEQUENCE);
DBUG_PRINT("error", ("startSequence failed... disconnecting"));
report_error(TE_SCI_UNABLE_TO_START_SEQUENCE);
return false;
}
......@@ -489,37 +455,36 @@ bool SCI_Transporter::doSend() {
, FLAGS,
&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
DBUG_PRINT("error", ("Unable to remove sequence"));
report_error(TE_SCI_UNABLE_TO_REMOVE_SEQUENCE);
return false;
}
if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
DBUG_PRINT("error", ("Unable to create sequence on standby"));
report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
return false;
}
m_swapCounter=0;
#ifdef DEBUG_TRANSPORTER
ndbout << "failover complete.." << endl;
#endif
DBUG_PRINT("info", ("failover complete.."));
goto tryagain;
} else {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
DBUG_PRINT("error", ("Unrecoverable data transfer error"));
report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
return false;
}
break;
default:
reportError(callbackObj,
remoteNodeId, TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
DBUG_PRINT("error", ("Unrecoverable data transfer error"));
report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
return false;
break;
}
}
} else {
SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
writer->updateWritePtr(sizeToSend);
......@@ -535,13 +500,10 @@ bool SCI_Transporter::doSend() {
/**
* If we end up here, the SCI segment is full.
*/
#ifdef DEBUG_TRANSPORTER
ndbout << "the segment is full for some reason" << endl;
#endif
DBUG_PRINT("error", ("the segment is full for some reason"));
return false;
} //if
}
return true;
} // doSend()
......@@ -557,11 +519,8 @@ void SCI_Transporter::failoverShmWriter() {
void SCI_Transporter::setupLocalSegment()
{
DBUG_ENTER("SCI_Transporter::setupLocalSegment");
Uint32 sharedSize = 0;
sharedSize += 16; //SHM_Reader::getSharedSize();
sharedSize += 16; //SHM_Writer::getSharedSize();
sharedSize += 32; //SHM_Writer::getSharedSize();
sharedSize =4096; //start of the buffer is page aligend
Uint32 sizeOfBuffer = m_BufferSize;
......@@ -570,207 +529,265 @@ void SCI_Transporter::setupLocalSegment()
Uint32 * localReadIndex =
(Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
Uint32 * localWriteIndex =
(Uint32*)(localReadIndex+ 1);
Uint32 * localEndOfDataIndex = (Uint32*)
(localReadIndex + 2);
Uint32 * localWriteIndex = (Uint32*)(localReadIndex+ 1);
Uint32 * localEndWriteIndex = (Uint32*)(localReadIndex + 2);
m_localStatusFlag = (Uint32*)(localReadIndex + 3);
Uint32 * sharedLockIndex = (Uint32*)
(localReadIndex + 4);
Uint32 * sharedHeavyLock = (Uint32*)
(localReadIndex + 5);
char * localStartOfBuf = (char*)
((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);
* localReadIndex = * localWriteIndex = 0;
* localEndOfDataIndex = sizeOfBuffer -1;
* localReadIndex = 0;
* localWriteIndex = 0;
* localEndWriteIndex = 0;
const Uint32 slack = MAX_MESSAGE_SIZE;
reader = new SHM_Reader(localStartOfBuf,
sizeOfBuffer,
slack,
localReadIndex,
localEndWriteIndex,
localWriteIndex);
* localReadIndex = 0;
* localWriteIndex = 0;
reader->clear();
DBUG_VOID_RETURN;
} //setupLocalSegment
void SCI_Transporter::setupRemoteSegment()
{
DBUG_ENTER("SCI_Transporter::setupRemoteSegment");
Uint32 sharedSize = 0;
sharedSize += 16; //SHM_Reader::getSharedSize();
sharedSize += 16; //SHM_Writer::getSharedSize();
sharedSize += 32;
sharedSize =4096; //start of the buffer is page aligend
sharedSize =4096; //start of the buffer is page aligned
Uint32 sizeOfBuffer = m_BufferSize;
const Uint32 slack = MAX_MESSAGE_SIZE;
sizeOfBuffer -= sharedSize;
Uint32 * segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;
Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
Uint32 * remoteEndOfDataIndex2 = (Uint32*) (segPtr + 2);
Uint32 * sharedLockIndex2 = (Uint32*) (segPtr + 3);
m_remoteStatusFlag2 = (Uint32*)(segPtr + 4);
Uint32 * sharedHeavyLock2 = (Uint32*) (segPtr + 5);
char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;
Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;
Uint32 * remoteReadIndex = (Uint32*)segPtr;
Uint32 * remoteWriteIndex = (Uint32*) (segPtr + 1);
Uint32 * remoteEndOfDataIndex = (Uint32*) (segPtr + 2);
Uint32 * sharedLockIndex = (Uint32*) (segPtr + 3);
m_remoteStatusFlag = (Uint32*)(segPtr + 4);
Uint32 * sharedHeavyLock = (Uint32*) (segPtr + 5);
Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
Uint32 * remoteEndWriteIndex = (Uint32*) (segPtr + 2);
m_remoteStatusFlag = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));
* remoteReadIndex = * remoteWriteIndex = 0;
* remoteReadIndex2 = * remoteWriteIndex2 = 0;
* remoteEndOfDataIndex = sizeOfBuffer - 1;
* remoteEndOfDataIndex2 = sizeOfBuffer - 1;
/**
* setup two writers. writer2 is used to mirror the changes of
* writer on the standby
* segment, so that in the case of a failover, we can switch
* to the stdby seg. quickly.*
*/
const Uint32 slack = MAX_MESSAGE_SIZE;
writer = new SHM_Writer(remoteStartOfBuf,
sizeOfBuffer,
slack,
remoteReadIndex,
remoteEndWriteIndex,
remoteWriteIndex);
writer2 = new SHM_Writer(remoteStartOfBuf2,
sizeOfBuffer,
slack,
remoteReadIndex2,
remoteWriteIndex2);
* remoteReadIndex = 0;
* remoteWriteIndex = 0;
writer->clear();
writer2->clear();
m_TargetSegm[0].writer=writer;
m_TargetSegm[1].writer=writer2;
m_sendBuffer.m_forceSendLimit = writer->getBufferSize();
if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
reportThreadError(remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
DBUG_PRINT("error", ("Unable to create sequence on active"));
doDisconnect();
}
if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
reportThreadError(remoteNodeId, TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
doDisconnect();
}
if (m_adapters > 1) {
segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;
Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
Uint32 * remoteEndWriteIndex2 = (Uint32*) (segPtr + 2);
m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
/**
* setup a writer. writer2 is used to mirror the changes of
* writer on the standby
* segment, so that in the case of a failover, we can switch
* to the stdby seg. quickly.*
*/
writer2 = new SHM_Writer(remoteStartOfBuf2,
sizeOfBuffer,
slack,
remoteReadIndex2,
remoteEndWriteIndex2,
remoteWriteIndex2);
* remoteReadIndex = 0;
* remoteWriteIndex = 0;
* remoteEndWriteIndex = 0;
writer2->clear();
m_TargetSegm[1].writer=writer2;
if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
DBUG_PRINT("error", ("Unable to create sequence on standby"));
doDisconnect();
}
}
DBUG_VOID_RETURN;
} //setupRemoteSegment
bool SCI_Transporter::connectImpl(Uint32 timeout) {
sci_error_t err;
Uint32 offset = 0;
bool
SCI_Transporter::init_local()
{
DBUG_ENTER("SCI_Transporter::init_local");
if(!m_initLocal) {
if(initLocalSegment()!=SCI_ERR_OK){
NdbSleep_MilliSleep(timeout);
NdbSleep_MilliSleep(10);
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
reportThreadError(localNodeId, TE_SCI_CANNOT_INIT_LOCALSEGMENT);
return false;
report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);
DBUG_RETURN(false);
}
m_initLocal=true;
m_initLocal=true;
}
if(!m_mapped ) {
for(Uint32 i=0; i < m_adapters ; i++) {
m_TargetSegm[i].rhm[i].remoteHandle=0;
SCIConnectSegment(sciAdapters[i].scidesc,
&(m_TargetSegm[i].rhm[i].remoteHandle),
m_remoteNodes[i],
remoteSegmentId(localNodeId, remoteNodeId),
i,
0,
0,
0,
0,
&err);
if(err != SCI_ERR_OK) {
NdbSleep_MilliSleep(timeout);
return false;
}
}
DBUG_RETURN(true);
}
bool
SCI_Transporter::init_remote()
{
DBUG_ENTER("SCI_Transporter::init_remote");
sci_error_t err;
Uint32 offset = 0;
if(!m_mapped ) {
DBUG_PRINT("info", ("Map remote segments"));
for(Uint32 i=0; i < m_adapters ; i++) {
m_TargetSegm[i].rhm[i].remoteHandle=0;
SCIConnectSegment(sciAdapters[i].scidesc,
&(m_TargetSegm[i].rhm[i].remoteHandle),
m_remoteNodes[i],
remoteSegmentId(localNodeId, remoteNodeId),
i,
0,
0,
0,
0,
&err);
if(err != SCI_ERR_OK) {
NdbSleep_MilliSleep(10);
DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err));
DBUG_RETURN(false);
}
}
// Map the remote memory segment into program space
for(Uint32 i=0; i < m_adapters ; i++) {
m_TargetSegm[i].mappedMemory =
SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
&(m_TargetSegm[i].rhm[i].map),
offset,
m_BufferSize,
NULL,
FLAGS,
&err);
if(err!= SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout_c("\nCannot map a segment to the remote node %d.");
ndbout_c("Error code 0x%x",m_RemoteSciNodeId, err);
#endif
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
reportThreadError(remoteNodeId, TE_SCI_CANNOT_MAP_REMOTESEGMENT);
return false;
}
}
m_mapped=true;
setupRemoteSegment();
setConnected();
#ifdef DEBUG_TRANSPORTER
ndbout << "connected and mapped to segment : " << endl;
ndbout << "remoteNode: " << m_remoteNodes[0] << endl;
ndbout << "remoteNode: " << m_remotenodes[1] << endl;
ndbout << "remoteSegId: "
<< remoteSegmentId(localNodeId, remoteNodeId)
<< endl;
#endif
return true;
}
else {
return getConnectionStatus();
}
} // connectImpl()
for(Uint32 i=0; i < m_adapters ; i++) {
m_TargetSegm[i].mappedMemory =
SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
&(m_TargetSegm[i].rhm[i].map),
offset,
m_BufferSize,
NULL,
FLAGS,
&err);
if(err!= SCI_ERR_OK) {
DBUG_PRINT("error", ("Cannot map a segment to the remote node %d. Error code 0x%x",m_RemoteSciNodeId, err));
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);
DBUG_RETURN(false);
}
}
m_mapped=true;
setupRemoteSegment();
setConnected();
DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d",
remoteNodeId));
DBUG_PRINT("info", ("remoteSegId: %d",
remoteSegmentId(localNodeId, remoteNodeId)));
DBUG_RETURN(true);
} else {
DBUG_RETURN(getConnectionStatus());
}
}
bool
SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
SocketInputStream s_input(sockfd);
SocketOutputStream s_output(sockfd);
char buf[256];
DBUG_ENTER("SCI_Transporter::connect_client_impl");
// Wait for server to create and attach
if (s_input.gets(buf, 256) == 0) {
DBUG_PRINT("error", ("No initial response from server in SCI"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
if (!init_local()) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Send ok to server
s_output.println("sci client 1 ok");
if (!init_remote()) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Wait for ok from server
if (s_input.gets(buf, 256) == 0) {
DBUG_PRINT("error", ("No second response from server in SCI"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Send ok to server
s_output.println("sci client 2 ok");
NDB_CLOSE_SOCKET(sockfd);
DBUG_PRINT("info", ("Successfully connected client to node %d",
remoteNodeId));
DBUG_RETURN(true);
}
bool
SCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
char buf[256];
DBUG_ENTER("SCI_Transporter::connect_server_impl");
if (!init_local()) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Send ok to client
s_output.println("sci server 1 ok");
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
DBUG_PRINT("error", ("No response from client in SCI"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
if (!init_remote()) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
// Send ok to client
s_output.println("sci server 2 ok");
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
DBUG_PRINT("error", ("No second response from client in SCI"));
NDB_CLOSE_SOCKET(sockfd);
DBUG_RETURN(false);
}
NDB_CLOSE_SOCKET(sockfd);
DBUG_PRINT("info", ("Successfully connected server to node %d",
remoteNodeId));
DBUG_RETURN(true);
}
sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) {
sci_error_t err;
SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),
......@@ -795,13 +812,14 @@ sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) {
// If there still is an error then data cannot be safely send
return err;
return err;
} // startSequence()
bool SCI_Transporter::disconnectLocal()
{
{
DBUG_ENTER("SCI_Transporter::disconnectLocal");
sci_error_t err;
m_ActiveAdapterId=0;
......@@ -809,31 +827,28 @@ bool SCI_Transporter::disconnectLocal()
*/
SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
return false;
}
if(err!=SCI_ERR_OK) {
report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
DBUG_PRINT("error", ("Unable to unmap segment"));
DBUG_RETURN(false);
}
SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle),
FLAGS,
&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj, remoteNodeId, TE_SCI_UNABLE_TO_REMOVE_SEGMENT);
return false;
report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT);
DBUG_PRINT("error", ("Unable to remove segment"));
DBUG_RETURN(false);
}
if(err == SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
printf("Local memory segment is unmapped and removed\n" );
#endif
}
return true;
DBUG_PRINT("info", ("Local memory segment is unmapped and removed"));
DBUG_RETURN(true);
} // disconnectLocal()
bool SCI_Transporter::disconnectRemote() {
DBUG_ENTER("SCI_Transporter::disconnectRemote");
sci_error_t err;
for(Uint32 i=0; i<m_adapters; i++) {
/**
......@@ -841,35 +856,32 @@ bool SCI_Transporter::disconnectRemote() {
*/
SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
return false;
}
report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
DBUG_PRINT("error", ("Unable to unmap segment"));
DBUG_RETURN(false);
}
SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle,
FLAGS,
&err);
if(err!=SCI_ERR_OK) {
reportError(callbackObj,
remoteNodeId, TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
return false;
report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
DBUG_PRINT("error", ("Unable to disconnect segment"));
DBUG_RETURN(false);
}
#ifdef DEBUG_TRANSPORTER
ndbout_c("Remote memory segment is unmapped and disconnected\n" );
#endif
DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected"));
}
return true;
DBUG_RETURN(true);
} // disconnectRemote()
SCI_Transporter::~SCI_Transporter() {
DBUG_ENTER("SCI_Transporter::~SCI_Transporter");
// Close channel to the driver
#ifdef DEBUG_TRANSPORTER
ndbout << "~SCITransporter does a disConnect" << endl;
#endif
doDisconnect();
if(m_sendBuffer.m_buffer != NULL)
delete[] m_sendBuffer.m_buffer;
DBUG_VOID_RETURN;
} // ~SCI_Transporter()
......@@ -878,7 +890,7 @@ SCI_Transporter::~SCI_Transporter() {
void SCI_Transporter::closeSCI() {
// Termination of SCI
sci_error_t err;
printf("\nClosing SCI Transporter...\n");
DBUG_ENTER("SCI_Transporter::closeSCI");
// Disconnect and remove remote segment
disconnectRemote();
......@@ -890,26 +902,42 @@ void SCI_Transporter::closeSCI() {
// Closes an SCI virtual device
SCIClose(activeSCIDescriptor, FLAGS, &err);
if(err != SCI_ERR_OK)
fprintf(stderr,
"\nCannot close SCI channel to the driver. Error code 0x%x",
err);
if(err != SCI_ERR_OK) {
DBUG_PRINT("error", ("Cannot close SCI channel to the driver. Error code 0x%x",
err));
}
SCITerminate();
DBUG_VOID_RETURN;
} // closeSCI()
Uint32 *
SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio){
SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
{
if(m_sendBuffer.full()){
/**-------------------------------------------------
* Buffer was completely full. We have severe problems.
* -------------------------------------------------
*/
if(!doSend()){
Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit;
Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
Uint32 new_curr_data_size = curr_data_size + lenBytes;
if ((new_curr_data_size >= send_buf_size) ||
(curr_data_size >= sci_buffer_remaining)) {
/**
* The new message will not fit in the send buffer. We need to
* send the send buffer before filling it up with the new
* signal data. If current data size will spill over buffer edge
* we will also send to avoid writing larger than possible in
* buffer.
*/
if (!doSend()) {
/**
* We were not successfull sending, report 0 as meaning buffer full and
* upper levels handle retries and other recovery matters.
*/
return 0;
}
}
/**
* New signal fits, simply fill it up with more data.
*/
Uint32 sz = m_sendBuffer.m_dataSize;
return &m_sendBuffer.m_buffer[sz];
}
......@@ -918,10 +946,11 @@ void
SCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
Uint32 sz = m_sendBuffer.m_dataSize;
sz += (lenBytes / 4);
Uint32 packet_size = m_PacketSize;
sz += ((lenBytes + 3) >> 2);
m_sendBuffer.m_dataSize = sz;
if(sz > m_PacketSize) {
if(sz > packet_size) {
/**-------------------------------------------------
* Buffer is full and we are ready to send. We will
* not wait since the signal is already in the buffer.
......@@ -944,7 +973,8 @@ bool
SCI_Transporter::getConnectionStatus() {
if(*m_localStatusFlag == SCICONNECTED &&
(*m_remoteStatusFlag == SCICONNECTED ||
*m_remoteStatusFlag2 == SCICONNECTED))
((m_adapters > 1) &&
*m_remoteStatusFlag2 == SCICONNECTED)))
return true;
else
return false;
......@@ -954,7 +984,9 @@ SCI_Transporter::getConnectionStatus() {
void
SCI_Transporter::setConnected() {
*m_remoteStatusFlag = SCICONNECTED;
*m_remoteStatusFlag2 = SCICONNECTED;
if (m_adapters > 1) {
*m_remoteStatusFlag2 = SCICONNECTED;
}
*m_localStatusFlag = SCICONNECTED;
}
......@@ -963,8 +995,10 @@ void
SCI_Transporter::setDisconnect() {
if(getLinkStatus(m_ActiveAdapterId))
*m_remoteStatusFlag = SCIDISCONNECT;
if(getLinkStatus(m_StandbyAdapterId))
*m_remoteStatusFlag2 = SCIDISCONNECT;
if (m_adapters > 1) {
if(getLinkStatus(m_StandbyAdapterId))
*m_remoteStatusFlag2 = SCIDISCONNECT;
}
}
......@@ -981,20 +1015,20 @@ static bool init = false;
bool
SCI_Transporter::initSCI() {
DBUG_ENTER("SCI_Transporter::initSCI");
if(!init){
sci_error_t error;
// Initialize SISCI library
SCIInitialize(0, &error);
if(error != SCI_ERR_OK) {
#ifdef DEBUG_TRANSPORTER
ndbout_c("\nCannot initialize SISCI library.");
ndbout_c("\nInconsistency between SISCI library and SISCI driver.Error code 0x%x", error);
#endif
return false;
DBUG_PRINT("error", ("Cannot initialize SISCI library."));
DBUG_PRINT("error", ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x",
error));
DBUG_RETURN(false);
}
init = true;
}
return true;
DBUG_RETURN(true);
}
......
......@@ -26,7 +26,7 @@
#include <ndb_types.h>
/**
/**
* The SCI Transporter
*
* The design goal of the SCI transporter is to deliver high performance
......@@ -135,15 +135,17 @@ public:
bool getConnectionStatus();
private:
SCI_Transporter(Uint32 packetSize,
SCI_Transporter(TransporterRegistry &t_reg,
const char *local_host,
const char *remote_host,
int port,
Uint32 packetSize,
Uint32 bufferSize,
Uint32 nAdapters,
Uint16 remoteSciNodeId0,
Uint16 remoteSciNodeId1,
NodeId localNodeID,
NodeId remoteNodeID,
int byteorder,
bool compression,
bool checksum,
bool signalId,
Uint32 reportFreq = 4096);
......@@ -160,7 +162,8 @@ private:
/**
* For statistics on transfered packets
*/
#ifdef DEBUG_TRANSPORTER
//#ifdef DEBUG_TRANSPORTER
#if 1
Uint32 i1024;
Uint32 i2048;
Uint32 i2049;
......@@ -177,10 +180,8 @@ private:
struct {
Uint32 * m_buffer; // The buffer
Uint32 m_dataSize; // No of words in buffer
Uint32 m_bufferSize; // Buffer size
Uint32 m_sendBufferSize; // Buffer size
Uint32 m_forceSendLimit; // Send when buffer is this full
bool full() const { return (m_dataSize * 4) > m_forceSendLimit ;}
} m_sendBuffer;
SHM_Reader * reader;
......@@ -196,7 +197,7 @@ private:
Uint32 m_adapters;
Uint32 m_numberOfRemoteNodes;
Uint16* m_remoteNodes;
Uint16 m_remoteNodes[2];
typedef struct SciAdapter {
sci_desc_t scidesc;
......@@ -297,12 +298,12 @@ private:
bool sendIsPossible(struct timeval * timeout);
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
reader->getReadPtr(* ptr, * eod);
void getReceivePtr(Uint32 ** ptr, Uint32 &size){
size = reader->getReadPtr(* ptr);
}
void updateReceivePtr(Uint32 * ptr){
reader->updateReadPtr(ptr);
void updateReceivePtr(Uint32 size){
reader->updateReadPtr(size);
}
/**
......@@ -341,7 +342,9 @@ private:
*/
void failoverShmWriter();
bool init_local();
bool init_remote();
protected:
/** Perform a connection between segment
......@@ -350,7 +353,8 @@ protected:
* retrying.
* @return Returns true on success, otherwize falser
*/
bool connectImpl(Uint32 timeOutMillis);
bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
/**
* We will disconnect if:
......
......@@ -42,17 +42,19 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
void clear() {
m_readIndex = * m_sharedReadIndex;
m_readIndex = 0;
}
/**
......@@ -66,12 +68,12 @@ public:
* returns ptr - where to start reading
* sz - how much can I read
*/
inline void getReadPtr(Uint32 * & ptr, Uint32 * & eod);
inline Uint32 getReadPtr(Uint32 * & ptr);
/**
* Update read ptr
*/
inline void updateReadPtr(Uint32 * readPtr);
inline void updateReadPtr(Uint32 size);
private:
char * const m_startOfBuffer;
......@@ -80,6 +82,7 @@ private:
Uint32 m_readIndex;
Uint32 * m_sharedReadIndex;
Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
......@@ -97,19 +100,22 @@ SHM_Reader::empty() const{
* sz - how much can I read
*/
inline
void
SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){
Uint32
SHM_Reader::getReadPtr(Uint32 * & ptr)
{
Uint32 *eod;
Uint32 tReadIndex = m_readIndex;
Uint32 tWriteIndex = * m_sharedWriteIndex;
Uint32 tEndWriteIndex = * m_sharedEndWriteIndex;
ptr = (Uint32*)&m_startOfBuffer[tReadIndex];
if(tReadIndex <= tWriteIndex){
eod = (Uint32*)&m_startOfBuffer[tWriteIndex];
} else {
eod = (Uint32*)&m_startOfBuffer[m_bufferSize];
eod = (Uint32*)&m_startOfBuffer[tEndWriteIndex];
}
return (Uint32)((char*)eod - (char*)ptr);
}
/**
......@@ -117,14 +123,14 @@ SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){
*/
inline
void
SHM_Reader::updateReadPtr(Uint32 * ptr){
Uint32 tReadIndex = ((char *)ptr) - m_startOfBuffer;
SHM_Reader::updateReadPtr(Uint32 size)
{
Uint32 tReadIndex = m_readIndex;
tReadIndex += size;
assert(tReadIndex < m_totalBufferSize);
if(tReadIndex >= m_bufferSize){
tReadIndex = 0; //-= m_bufferSize;
tReadIndex = 0;
}
m_readIndex = tReadIndex;
......@@ -139,17 +145,19 @@ public:
Uint32 _sizeOfBuffer,
Uint32 _slack,
Uint32 * _readIndex,
Uint32 * _endWriteIndex,
Uint32 * _writeIndex) :
m_startOfBuffer(_startOfBuffer),
m_totalBufferSize(_sizeOfBuffer),
m_bufferSize(_sizeOfBuffer - _slack),
m_sharedReadIndex(_readIndex),
m_sharedEndWriteIndex(_endWriteIndex),
m_sharedWriteIndex(_writeIndex)
{
}
void clear() {
m_writeIndex = * m_sharedWriteIndex;
m_writeIndex = 0;
}
inline char * getWritePtr(Uint32 sz);
......@@ -168,6 +176,7 @@ private:
Uint32 m_writeIndex;
Uint32 * m_sharedReadIndex;
Uint32 * m_sharedEndWriteIndex;
Uint32 * m_sharedWriteIndex;
};
......@@ -206,7 +215,8 @@ SHM_Writer::updateWritePtr(Uint32 sz){
assert(tWriteIndex < m_totalBufferSize);
if(tWriteIndex >= m_bufferSize){
tWriteIndex = 0; //-= m_bufferSize;
* m_sharedEndWriteIndex = tWriteIndex;
tWriteIndex = 0;
}
m_writeIndex = tWriteIndex;
......
......@@ -32,13 +32,12 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
bool compression,
bool checksum,
bool signalId,
key_t _shmKey,
Uint32 _shmSize) :
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
0, compression, checksum, signalId),
0, false, checksum, signalId),
shmKey(_shmKey),
shmSize(_shmSize)
{
......@@ -48,7 +47,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
shmBuf = 0;
reader = 0;
writer = 0;
setupBuffersDone=false;
#ifdef DEBUG_TRANSPORTER
printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
......@@ -83,36 +82,40 @@ SHM_Transporter::setupBuffers(){
Uint32 * sharedReadIndex1 = base1;
Uint32 * sharedWriteIndex1 = base1 + 1;
Uint32 * sharedEndWriteIndex1 = base1 + 2;
serverStatusFlag = base1 + 4;
char * startOfBuf1 = shmBuf+sharedSize;
Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
Uint32 * sharedReadIndex2 = base2;
Uint32 * sharedWriteIndex2 = base2 + 1;
Uint32 * sharedEndWriteIndex2 = base2 + 2;
clientStatusFlag = base2 + 4;
char * startOfBuf2 = ((char *)base2)+sharedSize;
* sharedReadIndex2 = * sharedWriteIndex2 = 0;
if(isServer){
* serverStatusFlag = 0;
reader = new SHM_Reader(startOfBuf1,
sizeOfBuffer,
slack,
sharedReadIndex1,
sharedEndWriteIndex1,
sharedWriteIndex1);
writer = new SHM_Writer(startOfBuf2,
sizeOfBuffer,
slack,
sharedReadIndex2,
sharedEndWriteIndex2,
sharedWriteIndex2);
* sharedReadIndex1 = 0;
* sharedWriteIndex2 = 0;
* sharedWriteIndex1 = 0;
* sharedEndWriteIndex1 = 0;
* sharedReadIndex2 = 0;
* sharedWriteIndex1 = 0;
* sharedWriteIndex2 = 0;
* sharedEndWriteIndex2 = 0;
reader->clear();
writer->clear();
......@@ -145,16 +148,19 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer,
slack,
sharedReadIndex2,
sharedEndWriteIndex2,
sharedWriteIndex2);
writer = new SHM_Writer(startOfBuf1,
sizeOfBuffer,
slack,
sharedReadIndex1,
sharedEndWriteIndex1,
sharedWriteIndex1);
* sharedReadIndex2 = 0;
* sharedWriteIndex1 = 0;
* sharedEndWriteIndex1 = 0;
reader->clear();
writer->clear();
......@@ -224,6 +230,7 @@ SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
bool
SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
DBUG_ENTER("SHM_Transporter::connect_server_impl");
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
char buf[256];
......@@ -233,7 +240,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
if (!ndb_shm_create()) {
report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT);
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_RETURN(false);
}
_shmSegCreated = true;
}
......@@ -243,7 +250,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
if (!ndb_shm_attach()) {
report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_RETURN(false);
}
_attached = true;
}
......@@ -254,7 +261,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_RETURN(false);
}
int r= connect_common(sockfd);
......@@ -265,17 +272,20 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from client
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_RETURN(false);
}
DBUG_PRINT("info", ("Successfully connected server to node %d",
remoteNodeId));
}
NDB_CLOSE_SOCKET(sockfd);
return r;
DBUG_RETURN(r);
}
bool
SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
DBUG_ENTER("SHM_Transporter::connect_client_impl");
SocketInputStream s_input(sockfd);
SocketOutputStream s_output(sockfd);
char buf[256];
......@@ -283,14 +293,18 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
// Wait for server to create and attach
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_PRINT("error", ("Server id %d did not attach",
remoteNodeId));
DBUG_RETURN(false);
}
// Create
if(!_shmSegCreated){
if (!ndb_shm_get()) {
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_PRINT("error", ("Failed create of shm seg to node %d",
remoteNodeId));
DBUG_RETURN(false);
}
_shmSegCreated = true;
}
......@@ -300,7 +314,9 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
if (!ndb_shm_attach()) {
report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
remoteNodeId));
DBUG_RETURN(false);
}
_attached = true;
}
......@@ -314,21 +330,28 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from server
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
DBUG_PRINT("error", ("No ok from server node %d",
remoteNodeId));
DBUG_RETURN(false);
}
// Send ok to server
s_output.println("shm client 2 ok");
DBUG_PRINT("info", ("Successfully connected client to node %d",
remoteNodeId));
}
NDB_CLOSE_SOCKET(sockfd);
return r;
DBUG_RETURN(r);
}
bool
SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
{
if (!checkConnected())
if (!checkConnected()) {
DBUG_PRINT("error", ("Already connected to node %d",
remoteNodeId));
return false;
}
if(!setupBuffersDone) {
setupBuffers();
......@@ -341,5 +364,7 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return true;
}
DBUG_PRINT("error", ("Failed to set up buffers to node %d",
remoteNodeId));
return false;
}
......@@ -38,7 +38,6 @@ public:
int r_port,
NodeId lNodeId,
NodeId rNodeId,
bool compression,
bool checksum,
bool signalId,
key_t shmKey,
......@@ -62,12 +61,12 @@ public:
writer->updateWritePtr(lenBytes);
}
void getReceivePtr(Uint32 ** ptr, Uint32 ** eod){
reader->getReadPtr(* ptr, * eod);
void getReceivePtr(Uint32 ** ptr, Uint32 sz){
sz = reader->getReadPtr(* ptr);
}
void updateReceivePtr(Uint32 * ptr){
reader->updateReadPtr(ptr);
void updateReceivePtr(Uint32 sz){
reader->updateReadPtr(sz);
}
protected:
......@@ -127,6 +126,7 @@ protected:
private:
bool _shmSegCreated;
bool _attached;
bool m_connected;
key_t shmKey;
volatile Uint32 * serverStatusFlag;
......
......@@ -70,11 +70,10 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byte_order,
bool compr, bool chksm, bool signalId,
bool chksm, bool signalId,
Uint32 _reportFreq) :
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
byte_order, compr, chksm, signalId),
0, false, chksm, signalId),
m_sendBuffer(sendBufSize)
{
maxReceiveSize = maxRecvSize;
......@@ -106,12 +105,14 @@ TCP_Transporter::~TCP_Transporter() {
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
DBUG_ENTER("TCP_Transpporter::connect_server_impl");
DBUG_RETURN(connect_common(sockfd));
}
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
DBUG_ENTER("TCP_Transpporter::connect_client_impl");
DBUG_RETURN(connect_common(sockfd));
}
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
......@@ -119,6 +120,8 @@ bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
theSocket = sockfd;
setSocketOptions();
setSocketNonBlocking(theSocket);
DBUG_PRINT("info", ("Successfully set-up TCP transporter to node %d",
remoteNodeId));
return true;
}
......
......@@ -52,8 +52,7 @@ private:
int r_port,
NodeId lHostId,
NodeId rHostId,
int byteorder,
bool compression, bool checksum, bool signalId,
bool checksum, bool signalId,
Uint32 reportFreq = 4096);
// Disconnect, delete send buffers and receive buffer
......
......@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"
......@@ -48,9 +49,10 @@
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
DBUG_ENTER("SocketServer::Session * TransporterService::newSession");
if (m_auth && !m_auth->server_authenticate(sockfd)){
NDB_CLOSE_SOCKET(sockfd);
return 0;
DBUG_RETURN(0);
}
{
......@@ -60,27 +62,32 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
DBUG_PRINT("error", ("Could not get node id from client"));
DBUG_RETURN(0);
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
DBUG_PRINT("error", ("Error in node id from client"));
DBUG_RETURN(0);
}
//check that nodeid is valid and that there is an allocated transporter
if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
if ( nodeId < 0 || nodeId >= (int)m_transporter_registry->maxTransporters) {
NDB_CLOSE_SOCKET(sockfd);
DBUG_PRINT("error", ("Node id out of range from client"));
DBUG_RETURN(0);
}
if (m_transporter_registry->theTransporters[nodeId] == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
DBUG_PRINT("error", ("No transporter for this node id from client"));
DBUG_RETURN(0);
}
//check that the transporter should be connected
if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
DBUG_PRINT("error", ("Transporter in wrong state for this node id from client"));
DBUG_RETURN(0);
}
Transporter *t= m_transporter_registry->theTransporters[nodeId];
......@@ -93,7 +100,7 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
t->connect_server(sockfd);
}
return 0;
DBUG_RETURN(0);
}
TransporterRegistry::TransporterRegistry(void * callback,
......@@ -209,8 +216,6 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
config->port,
localNodeId,
config->remoteNodeId,
config->byteOrder,
config->compression,
config->checksum,
config->signalId);
if (t == NULL)
......@@ -264,8 +269,6 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
conf->localHostName,
conf->remoteNodeId,
conf->remoteHostName,
conf->byteOrder,
conf->compression,
conf->checksum,
conf->signalId);
if (t == NULL)
......@@ -306,15 +309,17 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
if(theTransporters[config->remoteNodeId] != NULL)
return false;
SCI_Transporter * t = new SCI_Transporter(config->sendLimit,
SCI_Transporter * t = new SCI_Transporter(*this,
config->localHostName,
config->remoteHostName,
config->port,
config->sendLimit,
config->bufferSize,
config->nLocalAdapters,
config->remoteSciNodeId0,
config->remoteSciNodeId1,
localNodeId,
config->remoteNodeId,
config->byteOrder,
config->compression,
config->checksum,
config->signalId);
......@@ -357,7 +362,6 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
config->port,
localNodeId,
config->remoteNodeId,
config->compression,
config->checksum,
config->signalId,
config->shmKey,
......@@ -853,10 +857,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(readPtr);
Uint32 * readPtr;
Uint32 sz = 0;
t->getReceivePtr(&readPtr, sz);
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
t->updateReceivePtr(szUsed);
}
}
}
......@@ -868,10 +873,11 @@ TransporterRegistry::performReceive(){
const NodeId nodeId = t->getRemoteNodeId();
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
readPtr = unpack(readPtr, eodPtr, nodeId, ioStates[nodeId]);
t->updateReceivePtr(readPtr);
Uint32 * readPtr;
Uint32 sz = 0;
t->getReceivePtr(&readPtr, sz);
Uint32 szUsed = unpack(readPtr, sz, nodeId, ioStates[nodeId]);
t->updateReceivePtr(szUsed);
}
}
}
......@@ -1023,7 +1029,9 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
static void *
run_start_clients_C(void * me)
{
my_thread_init();
((TransporterRegistry*) me)->start_clients_thread();
my_thread_end();
NdbThread_Exit(0);
return me;
}
......@@ -1106,6 +1114,7 @@ TransporterRegistry::update_connections()
void
TransporterRegistry::start_clients_thread()
{
DBUG_ENTER("TransporterRegistry::start_clients_thread");
while (m_run_start_clients_thread) {
NdbSleep_MilliSleep(100);
for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
......@@ -1129,6 +1138,7 @@ TransporterRegistry::start_clients_thread()
}
}
}
DBUG_VOID_RETURN;
}
bool
......
......@@ -16,6 +16,7 @@
#include <ndb_global.h>
#include <my_pthread.h>
#include <SocketServer.hpp>
......@@ -176,9 +177,9 @@ extern "C"
void*
socketServerThread_C(void* _ss){
SocketServer * ss = (SocketServer *)_ss;
my_thread_init();
ss->doRun();
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......@@ -287,8 +288,10 @@ void*
sessionThread_C(void* _sc){
SocketServer::Session * si = (SocketServer::Session *)_sc;
my_thread_init();
if(!transfer(si->m_socket)){
si->m_stopped = true;
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......@@ -301,6 +304,7 @@ sessionThread_C(void* _sc){
}
si->m_stopped = true;
my_thread_end();
NdbThread_Exit(0);
return 0;
}
......
......@@ -7,7 +7,7 @@ LDADD_LOC = \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
include $(top_srcdir)/ndb/config/common.mk.am
include $(top_srcdir)/ndb/config/type_util.mk.am
......
......@@ -55,7 +55,7 @@ LDADD += \
$(top_builddir)/ndb/src/common/util/libgeneral.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
# Don't update the files from bitkeeper
%::SCCS/s.%
......@@ -7,7 +7,7 @@ LDADD_LOC = \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
include $(top_srcdir)/ndb/config/common.mk.am
......
......@@ -125,11 +125,14 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", fixHostname, "HostName1" },
{ "TCP", fixHostname, "HostName2" },
{ "SCI", fixHostname, "HostName1" },
{ "SCI", fixHostname, "HostName2" },
{ "OSE", fixHostname, "HostName1" },
{ "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName
{ "SHM", fixPortNumber, 0 }, // has to come after fixHostName
{ "SCI", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 },
/**
......@@ -159,6 +162,8 @@ ConfigInfo::m_SectionRules[] = {
{ "TCP", checkTCPConstraints, "HostName1" },
{ "TCP", checkTCPConstraints, "HostName2" },
{ "SCI", checkTCPConstraints, "HostName1" },
{ "SCI", checkTCPConstraints, "HostName2" },
{ "*", checkMandatory, 0 },
......@@ -1788,7 +1793,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection",
ConfigInfo::USED,
false,
ConfigInfo::INT,
ConfigInfo::STRING,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
......@@ -1800,28 +1805,74 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"Id of node ("DB_TOKEN", "API_TOKEN" or "MGM_TOKEN") on one side of the connection",
ConfigInfo::USED,
false,
ConfigInfo::INT,
ConfigInfo::STRING,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_SCI_ID_0,
"SciId0",
CFG_SCI_HOSTNAME_1,
"HostName1",
"SCI",
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
UNDEFINED,
0, 0 },
{
CFG_SCI_HOSTNAME_2,
"HostName2",
"SCI",
"Name/IP of computer on one side of the connection",
ConfigInfo::INTERNAL,
false,
ConfigInfo::STRING,
UNDEFINED,
0, 0 },
{
CFG_CONNECTION_SERVER_PORT,
"PortNumber",
"SCI",
"Local SCI-node id for adapter 0 (a computer can have two adapters)",
"Port used for this transporter",
ConfigInfo::USED,
false,
ConfigInfo::INT,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_SCI_HOST1_ID_0,
"Host1SciId0",
"SCI",
"SCI-node id for adapter 0 on Host1 (a computer can have two adapters)",
ConfigInfo::USED,
false,
ConfigInfo::INT,
MANDATORY,
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_SCI_HOST1_ID_1,
"Host1SciId1",
"SCI",
"SCI-node id for adapter 1 on Host1 (a computer can have two adapters)",
ConfigInfo::USED,
false,
ConfigInfo::INT,
"0",
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_SCI_ID_1,
"SciId1",
CFG_SCI_HOST2_ID_0,
"Host2SciId0",
"SCI",
"Local SCI-node id for adapter 1 (a computer can have two adapters)",
"SCI-node id for adapter 0 on Host2 (a computer can have two adapters)",
ConfigInfo::USED,
false,
ConfigInfo::INT,
......@@ -1829,6 +1880,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_SCI_HOST2_ID_1,
"Host2SciId1",
"SCI",
"SCI-node id for adapter 1 on Host2 (a computer can have two adapters)",
ConfigInfo::USED,
false,
ConfigInfo::INT,
"0",
"0",
STR_VALUE(MAX_INT_RNIL) },
{
CFG_CONNECTION_SEND_SIGNAL_ID,
"SendSignalId",
......@@ -1862,8 +1925,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false,
ConfigInfo::INT,
"2K",
"512",
STR_VALUE(MAX_INT_RNIL) },
"128",
"32K" },
{
CFG_SCI_BUFFER_MEM,
......@@ -1873,8 +1936,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
"1M",
"256K",
"192K",
"64K",
STR_VALUE(MAX_INT_RNIL) },
{
......
......@@ -29,7 +29,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/ndb/src/common/editline/libeditline.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@
@TERMCAP_LIB@
DEFS_LOC = -DDEFAULT_MYSQL_HOME="\"$(MYSQLBASEdir)\"" \
......
......@@ -37,7 +37,7 @@ LDADD = @isam_libs@ \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/regex/libregex.a \
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@
$(top_builddir)/strings/libmystrings.a @ZLIB_LIBS@ @NDB_SCI_LIBS@
mysqld_LDADD = @MYSQLD_EXTRA_LDFLAGS@ \
@bdb_libs@ @innodb_libs@ @pstack_libs@ \
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment