Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
d9051882
Commit
d9051882
authored
Sep 16, 2004
by
mysql@mc04.(none)
Browse files
Options
Browse Files
Download
Plain Diff
Merge mronstrom@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb
into mc04.(none):/space/mysql/mysql-4.1-ndb
parents
083e4f47
72017f8d
Changes
24
Hide whitespace changes
Inline
Side-by-side
Showing
24 changed files
with
702 additions
and
538 deletions
+702
-538
BitKeeper/etc/logging_ok
BitKeeper/etc/logging_ok
+3
-0
acinclude.m4
acinclude.m4
+32
-18
configure.in
configure.in
+2
-2
ndb/config/type_ndbapitest.mk.am
ndb/config/type_ndbapitest.mk.am
+1
-1
ndb/config/type_ndbapitools.mk.am
ndb/config/type_ndbapitools.mk.am
+1
-1
ndb/include/mgmapi/mgmapi_config_parameters.h
ndb/include/mgmapi/mgmapi_config_parameters.h
+8
-10
ndb/include/transporter/TransporterDefinitions.hpp
ndb/include/transporter/TransporterDefinitions.hpp
+3
-9
ndb/src/common/mgmcommon/IPCConfig.cpp
ndb/src/common/mgmcommon/IPCConfig.cpp
+42
-39
ndb/src/common/transporter/Makefile.am
ndb/src/common/transporter/Makefile.am
+1
-1
ndb/src/common/transporter/SCI_Transporter.cpp
ndb/src/common/transporter/SCI_Transporter.cpp
+388
-354
ndb/src/common/transporter/SCI_Transporter.hpp
ndb/src/common/transporter/SCI_Transporter.hpp
+19
-15
ndb/src/common/transporter/SHM_Buffer.hpp
ndb/src/common/transporter/SHM_Buffer.hpp
+24
-14
ndb/src/common/transporter/SHM_Transporter.cpp
ndb/src/common/transporter/SHM_Transporter.cpp
+43
-18
ndb/src/common/transporter/SHM_Transporter.hpp
ndb/src/common/transporter/SHM_Transporter.hpp
+5
-5
ndb/src/common/transporter/TCP_Transporter.cpp
ndb/src/common/transporter/TCP_Transporter.cpp
+8
-5
ndb/src/common/transporter/TCP_Transporter.hpp
ndb/src/common/transporter/TCP_Transporter.hpp
+1
-2
ndb/src/common/transporter/TransporterRegistry.cpp
ndb/src/common/transporter/TransporterRegistry.cpp
+35
-25
ndb/src/common/util/SocketServer.cpp
ndb/src/common/util/SocketServer.cpp
+6
-2
ndb/src/cw/cpcd/Makefile.am
ndb/src/cw/cpcd/Makefile.am
+1
-1
ndb/src/kernel/Makefile.am
ndb/src/kernel/Makefile.am
+1
-1
ndb/src/kernel/blocks/backup/restore/Makefile.am
ndb/src/kernel/blocks/backup/restore/Makefile.am
+1
-1
ndb/src/mgmsrv/ConfigInfo.cpp
ndb/src/mgmsrv/ConfigInfo.cpp
+75
-12
ndb/src/mgmsrv/Makefile.am
ndb/src/mgmsrv/Makefile.am
+1
-1
sql/Makefile.am
sql/Makefile.am
+1
-1
No files found.
BitKeeper/etc/logging_ok
View file @
d9051882
...
...
@@ -100,6 +100,7 @@ miguel@hegel.txg.br
miguel@light.
miguel@light.local
miguel@sartre.local
mikael@mc04.(none)
mikron@c-fb0ae253.1238-1-64736c10.cust.bredbandsbolaget.se
mikron@mikael-ronstr-ms-dator.local
mmatthew@markslaptop.
...
...
@@ -129,6 +130,7 @@ mwagner@here.mwagner.org
mwagner@work.mysql.com
mydev@mysql.com
mysql@home.(none)
mysql@mc04.(none)
mysqldev@build.mysql2.com
mysqldev@melody.local
mysqldev@mysql.com
...
...
@@ -158,6 +160,7 @@ ram@ram.(none)
ranger@regul.home.lan
rburnett@build.mysql.com
root@home.(none)
root@mc04.(none)
root@x3.internalnet
salle@banica.(none)
salle@geopard.(none)
...
...
acinclude.m4
View file @
d9051882
...
...
@@ -1551,16 +1551,43 @@ dnl Sets HAVE_NDBCLUSTER_DB if --with-ndbcluster is used
dnl ---------------------------------------------------------------------------
AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
AC_ARG_WITH([ndb-sci],
AC_HELP_STRING([--with-ndb-sci=DIR],
[Provide MySQL with a custom location of
sci library. Given DIR, sci library is
assumed to be in $DIR/lib and header files
in $DIR/include.]),
[mysql_sci_dir=${withval}],
[mysql_sci_dir=""])
case "$mysql_sci_dir" in
"no" )
have_ndb_sci=no
AC_MSG_RESULT([-- not including sci transporter])
;;
* )
if test -f "$mysql_sci_dir/lib/libsisci.a" -a \
-f "$mysql_sci_dir/include/sisci_api.h"; then
NDB_SCI_INCLUDES="-I$mysql_sci_dir/include"
NDB_SCI_LIBS="-L$mysql_sci_dir/lib -lsisci"
AC_MSG_RESULT([-- including sci transporter])
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
[Including Ndb Cluster DB sci transporter])
AC_SUBST(NDB_SCI_INCLUDES)
AC_SUBST(NDB_SCI_LIBS)
have_ndb_sci="yes"
AC_MSG_RESULT([found sci transporter in $mysql_sci_dir/{include, lib}])
else
AC_MSG_RESULT([could not find sci transporter in $mysql_sci_dir/{include, lib}])
fi
;;
esac
AC_ARG_WITH([ndb-shm],
[
--with-ndb-shm Include the NDB Cluster shared memory transporter],
[ndb_shm="$withval"],
[ndb_shm=no])
AC_ARG_WITH([ndb-sci],
[
--with-ndb-sci Include the NDB Cluster sci transporter],
[ndb_sci="$withval"],
[ndb_sci=no])
AC_ARG_WITH([ndb-test],
[
--with-ndb-test Include the NDB Cluster ndbapi test programs],
...
...
@@ -1593,19 +1620,6 @@ AC_DEFUN([MYSQL_CHECK_NDB_OPTIONS], [
;;
esac
have_ndb_sci=no
case "$ndb_sci" in
yes )
AC_MSG_RESULT([-- including sci transporter])
AC_DEFINE([NDB_SCI_TRANSPORTER], [1],
[Including Ndb Cluster DB sci transporter])
have_ndb_sci="yes"
;;
* )
AC_MSG_RESULT([-- not including sci transporter])
;;
esac
have_ndb_test=no
case "$ndb_test" in
yes )
...
...
configure.in
View file @
d9051882
...
...
@@ -3024,11 +3024,11 @@ AC_SUBST([ndb_port_base])
ndb_transporter_opt_objs
=
""
if
test
X
"
$have_ndb_shm
"
=
Xyes
then
ndb_transporter_opt_objs
=
"
$
(
ndb_transporter_opt_objs
)
SHM_Transporter.lo SHM_Transporter.unix.lo"
ndb_transporter_opt_objs
=
"
$
ndb_transporter_opt_objs
SHM_Transporter.lo SHM_Transporter.unix.lo"
fi
if
test
X
"
$have_ndb_sci
"
=
Xyes
then
ndb_transporter_opt_objs
=
"
$
(
ndb_transporter_opt_objs
)
SCI_Transporter.lo"
ndb_transporter_opt_objs
=
"
$
ndb_transporter_opt_objs
SCI_Transporter.lo"
fi
AC_SUBST
([
ndb_transporter_opt_objs]
)
...
...
ndb/config/type_ndbapitest.mk.am
View file @
d9051882
...
...
@@ -3,7 +3,7 @@ LDADD += $(top_builddir)/ndb/test/src/libNDBT.a \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a
@NDB_SCI_LIBS@
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
-I$(top_srcdir)/ndb/include \
...
...
ndb/config/type_ndbapitools.mk.am
View file @
d9051882
...
...
@@ -3,7 +3,7 @@ LDADD += \
$(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)/dbug/libdbug.a \
$(top_builddir)/mysys/libmysys.a \
$(top_builddir)/strings/libmystrings.a
$(top_builddir)/strings/libmystrings.a
@NDB_SCI_LIBS@
INCLUDES += -I$(srcdir) -I$(top_srcdir)/include \
-I$(top_srcdir)/ndb/include \
...
...
ndb/include/mgmapi/mgmapi_config_parameters.h
View file @
d9051882
...
...
@@ -117,16 +117,14 @@
#define CFG_SHM_KEY 502
#define CFG_SHM_BUFFER_MEM 503
#define CFG_SCI_ID_0 550
#define CFG_SCI_ID_1 551
#define CFG_SCI_SEND_LIMIT 552
#define CFG_SCI_BUFFER_MEM 553
#define CFG_SCI_NODE1_ADAPTERS 554
#define CFG_SCI_NODE1_ADAPTER0 555
#define CFG_SCI_NODE1_ADAPTER1 556
#define CFG_SCI_NODE2_ADAPTERS 554
#define CFG_SCI_NODE2_ADAPTER0 555
#define CFG_SCI_NODE2_ADAPTER1 556
#define CFG_SCI_HOST1_ID_0 550
#define CFG_SCI_HOST1_ID_1 551
#define CFG_SCI_HOST2_ID_0 552
#define CFG_SCI_HOST2_ID_1 553
#define CFG_SCI_HOSTNAME_1 554
#define CFG_SCI_HOSTNAME_2 555
#define CFG_SCI_SEND_LIMIT 556
#define CFG_SCI_BUFFER_MEM 557
#define CFG_OSE_HOSTNAME_1 600
#define CFG_OSE_HOSTNAME_2 601
...
...
ndb/include/transporter/TransporterDefinitions.hpp
View file @
d9051882
...
...
@@ -59,8 +59,6 @@ struct TCP_TransporterConfiguration {
NodeId
localNodeId
;
Uint32
sendBufferSize
;
// Size of SendBuffer of priority B
Uint32
maxReceiveSize
;
// Maximum no of bytes to receive
Uint32
byteOrder
;
bool
compression
;
bool
checksum
;
bool
signalId
;
};
...
...
@@ -72,10 +70,8 @@ struct SHM_TransporterConfiguration {
Uint32
port
;
NodeId
remoteNodeId
;
NodeId
localNodeId
;
bool
compression
;
bool
checksum
;
bool
signalId
;
int
byteOrder
;
Uint32
shmKey
;
Uint32
shmSize
;
...
...
@@ -89,10 +85,8 @@ struct OSE_TransporterConfiguration {
const
char
*
localHostName
;
NodeId
remoteNodeId
;
NodeId
localNodeId
;
bool
compression
;
bool
checksum
;
bool
signalId
;
int
byteOrder
;
Uint32
prioASignalSize
;
Uint32
prioBSignalSize
;
...
...
@@ -103,20 +97,20 @@ struct OSE_TransporterConfiguration {
* SCI Transporter Configuration
*/
struct
SCI_TransporterConfiguration
{
const
char
*
remoteHostName
;
const
char
*
localHostName
;
Uint32
port
;
Uint32
sendLimit
;
// Packet size
Uint32
bufferSize
;
// Buffer size
Uint32
nLocalAdapters
;
// 1 or 2, the number of adapters on local host
Uint32
nRemoteAdapters
;
Uint32
remoteSciNodeId0
;
// SCInodeId for adapter 1
Uint32
remoteSciNodeId1
;
// SCInodeId for adapter 2
NodeId
localNodeId
;
// Local node Id
NodeId
remoteNodeId
;
// Remote node Id
Uint32
byteOrder
;
bool
compression
;
bool
checksum
;
bool
signalId
;
...
...
ndb/src/common/mgmcommon/IPCConfig.cpp
View file @
d9051882
...
...
@@ -133,7 +133,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
Uint32
compression
;
Uint32
checksum
;
if
(
!
tmp
->
get
(
"SendSignalId"
,
&
sendSignalId
))
continue
;
if
(
!
tmp
->
get
(
"Compression"
,
&
compression
))
continue
;
if
(
!
tmp
->
get
(
"Checksum"
,
&
checksum
))
continue
;
const
char
*
type
;
...
...
@@ -143,8 +142,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
SHM_TransporterConfiguration
conf
;
conf
.
localNodeId
=
the_ownId
;
conf
.
remoteNodeId
=
(
nodeId1
!=
the_ownId
?
nodeId1
:
nodeId2
);
conf
.
byteOrder
=
0
;
conf
.
compression
=
compression
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -164,8 +161,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
SCI_TransporterConfiguration
conf
;
conf
.
localNodeId
=
the_ownId
;
conf
.
remoteNodeId
=
(
nodeId1
!=
the_ownId
?
nodeId1
:
nodeId2
);
conf
.
byteOrder
=
0
;
conf
.
compression
=
compression
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -174,18 +169,16 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
if
(
the_ownId
==
nodeId1
){
if
(
!
tmp
->
get
(
"Node1_NoOfAdapters"
,
&
conf
.
nLocalAdapters
))
continue
;
if
(
!
tmp
->
get
(
"Node2_NoOfAdapters"
,
&
conf
.
nRemoteAdapters
))
continue
;
if
(
!
tmp
->
get
(
"Node2_Adapter"
,
0
,
&
conf
.
remoteSciNodeId0
))
continue
;
if
(
conf
.
n
Remote
Adapters
>
1
){
if
(
conf
.
n
Local
Adapters
>
1
){
if
(
!
tmp
->
get
(
"Node2_Adapter"
,
1
,
&
conf
.
remoteSciNodeId1
))
continue
;
}
}
else
{
if
(
!
tmp
->
get
(
"Node2_NoOfAdapters"
,
&
conf
.
nLocalAdapters
))
continue
;
if
(
!
tmp
->
get
(
"Node1_NoOfAdapters"
,
&
conf
.
nRemoteAdapters
))
continue
;
if
(
!
tmp
->
get
(
"Node1_Adapter"
,
0
,
&
conf
.
remoteSciNodeId0
))
continue
;
if
(
conf
.
n
Remote
Adapters
>
1
){
if
(
conf
.
n
Local
Adapters
>
1
){
if
(
!
tmp
->
get
(
"Node1_Adapter"
,
1
,
&
conf
.
remoteSciNodeId1
))
continue
;
}
}
...
...
@@ -243,8 +236,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
conf
.
localHostName
=
ownHostName
;
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
localNodeId
=
ownNodeId
;
conf
.
byteOrder
=
0
;
conf
.
compression
=
compression
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -270,8 +261,6 @@ IPCConfig::configureTransporters(TransporterRegistry * theTransporterRegistry){
conf
.
localHostName
=
ownHostName
;
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
localNodeId
=
ownNodeId
;
conf
.
byteOrder
=
0
;
conf
.
compression
=
compression
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -344,6 +333,7 @@ Uint32
IPCConfig
::
configureTransporters
(
Uint32
nodeId
,
const
class
ndb_mgm_configuration
&
config
,
class
TransporterRegistry
&
tr
){
DBUG_ENTER
(
"IPCConfig::configureTransporters"
);
Uint32
noOfTransportersCreated
=
0
,
server_port
=
0
;
ndb_mgm_configuration_iterator
iter
(
config
,
CFG_SECTION_CONNECTION
);
...
...
@@ -374,14 +364,13 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
server_port
=
tmp_server_port
;
}
DBUG_PRINT
(
"info"
,
(
"Transporter between this node %d and node %d using port %d, signalId %d, checksum %d"
,
nodeId
,
remoteNodeId
,
tmp_server_port
,
sendSignalId
,
checksum
));
switch
(
type
){
case
CONNECTION_TYPE_SHM
:{
SHM_TransporterConfiguration
conf
;
conf
.
localNodeId
=
nodeId
;
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
byteOrder
=
0
;
conf
.
compression
=
0
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -391,45 +380,60 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf
.
port
=
tmp_server_port
;
if
(
!
tr
.
createTransporter
(
&
conf
)){
DBUG_PRINT
(
"error"
,
(
"Failed to create SCI Transporter from %d to %d"
,
conf
.
localNodeId
,
conf
.
remoteNodeId
));
ndbout
<<
"Failed to create SHM Transporter from: "
<<
conf
.
localNodeId
<<
" to: "
<<
conf
.
remoteNodeId
<<
endl
;
}
else
{
noOfTransportersCreated
++
;
}
DBUG_PRINT
(
"info"
,
(
"Created SHM Transporter using shmkey %d, buf size = %d"
,
conf
.
shmKey
,
conf
.
shmSize
));
break
;
}
case
CONNECTION_TYPE_SCI
:{
SCI_TransporterConfiguration
conf
;
const
char
*
host1
,
*
host2
;
conf
.
localNodeId
=
nodeId
;
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
byteOrder
=
0
;
conf
.
compression
=
0
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
conf
.
port
=
tmp_server_port
;
if
(
iter
.
get
(
CFG_SCI_HOSTNAME_1
,
&
host1
))
break
;
if
(
iter
.
get
(
CFG_SCI_HOSTNAME_2
,
&
host2
))
break
;
conf
.
localHostName
=
(
nodeId
==
nodeId1
?
host1
:
host2
);
conf
.
remoteHostName
=
(
nodeId
==
nodeId1
?
host2
:
host1
);
if
(
iter
.
get
(
CFG_SCI_SEND_LIMIT
,
&
conf
.
sendLimit
))
break
;
if
(
iter
.
get
(
CFG_SCI_BUFFER_MEM
,
&
conf
.
bufferSize
))
break
;
if
(
nodeId
==
nodeId1
){
if
(
iter
.
get
(
CFG_SCI_NODE1_ADAPTERS
,
&
conf
.
nLocalAdapters
))
break
;
if
(
iter
.
get
(
CFG_SCI_NODE2_ADAPTERS
,
&
conf
.
nRemoteAdapters
))
break
;
if
(
iter
.
get
(
CFG_SCI_NODE2_ADAPTER0
,
&
conf
.
remoteSciNodeId0
))
break
;
if
(
conf
.
nRemoteAdapters
>
1
){
if
(
iter
.
get
(
CFG_SCI_NODE2_ADAPTER1
,
&
conf
.
remoteSciNodeId1
))
break
;
}
if
(
nodeId
==
nodeId1
)
{
if
(
iter
.
get
(
CFG_SCI_HOST2_ID_0
,
&
conf
.
remoteSciNodeId0
))
break
;
if
(
iter
.
get
(
CFG_SCI_HOST2_ID_1
,
&
conf
.
remoteSciNodeId1
))
break
;
}
else
{
if
(
iter
.
get
(
CFG_SCI_NODE2_ADAPTERS
,
&
conf
.
nLocalAdapters
))
break
;
if
(
iter
.
get
(
CFG_SCI_NODE1_ADAPTERS
,
&
conf
.
nRemoteAdapters
))
break
;
if
(
iter
.
get
(
CFG_SCI_NODE1_ADAPTER0
,
&
conf
.
remoteSciNodeId0
))
break
;
if
(
conf
.
nRemoteAdapters
>
1
){
if
(
iter
.
get
(
CFG_SCI_NODE1_ADAPTER1
,
&
conf
.
remoteSciNodeId1
))
break
;
}
if
(
iter
.
get
(
CFG_SCI_HOST1_ID_0
,
&
conf
.
remoteSciNodeId0
))
break
;
if
(
iter
.
get
(
CFG_SCI_HOST1_ID_1
,
&
conf
.
remoteSciNodeId1
))
break
;
}
if
(
!
tr
.
createTransporter
(
&
conf
)){
if
(
conf
.
remoteSciNodeId1
==
0
)
{
conf
.
nLocalAdapters
=
1
;
}
else
{
conf
.
nLocalAdapters
=
2
;
}
if
(
!
tr
.
createTransporter
(
&
conf
)){
DBUG_PRINT
(
"error"
,
(
"Failed to create SCI Transporter from %d to %d"
,
conf
.
localNodeId
,
conf
.
remoteNodeId
));
ndbout
<<
"Failed to create SCI Transporter from: "
<<
conf
.
localNodeId
<<
" to: "
<<
conf
.
remoteNodeId
<<
endl
;
}
else
{
DBUG_PRINT
(
"info"
,
(
"Created SCI Transporter: Adapters = %d, remote SCI node id %d"
,
conf
.
nLocalAdapters
,
conf
.
remoteSciNodeId0
));
DBUG_PRINT
(
"info"
,
(
"Host 1 = %s, Host 2 = %s, sendLimit = %d, buf size = %d"
,
conf
.
localHostName
,
conf
.
remoteHostName
,
conf
.
sendLimit
,
conf
.
bufferSize
));
if
(
conf
.
nLocalAdapters
>
1
)
{
DBUG_PRINT
(
"info"
,
(
"Fault-tolerant with 2 Remote Adapters, second remote SCI node id = %d"
,
conf
.
remoteSciNodeId1
));
}
noOfTransportersCreated
++
;
continue
;
}
...
...
@@ -457,8 +461,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
localHostName
=
(
nodeId
==
nodeId1
?
host1
:
host2
);
conf
.
remoteHostName
=
(
nodeId
==
nodeId1
?
host2
:
host1
);
conf
.
byteOrder
=
0
;
conf
.
compression
=
0
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -468,6 +470,9 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
else
{
noOfTransportersCreated
++
;
}
DBUG_PRINT
(
"info"
,
(
"Created TCP Transporter: sendBufferSize = %d, maxReceiveSize = %d"
,
conf
.
sendBufferSize
,
conf
.
maxReceiveSize
));
break
;
case
CONNECTION_TYPE_OSE
:{
OSE_TransporterConfiguration
conf
;
...
...
@@ -483,8 +488,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
conf
.
remoteNodeId
=
remoteNodeId
;
conf
.
localHostName
=
(
nodeId
==
nodeId1
?
host1
:
host2
);
conf
.
remoteHostName
=
(
nodeId
==
nodeId1
?
host2
:
host1
);
conf
.
byteOrder
=
0
;
conf
.
compression
=
0
;
conf
.
checksum
=
checksum
;
conf
.
signalId
=
sendSignalId
;
...
...
@@ -505,6 +508,6 @@ IPCConfig::configureTransporters(Uint32 nodeId,
tr
.
m_service_port
=
server_port
;
return
noOfTransportersCreated
;
DBUG_RETURN
(
noOfTransportersCreated
)
;
}
ndb/src/common/transporter/Makefile.am
View file @
d9051882
...
...
@@ -13,7 +13,7 @@ EXTRA_libtransporter_la_SOURCES = SHM_Transporter.cpp SHM_Transporter.unix.cpp S
libtransporter_la_LIBADD
=
@ndb_transporter_opt_objs@
libtransporter_la_DEPENDENCIES
=
@ndb_transporter_opt_objs@
INCLUDES_LOC
=
-I
$(top_srcdir)
/ndb/include/kernel
-I
$(top_srcdir)
/ndb/include/transporter
INCLUDES_LOC
=
-I
$(top_srcdir)
/ndb/include/kernel
-I
$(top_srcdir)
/ndb/include/transporter
@NDB_SCI_INCLUDES@
include
$(top_srcdir)/ndb/config/common.mk.am
include
$(top_srcdir)/ndb/config/type_util.mk.am
...
...
ndb/src/common/transporter/SCI_Transporter.cpp
View file @
d9051882
...
...
@@ -24,23 +24,30 @@
#include "TransporterInternalDefinitions.hpp"
#include <TransporterCallback.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#define FLAGS 0
SCI_Transporter
::
SCI_Transporter
(
Uint32
packetSize
,
#define DEBUG_TRANSPORTER
SCI_Transporter
::
SCI_Transporter
(
TransporterRegistry
&
t_reg
,
const
char
*
lHostName
,
const
char
*
rHostName
,
int
r_port
,
Uint32
packetSize
,
Uint32
bufferSize
,
Uint32
nAdapters
,
Uint16
remoteSciNodeId0
,
Uint16
remoteSciNodeId1
,
NodeId
_localNodeId
,
NodeId
_remoteNodeId
,
int
byte_order
,
bool
compr
,
bool
chksm
,
bool
signalId
,
Uint32
reportFreq
)
:
Transporter
(
_localNodeId
,
_remoteNodeId
,
byte_order
,
compr
,
chksm
,
signalId
)
{
Transporter
(
t_reg
,
lHostName
,
rHostName
,
r_port
,
_localNodeId
,
_remoteNodeId
,
0
,
false
,
chksm
,
signalId
)
{
DBUG_ENTER
(
"SCI_Transporter::SCI_Transporter"
);
m_PacketSize
=
(
packetSize
+
3
)
/
4
;
m_BufferSize
=
bufferSize
;
m_sendBuffer
.
m_buffer
=
NULL
;
...
...
@@ -56,10 +63,6 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize,
m_initLocal
=
false
;
m_remoteNodes
=
new
Uint16
[
m_numberOfRemoteNodes
];
if
(
m_remoteNodes
==
NULL
)
{
//DO WHAT??
}
m_swapCounter
=
0
;
m_failCounter
=
0
;
m_remoteNodes
[
0
]
=
remoteSciNodeId0
;
...
...
@@ -94,20 +97,19 @@ SCI_Transporter::SCI_Transporter(Uint32 packetSize,
i4096
=
0
;
i4097
=
0
;
#endif
DBUG_VOID_RETURN
;
}
void
SCI_Transporter
::
disconnectImpl
()
{
DBUG_ENTER
(
"SCI_Transporter::disconnectImpl"
);
sci_error_t
err
;
if
(
m_mapped
){
setDisconnect
();
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"DisconnectImpl "
<<
getConnectionStatus
()
<<
endl
;
ndbout
<<
"remote node "
<<
remoteNodeId
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,
(
"connect status = %d, remote node = %d"
,
(
int
)
getConnectionStatus
(),
remoteNodeId
));
disconnectRemote
();
disconnectLocal
();
}
...
...
@@ -124,65 +126,56 @@ void SCI_Transporter::disconnectImpl()
SCIClose
(
sciAdapters
[
i
].
scidesc
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
localNodeId
,
TE_SCI_UNABLE_TO_CLOSE_CHANNEL
);
#ifdef DEBUG_TRANSPORTER
fprintf
(
stderr
,
"
\n
Cannot close channel to the driver. Error code 0x%x"
,
err
);
#endif
}
report_error
(
TE_SCI_UNABLE_TO_CLOSE_CHANNEL
);
DBUG_PRINT
(
"error"
,
(
"Cannot close channel to the driver. Error code 0x%x"
,
err
));
}
}
}
m_sciinit
=
false
;
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"total: "
<<
i1024
+
i10242048
+
i2048
+
i2049
<<
endl
;
ndbout
<<
"total: "
<<
i1024
+
i10242048
+
i2048
+
i2049
<<
endl
;
ndbout
<<
"<1024: "
<<
i1024
<<
endl
;
ndbout
<<
"1024-2047: "
<<
i10242048
<<
endl
;
ndbout
<<
"==2048: "
<<
i2048
<<
endl
;
ndbout
<<
"2049-4096: "
<<
i20484096
<<
endl
;
ndbout
<<
"==4096: "
<<
i4096
<<
endl
;
ndbout
<<
">4096: "
<<
i4097
<<
endl
;
#endif
DBUG_VOID_RETURN
;
}
bool
SCI_Transporter
::
initTransporter
()
{
if
(
m_BufferSize
<
(
2
*
MAX_MESSAGE_SIZE
)){
m_BufferSize
=
2
*
MAX_MESSAGE_SIZE
;
DBUG_ENTER
(
"SCI_Transporter::initTransporter"
);
if
(
m_BufferSize
<
(
2
*
MAX_MESSAGE_SIZE
+
4096
)){
m_BufferSize
=
2
*
MAX_MESSAGE_SIZE
+
4096
;
}
// Allocate buffers for sending
Uint32
sz
=
0
;
if
(
m_BufferSize
<
(
m_PacketSize
*
4
)){
sz
=
m_BufferSize
+
MAX_MESSAGE_SIZE
;
}
else
{
/**
* 3 packages
*/
sz
=
(
m_PacketSize
*
4
)
*
3
+
MAX_MESSAGE_SIZE
;
}
// Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
// the need to send twice when a large message comes around. Send buffer size is
// measured in words.
Uint32
sz
=
4
*
m_PacketSize
+
MAX_MESSAGE_SIZE
;;
m_sendBuffer
.
m_
b
ufferSize
=
4
*
((
sz
+
3
)
/
4
);
m_sendBuffer
.
m_buffer
=
new
Uint32
[
m_sendBuffer
.
m_
b
ufferSize
/
4
];
m_sendBuffer
.
m_
sendB
ufferSize
=
4
*
((
sz
+
3
)
/
4
);
m_sendBuffer
.
m_buffer
=
new
Uint32
[
m_sendBuffer
.
m_
sendB
ufferSize
/
4
];
m_sendBuffer
.
m_dataSize
=
0
;
DBUG_PRINT
(
"info"
,
(
"Created SCI Send Buffer with buffer size %d and packet size %d"
,
m_sendBuffer
.
m_sendBufferSize
,
m_PacketSize
*
4
));
if
(
!
getLinkStatus
(
m_ActiveAdapterId
)
||
!
getLinkStatus
(
m_StandbyAdapterId
))
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"The link is not fully operational. "
<<
endl
;
ndbout
<<
"Check the cables and the switches"
<<
endl
;
#endif
(
m_adapters
>
1
&&
!
getLinkStatus
(
m_StandbyAdapterId
)))
{
DBUG_PRINT
(
"error"
,
(
"The link is not fully operational. Check the cables and the switches"
));
//reportDisconnect(remoteNodeId, 0);
//doDisconnect();
//NDB should terminate
report
Error
(
callbackObj
,
localNodeId
,
TE_SCI_LINK_ERROR
);
return
false
;
report
_error
(
TE_SCI_LINK_ERROR
);
DBUG_RETURN
(
false
)
;
}
return
true
;
DBUG_RETURN
(
true
)
;
}
// initTransporter()
...
...
@@ -218,10 +211,8 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
SCIQuery
(
SCI_Q_ADAPTER
,(
void
*
)(
&
queryAdapter
),(
Uint32
)
NULL
,
&
error
);
if
(
error
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"error querying adapter "
<<
endl
;
#endif
return
false
;
DBUG_PRINT
(
"error"
,
(
"error %d querying adapter"
,
error
));
return
false
;
}
if
(
linkstatus
<=
0
)
return
false
;
...
...
@@ -231,6 +222,7 @@ bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
sci_error_t
SCI_Transporter
::
initLocalSegment
()
{
DBUG_ENTER
(
"SCI_Transporter::initLocalSegment"
);
Uint32
segmentSize
=
m_BufferSize
;
Uint32
offset
=
0
;
sci_error_t
err
;
...
...
@@ -238,16 +230,12 @@ sci_error_t SCI_Transporter::initLocalSegment() {
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
SCIOpen
(
&
(
sciAdapters
[
i
].
scidesc
),
FLAGS
,
&
err
);
sciAdapters
[
i
].
localSciNodeId
=
getLocalNodeId
(
i
);
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"SCInode iD %d adapter %d
\n
"
,
sciAdapters
[
i
].
localSciNodeId
,
i
);
#endif
DBUG_PRINT
(
"info"
,
(
"SCInode iD %d adapter %d
\n
"
,
sciAdapters
[
i
].
localSciNodeId
,
i
));
if
(
err
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"
\n
Cannot open an SCI virtual device. Error code 0x%x"
,
err
);
#endif
return
err
;
DBUG_PRINT
(
"error"
,
(
"Cannot open an SCI virtual device. Error code 0x%x"
,
err
));
DBUG_RETURN
(
err
);
}
}
}
...
...
@@ -264,12 +252,11 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
return
err
;
DBUG_PRINT
(
"error"
,
(
"Error creating segment, err = 0x%x"
,
err
));
DBUG_RETURN
(
err
);
}
else
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"created segment id : "
<<
hostSegmentId
(
localNodeId
,
remoteNodeId
)
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,
(
"created segment id : %d"
,
hostSegmentId
(
localNodeId
,
remoteNodeId
)));
}
/** Prepare the segment*/
...
...
@@ -280,11 +267,9 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"Local Segment is not accessible by an SCI adapter."
);
ndbout_c
(
"Error code 0x%x
\n
"
,
err
);
#endif
return
err
;
DBUG_PRINT
(
"error"
,
(
"Local Segment is not accessible by an SCI adapter. Error code 0x%x
\n
"
,
err
));
DBUG_RETURN
(
err
);
}
}
...
...
@@ -301,14 +286,10 @@ sci_error_t SCI_Transporter::initLocalSegment() {
if
(
err
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
fprintf
(
stderr
,
"
\n
Cannot map area of size %d. Error code 0x%x"
,
segmentSize
,
err
);
ndbout
<<
"initLocalSegment does a disConnect"
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"Cannot map area of size %d. Error code 0x%x"
,
segmentSize
,
err
));
doDisconnect
();
return
err
;
DBUG_RETURN
(
err
)
;
}
...
...
@@ -320,18 +301,16 @@ sci_error_t SCI_Transporter::initLocalSegment() {
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"
\n
Local Segment is not available for remote connections."
);
ndbout_c
(
"Error code 0x%x
\n
"
,
err
);
#endif
return
err
;
DBUG_PRINT
(
"error"
,
(
"Local Segment is not available for remote connections. Error code 0x%x
\n
"
,
err
));
DBUG_RETURN
(
err
);
}
}
setupLocalSegment
();
return
err
;
DBUG_RETURN
(
err
)
;
}
// initLocalSegment()
...
...
@@ -345,7 +324,7 @@ bool SCI_Transporter::doSend() {
Uint32
retry
=
0
;
const
char
*
const
sendPtr
=
(
char
*
)
m_sendBuffer
.
m_buffer
;
const
Uint32
sizeToSend
=
m_sendBuffer
.
m_dataSize
;
const
Uint32
sizeToSend
=
4
*
m_sendBuffer
.
m_dataSize
;
//Convert to number of bytes
if
(
sizeToSend
>
0
){
#ifdef DEBUG_TRANSPORTER
...
...
@@ -363,15 +342,19 @@ bool SCI_Transporter::doSend() {
i4097
++
;
#endif
if
(
startSequence
(
m_ActiveAdapterId
)
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Start sequence failed"
<<
endl
;
#endif
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_START_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Start sequence failed"
));
report_error
(
TE_SCI_UNABLE_TO_START_SEQUENCE
);
return
false
;
}
tryagain:
tryagain:
retry
++
;
if
(
retry
>
3
)
{
DBUG_PRINT
(
"error"
,
(
"SCI Transfer failed"
));
report_error
(
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
return
false
;
}
Uint32
*
insertPtr
=
(
Uint32
*
)
(
m_TargetSegm
[
m_ActiveAdapterId
].
writer
)
->
getWritePtr
(
sizeToSend
);
...
...
@@ -390,44 +373,37 @@ bool SCI_Transporter::doSend() {
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
if
(
err
==
SCI_ERR_OUT_OF_RANGE
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Data transfer : out of range error
\n
"
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"Data transfer : out of range error"
));
goto
tryagain
;
}
if
(
err
==
SCI_ERR_SIZE_ALIGNMENT
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Data transfer : aligne
\n
"
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"Data transfer : alignment error"
));
DBUG_PRINT
(
"info"
,
(
"sendPtr 0x%x, sizeToSend = %d"
,
sendPtr
,
sizeToSend
));
goto
tryagain
;
}
if
(
err
==
SCI_ERR_OFFSET_ALIGNMENT
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Data transfer : offset alignment
\n
"
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"Data transfer : offset alignment"
));
goto
tryagain
;
}
}
if
(
err
==
SCI_ERR_TRANSFER_FAILED
)
{
//(m_TargetSegm[m_StandbyAdapterId].writer)->heavyLock();
if
(
getLinkStatus
(
m_ActiveAdapterId
))
{
retry
++
;
if
(
retry
>
3
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
return
false
;
}
goto
tryagain
;
}
if
(
m_adapters
==
1
)
{
DBUG_PRINT
(
"error"
,
(
"SCI Transfer failed"
));
report_error
(
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
return
false
;
}
m_failCounter
++
;
Uint32
temp
=
m_ActiveAdapterId
;
switch
(
m_swapCounter
)
{
case
0
:
/**swap from active (0) to standby (1)*/
if
(
getLinkStatus
(
m_StandbyAdapterId
))
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Swapping from 0 to 1 "
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"Swapping from adapter 0 to 1"
));
failoverShmWriter
();
SCIStoreBarrier
(
m_TargetSegm
[
m_StandbyAdapterId
].
sequence
,
0
);
m_ActiveAdapterId
=
m_StandbyAdapterId
;
...
...
@@ -436,26 +412,21 @@ bool SCI_Transporter::doSend() {
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
report
Error
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
);
report
_error
(
TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to remove sequence"
));
return
false
;
}
if
(
startSequence
(
m_ActiveAdapterId
)
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Start sequence failed"
<<
endl
;
#endif
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_START_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Start sequence failed"
));
report_error
(
TE_SCI_UNABLE_TO_START_SEQUENCE
);
return
false
;
}
m_swapCounter
++
;
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"failover complete.."
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,
(
"failover complete"
));
goto
tryagain
;
}
else
{
report
Error
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
report
_error
(
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
DBUG_PRINT
(
"error"
,
(
"SCI Transfer failed"
));
return
false
;
}
return
false
;
...
...
@@ -468,20 +439,15 @@ bool SCI_Transporter::doSend() {
failoverShmWriter
();
m_ActiveAdapterId
=
m_StandbyAdapterId
;
m_StandbyAdapterId
=
temp
;
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"Swapping from 1 to 0 "
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,
(
"Swapping from 1 to 0"
));
if
(
createSequence
(
m_ActiveAdapterId
)
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to create sequence"
));
report_error
(
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
return
false
;
}
if
(
startSequence
(
m_ActiveAdapterId
)
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"startSequence failed... disconnecting"
<<
endl
;
#endif
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_START_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"startSequence failed... disconnecting"
));
report_error
(
TE_SCI_UNABLE_TO_START_SEQUENCE
);
return
false
;
}
...
...
@@ -489,37 +455,36 @@ bool SCI_Transporter::doSend() {
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to remove sequence"
));
report_error
(
TE_SCI_UNABLE_TO_REMOVE_SEQUENCE
);
return
false
;
}
if
(
createSequence
(
m_StandbyAdapterId
)
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to create sequence on standby"
));
report_error
(
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
return
false
;
}
m_swapCounter
=
0
;
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"failover complete.."
<<
endl
;
#endif
DBUG_PRINT
(
"info"
,
(
"failover complete.."
));
goto
tryagain
;
}
else
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
DBUG_PRINT
(
"error"
,
(
"Unrecoverable data transfer error"
));
report_error
(
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
return
false
;
}
break
;
default:
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
DBUG_PRINT
(
"error"
,
(
"Unrecoverable data transfer error"
));
report_error
(
TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR
);
return
false
;
break
;
}
}
}
else
{
SHM_Writer
*
writer
=
(
m_TargetSegm
[
m_ActiveAdapterId
].
writer
);
writer
->
updateWritePtr
(
sizeToSend
);
...
...
@@ -535,13 +500,10 @@ bool SCI_Transporter::doSend() {
/**
* If we end up here, the SCI segment is full.
*/
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"the segment is full for some reason"
<<
endl
;
#endif
DBUG_PRINT
(
"error"
,
(
"the segment is full for some reason"
));
return
false
;
}
//if
}
return
true
;
}
// doSend()
...
...
@@ -557,11 +519,8 @@ void SCI_Transporter::failoverShmWriter() {
void
SCI_Transporter
::
setupLocalSegment
()
{
DBUG_ENTER
(
"SCI_Transporter::setupLocalSegment"
);
Uint32
sharedSize
=
0
;
sharedSize
+=
16
;
//SHM_Reader::getSharedSize();
sharedSize
+=
16
;
//SHM_Writer::getSharedSize();
sharedSize
+=
32
;
//SHM_Writer::getSharedSize();
sharedSize
=
4096
;
//start of the buffer is page aligend
Uint32
sizeOfBuffer
=
m_BufferSize
;
...
...
@@ -570,207 +529,265 @@ void SCI_Transporter::setupLocalSegment()
Uint32
*
localReadIndex
=
(
Uint32
*
)
m_SourceSegm
[
m_ActiveAdapterId
].
mappedMemory
;
Uint32
*
localWriteIndex
=
(
Uint32
*
)(
localReadIndex
+
1
);
Uint32
*
localEndOfDataIndex
=
(
Uint32
*
)
(
localReadIndex
+
2
);
Uint32
*
localWriteIndex
=
(
Uint32
*
)(
localReadIndex
+
1
);
Uint32
*
localEndWriteIndex
=
(
Uint32
*
)(
localReadIndex
+
2
);
m_localStatusFlag
=
(
Uint32
*
)(
localReadIndex
+
3
);
Uint32
*
sharedLockIndex
=
(
Uint32
*
)
(
localReadIndex
+
4
);
Uint32
*
sharedHeavyLock
=
(
Uint32
*
)
(
localReadIndex
+
5
);
char
*
localStartOfBuf
=
(
char
*
)
((
char
*
)
m_SourceSegm
[
m_ActiveAdapterId
].
mappedMemory
+
sharedSize
);
*
local
ReadIndex
=
*
local
WriteIndex
=
0
;
*
localEnd
OfDataIndex
=
sizeOfBuffer
-
1
;
*
localReadIndex
=
0
;
*
localWriteIndex
=
0
;
*
localEnd
WriteIndex
=
0
;
const
Uint32
slack
=
MAX_MESSAGE_SIZE
;
reader
=
new
SHM_Reader
(
localStartOfBuf
,
sizeOfBuffer
,
slack
,
localReadIndex
,
localEndWriteIndex
,
localWriteIndex
);
*
localReadIndex
=
0
;
*
localWriteIndex
=
0
;
reader
->
clear
();
DBUG_VOID_RETURN
;
}
//setupLocalSegment
void
SCI_Transporter
::
setupRemoteSegment
()
{
DBUG_ENTER
(
"SCI_Transporter::setupRemoteSegment"
);
Uint32
sharedSize
=
0
;
sharedSize
+=
16
;
//SHM_Reader::getSharedSize();
sharedSize
+=
16
;
//SHM_Writer::getSharedSize();
sharedSize
+=
32
;
sharedSize
=
4096
;
//start of the buffer is page aligend
sharedSize
=
4096
;
//start of the buffer is page aligned
Uint32
sizeOfBuffer
=
m_BufferSize
;
const
Uint32
slack
=
MAX_MESSAGE_SIZE
;
sizeOfBuffer
-=
sharedSize
;
Uint32
*
segPtr
=
(
Uint32
*
)
m_TargetSegm
[
m_StandbyAdapterId
].
mappedMemory
;
Uint32
*
remoteReadIndex2
=
(
Uint32
*
)
segPtr
;
Uint32
*
remoteWriteIndex2
=
(
Uint32
*
)
(
segPtr
+
1
);
Uint32
*
remoteEndOfDataIndex2
=
(
Uint32
*
)
(
segPtr
+
2
);
Uint32
*
sharedLockIndex2
=
(
Uint32
*
)
(
segPtr
+
3
);
m_remoteStatusFlag2
=
(
Uint32
*
)(
segPtr
+
4
);
Uint32
*
sharedHeavyLock2
=
(
Uint32
*
)
(
segPtr
+
5
);
char
*
remoteStartOfBuf2
=
(
char
*
)((
char
*
)
segPtr
+
sharedSize
);
segPtr
=
(
Uint32
*
)
m_TargetSegm
[
m_ActiveAdapterId
].
mappedMemory
;
Uint32
*
segPtr
=
(
Uint32
*
)
m_TargetSegm
[
m_ActiveAdapterId
].
mappedMemory
;
Uint32
*
remoteReadIndex
=
(
Uint32
*
)
segPtr
;
Uint32
*
remoteWriteIndex
=
(
Uint32
*
)
(
segPtr
+
1
);
Uint32
*
remoteEndOfDataIndex
=
(
Uint32
*
)
(
segPtr
+
2
);
Uint32
*
sharedLockIndex
=
(
Uint32
*
)
(
segPtr
+
3
);
m_remoteStatusFlag
=
(
Uint32
*
)(
segPtr
+
4
);
Uint32
*
sharedHeavyLock
=
(
Uint32
*
)
(
segPtr
+
5
);
Uint32
*
remoteWriteIndex
=
(
Uint32
*
)(
segPtr
+
1
);
Uint32
*
remoteEndWriteIndex
=
(
Uint32
*
)
(
segPtr
+
2
);
m_remoteStatusFlag
=
(
Uint32
*
)(
segPtr
+
3
);
char
*
remoteStartOfBuf
=
(
char
*
)((
char
*
)
segPtr
+
(
sharedSize
));
*
remoteReadIndex
=
*
remoteWriteIndex
=
0
;
*
remoteReadIndex2
=
*
remoteWriteIndex2
=
0
;
*
remoteEndOfDataIndex
=
sizeOfBuffer
-
1
;
*
remoteEndOfDataIndex2
=
sizeOfBuffer
-
1
;
/**
* setup two writers. writer2 is used to mirror the changes of
* writer on the standby
* segment, so that in the case of a failover, we can switch
* to the stdby seg. quickly.*
*/
const
Uint32
slack
=
MAX_MESSAGE_SIZE
;
writer
=
new
SHM_Writer
(
remoteStartOfBuf
,
sizeOfBuffer
,
slack
,
remoteReadIndex
,
remoteEndWriteIndex
,
remoteWriteIndex
);
writer2
=
new
SHM_Writer
(
remoteStartOfBuf2
,
sizeOfBuffer
,
slack
,
remoteReadIndex2
,
remoteWriteIndex2
);
*
remoteReadIndex
=
0
;
*
remoteWriteIndex
=
0
;
writer
->
clear
();
writer2
->
clear
();
m_TargetSegm
[
0
].
writer
=
writer
;
m_TargetSegm
[
1
].
writer
=
writer2
;
m_sendBuffer
.
m_forceSendLimit
=
writer
->
getBufferSize
();
if
(
createSequence
(
m_ActiveAdapterId
)
!=
SCI_ERR_OK
)
{
reportThreadError
(
remoteNodeId
,
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
report_error
(
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to create sequence on active"
));
doDisconnect
();
}
if
(
createSequence
(
m_StandbyAdapterId
)
!=
SCI_ERR_OK
)
{
reportThreadError
(
remoteNodeId
,
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
doDisconnect
();
}
if
(
m_adapters
>
1
)
{
segPtr
=
(
Uint32
*
)
m_TargetSegm
[
m_StandbyAdapterId
].
mappedMemory
;
Uint32
*
remoteReadIndex2
=
(
Uint32
*
)
segPtr
;
Uint32
*
remoteWriteIndex2
=
(
Uint32
*
)
(
segPtr
+
1
);
Uint32
*
remoteEndWriteIndex2
=
(
Uint32
*
)
(
segPtr
+
2
);
m_remoteStatusFlag2
=
(
Uint32
*
)(
segPtr
+
3
);
char
*
remoteStartOfBuf2
=
(
char
*
)((
char
*
)
segPtr
+
sharedSize
);
/**
* setup a writer. writer2 is used to mirror the changes of
* writer on the standby
* segment, so that in the case of a failover, we can switch
* to the stdby seg. quickly.*
*/
writer2
=
new
SHM_Writer
(
remoteStartOfBuf2
,
sizeOfBuffer
,
slack
,
remoteReadIndex2
,
remoteEndWriteIndex2
,
remoteWriteIndex2
);
*
remoteReadIndex
=
0
;
*
remoteWriteIndex
=
0
;
*
remoteEndWriteIndex
=
0
;
writer2
->
clear
();
m_TargetSegm
[
1
].
writer
=
writer2
;
if
(
createSequence
(
m_StandbyAdapterId
)
!=
SCI_ERR_OK
)
{
report_error
(
TE_SCI_UNABLE_TO_CREATE_SEQUENCE
);
DBUG_PRINT
(
"error"
,
(
"Unable to create sequence on standby"
));
doDisconnect
();
}
}
DBUG_VOID_RETURN
;
}
//setupRemoteSegment
bool
SCI_Transporter
::
connectImpl
(
Uint32
timeout
)
{
sci_error_t
err
;
Uint32
offset
=
0
;
bool
SCI_Transporter
::
init_local
()
{
DBUG_ENTER
(
"SCI_Transporter::init_local"
);
if
(
!
m_initLocal
)
{
if
(
initLocalSegment
()
!=
SCI_ERR_OK
){
NdbSleep_MilliSleep
(
timeout
);
NdbSleep_MilliSleep
(
10
);
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
report
ThreadError
(
localNodeId
,
TE_SCI_CANNOT_INIT_LOCALSEGMENT
);
return
false
;
report
_error
(
TE_SCI_CANNOT_INIT_LOCALSEGMENT
);
DBUG_RETURN
(
false
);
}
m_initLocal
=
true
;
m_initLocal
=
true
;
}
if
(
!
m_mapped
)
{
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
=
0
;
SCIConnectSegment
(
sciAdapters
[
i
].
scidesc
,
&
(
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
),
m_remoteNodes
[
i
],
remoteSegmentId
(
localNodeId
,
remoteNodeId
),
i
,
0
,
0
,
0
,
0
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
NdbSleep_MilliSleep
(
timeout
);
return
false
;
}
}
DBUG_RETURN
(
true
);
}
bool
SCI_Transporter
::
init_remote
()
{
DBUG_ENTER
(
"SCI_Transporter::init_remote"
);
sci_error_t
err
;
Uint32
offset
=
0
;
if
(
!
m_mapped
)
{
DBUG_PRINT
(
"info"
,
(
"Map remote segments"
));
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
=
0
;
SCIConnectSegment
(
sciAdapters
[
i
].
scidesc
,
&
(
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
),
m_remoteNodes
[
i
],
remoteSegmentId
(
localNodeId
,
remoteNodeId
),
i
,
0
,
0
,
0
,
0
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
NdbSleep_MilliSleep
(
10
);
DBUG_PRINT
(
"error"
,
(
"Error connecting segment, err 0x%x"
,
err
));
DBUG_RETURN
(
false
);
}
}
// Map the remote memory segment into program space
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
m_TargetSegm
[
i
].
mappedMemory
=
SCIMapRemoteSegment
((
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
),
&
(
m_TargetSegm
[
i
].
rhm
[
i
].
map
),
offset
,
m_BufferSize
,
NULL
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"
\n
Cannot map a segment to the remote node %d."
);
ndbout_c
(
"Error code 0x%x"
,
m_RemoteSciNodeId
,
err
);
#endif
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
reportThreadError
(
remoteNodeId
,
TE_SCI_CANNOT_MAP_REMOTESEGMENT
);
return
false
;
}
}
m_mapped
=
true
;
setupRemoteSegment
();
setConnected
();
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"connected and mapped to segment : "
<<
endl
;
ndbout
<<
"remoteNode: "
<<
m_remoteNodes
[
0
]
<<
endl
;
ndbout
<<
"remoteNode: "
<<
m_remotenodes
[
1
]
<<
endl
;
ndbout
<<
"remoteSegId: "
<<
remoteSegmentId
(
localNodeId
,
remoteNodeId
)
<<
endl
;
#endif
return
true
;
}
else
{
return
getConnectionStatus
();
}
}
// connectImpl()
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
m_TargetSegm
[
i
].
mappedMemory
=
SCIMapRemoteSegment
((
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
),
&
(
m_TargetSegm
[
i
].
rhm
[
i
].
map
),
offset
,
m_BufferSize
,
NULL
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
DBUG_PRINT
(
"error"
,
(
"Cannot map a segment to the remote node %d. Error code 0x%x"
,
m_RemoteSciNodeId
,
err
));
//NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
report_error
(
TE_SCI_CANNOT_MAP_REMOTESEGMENT
);
DBUG_RETURN
(
false
);
}
}
m_mapped
=
true
;
setupRemoteSegment
();
setConnected
();
DBUG_PRINT
(
"info"
,
(
"connected and mapped to segment, remoteNode: %d"
,
remoteNodeId
));
DBUG_PRINT
(
"info"
,
(
"remoteSegId: %d"
,
remoteSegmentId
(
localNodeId
,
remoteNodeId
)));
DBUG_RETURN
(
true
);
}
else
{
DBUG_RETURN
(
getConnectionStatus
());
}
}
bool
SCI_Transporter
::
connect_client_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
SocketInputStream
s_input
(
sockfd
);
SocketOutputStream
s_output
(
sockfd
);
char
buf
[
256
];
DBUG_ENTER
(
"SCI_Transporter::connect_client_impl"
);
// Wait for server to create and attach
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
DBUG_PRINT
(
"error"
,
(
"No initial response from server in SCI"
));
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
if
(
!
init_local
())
{
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
// Send ok to server
s_output
.
println
(
"sci client 1 ok"
);
if
(
!
init_remote
())
{
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
// Wait for ok from server
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
DBUG_PRINT
(
"error"
,
(
"No second response from server in SCI"
));
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
// Send ok to server
s_output
.
println
(
"sci client 2 ok"
);
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_PRINT
(
"info"
,
(
"Successfully connected client to node %d"
,
remoteNodeId
));
DBUG_RETURN
(
true
);
}
bool
SCI_Transporter
::
connect_server_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
SocketOutputStream
s_output
(
sockfd
);
SocketInputStream
s_input
(
sockfd
);
char
buf
[
256
];
DBUG_ENTER
(
"SCI_Transporter::connect_server_impl"
);
if
(
!
init_local
())
{
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
// Send ok to client
s_output
.
println
(
"sci server 1 ok"
);
// Wait for ok from client
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
DBUG_PRINT
(
"error"
,
(
"No response from client in SCI"
));
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
if
(
!
init_remote
())
{
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
// Send ok to client
s_output
.
println
(
"sci server 2 ok"
);
// Wait for ok from client
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
DBUG_PRINT
(
"error"
,
(
"No second response from client in SCI"
));
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_RETURN
(
false
);
}
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_PRINT
(
"info"
,
(
"Successfully connected server to node %d"
,
remoteNodeId
));
DBUG_RETURN
(
true
);
}
sci_error_t
SCI_Transporter
::
createSequence
(
Uint32
adapterid
)
{
sci_error_t
err
;
SCICreateMapSequence
((
m_TargetSegm
[
adapterid
].
rhm
[
adapterid
].
map
),
...
...
@@ -795,13 +812,14 @@ sci_error_t SCI_Transporter::startSequence(Uint32 adapterid) {
// If there still is an error then data cannot be safely send
return
err
;
return
err
;
}
// startSequence()
bool
SCI_Transporter
::
disconnectLocal
()
{
{
DBUG_ENTER
(
"SCI_Transporter::disconnectLocal"
);
sci_error_t
err
;
m_ActiveAdapterId
=
0
;
...
...
@@ -809,31 +827,28 @@ bool SCI_Transporter::disconnectLocal()
*/
SCIUnmapSegment
(
m_SourceSegm
[
0
].
lhm
[
0
].
map
,
0
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_UNMAP_SEGMENT
);
return
false
;
}
if
(
err
!=
SCI_ERR_OK
)
{
report_error
(
TE_SCI_UNABLE_TO_UNMAP_SEGMENT
);
DBUG_PRINT
(
"error"
,
(
"Unable to unmap segment"
));
DBUG_RETURN
(
false
)
;
}
SCIRemoveSegment
((
m_SourceSegm
[
m_ActiveAdapterId
].
localHandle
),
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_REMOVE_SEGMENT
);
return
false
;
report_error
(
TE_SCI_UNABLE_TO_REMOVE_SEGMENT
);
DBUG_PRINT
(
"error"
,
(
"Unable to remove segment"
));
DBUG_RETURN
(
false
);
}
if
(
err
==
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
printf
(
"Local memory segment is unmapped and removed
\n
"
);
#endif
}
return
true
;
DBUG_PRINT
(
"info"
,
(
"Local memory segment is unmapped and removed"
));
DBUG_RETURN
(
true
);
}
// disconnectLocal()
bool
SCI_Transporter
::
disconnectRemote
()
{
DBUG_ENTER
(
"SCI_Transporter::disconnectRemote"
);
sci_error_t
err
;
for
(
Uint32
i
=
0
;
i
<
m_adapters
;
i
++
)
{
/**
...
...
@@ -841,35 +856,32 @@ bool SCI_Transporter::disconnectRemote() {
*/
SCIUnmapSegment
(
m_TargetSegm
[
i
].
rhm
[
i
].
map
,
0
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
reportError
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
);
return
false
;
}
report_error
(
TE_SCI_UNABLE_TO_UNMAP_SEGMENT
);
DBUG_PRINT
(
"error"
,
(
"Unable to unmap segment"
));
DBUG_RETURN
(
false
)
;
}
SCIDisconnectSegment
(
m_TargetSegm
[
i
].
rhm
[
i
].
remoteHandle
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
{
report
Error
(
callbackObj
,
remoteNodeId
,
TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
);
return
false
;
report
_error
(
TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT
);
DBUG_PRINT
(
"error"
,
(
"Unable to disconnect segment"
));
DBUG_RETURN
(
false
)
;
}
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"Remote memory segment is unmapped and disconnected
\n
"
);
#endif
DBUG_PRINT
(
"info"
,
(
"Remote memory segment is unmapped and disconnected"
));
}
return
true
;
DBUG_RETURN
(
true
)
;
}
// disconnectRemote()
SCI_Transporter
::~
SCI_Transporter
()
{
DBUG_ENTER
(
"SCI_Transporter::~SCI_Transporter"
);
// Close channel to the driver
#ifdef DEBUG_TRANSPORTER
ndbout
<<
"~SCITransporter does a disConnect"
<<
endl
;
#endif
doDisconnect
();
if
(
m_sendBuffer
.
m_buffer
!=
NULL
)
delete
[]
m_sendBuffer
.
m_buffer
;
DBUG_VOID_RETURN
;
}
// ~SCI_Transporter()
...
...
@@ -878,7 +890,7 @@ SCI_Transporter::~SCI_Transporter() {
void
SCI_Transporter
::
closeSCI
()
{
// Termination of SCI
sci_error_t
err
;
printf
(
"
\n
Closing SCI Transporter...
\n
"
);
DBUG_ENTER
(
"SCI_Transporter::closeSCI"
);
// Disconnect and remove remote segment
disconnectRemote
();
...
...
@@ -890,26 +902,42 @@ void SCI_Transporter::closeSCI() {
// Closes an SCI virtual device
SCIClose
(
activeSCIDescriptor
,
FLAGS
,
&
err
);
if
(
err
!=
SCI_ERR_OK
)
fprintf
(
stderr
,
"
\n
Cannot close SCI channel to the driver. Error code 0x%x"
,
err
);
if
(
err
!=
SCI_ERR_OK
)
{
DBUG_PRINT
(
"error"
,
(
"Cannot close SCI channel to the driver. Error code 0x%x"
,
err
));
}
SCITerminate
();
DBUG_VOID_RETURN
;
}
// closeSCI()
Uint32
*
SCI_Transporter
::
getWritePtr
(
Uint32
lenBytes
,
Uint32
prio
){
SCI_Transporter
::
getWritePtr
(
Uint32
lenBytes
,
Uint32
prio
)
{
if
(
m_sendBuffer
.
full
()){
/**-------------------------------------------------
* Buffer was completely full. We have severe problems.
* -------------------------------------------------
*/
if
(
!
doSend
()){
Uint32
sci_buffer_remaining
=
m_sendBuffer
.
m_forceSendLimit
;
Uint32
send_buf_size
=
m_sendBuffer
.
m_sendBufferSize
;
Uint32
curr_data_size
=
m_sendBuffer
.
m_dataSize
<<
2
;
Uint32
new_curr_data_size
=
curr_data_size
+
lenBytes
;
if
((
new_curr_data_size
>=
send_buf_size
)
||
(
curr_data_size
>=
sci_buffer_remaining
))
{
/**
* The new message will not fit in the send buffer. We need to
* send the send buffer before filling it up with the new
* signal data. If current data size will spill over buffer edge
* we will also send to avoid writing larger than possible in
* buffer.
*/
if
(
!
doSend
())
{
/**
* We were not successfull sending, report 0 as meaning buffer full and
* upper levels handle retries and other recovery matters.
*/
return
0
;
}
}
/**
* New signal fits, simply fill it up with more data.
*/
Uint32
sz
=
m_sendBuffer
.
m_dataSize
;
return
&
m_sendBuffer
.
m_buffer
[
sz
];
}
...
...
@@ -918,10 +946,11 @@ void
SCI_Transporter
::
updateWritePtr
(
Uint32
lenBytes
,
Uint32
prio
){
Uint32
sz
=
m_sendBuffer
.
m_dataSize
;
sz
+=
(
lenBytes
/
4
);
Uint32
packet_size
=
m_PacketSize
;
sz
+=
((
lenBytes
+
3
)
>>
2
);
m_sendBuffer
.
m_dataSize
=
sz
;
if
(
sz
>
m_PacketS
ize
)
{
if
(
sz
>
packet_s
ize
)
{
/**-------------------------------------------------
* Buffer is full and we are ready to send. We will
* not wait since the signal is already in the buffer.
...
...
@@ -944,7 +973,8 @@ bool
SCI_Transporter
::
getConnectionStatus
()
{
if
(
*
m_localStatusFlag
==
SCICONNECTED
&&
(
*
m_remoteStatusFlag
==
SCICONNECTED
||
*
m_remoteStatusFlag2
==
SCICONNECTED
))
((
m_adapters
>
1
)
&&
*
m_remoteStatusFlag2
==
SCICONNECTED
)))
return
true
;
else
return
false
;
...
...
@@ -954,7 +984,9 @@ SCI_Transporter::getConnectionStatus() {
void
SCI_Transporter
::
setConnected
()
{
*
m_remoteStatusFlag
=
SCICONNECTED
;
*
m_remoteStatusFlag2
=
SCICONNECTED
;
if
(
m_adapters
>
1
)
{
*
m_remoteStatusFlag2
=
SCICONNECTED
;
}
*
m_localStatusFlag
=
SCICONNECTED
;
}
...
...
@@ -963,8 +995,10 @@ void
SCI_Transporter
::
setDisconnect
()
{
if
(
getLinkStatus
(
m_ActiveAdapterId
))
*
m_remoteStatusFlag
=
SCIDISCONNECT
;
if
(
getLinkStatus
(
m_StandbyAdapterId
))
*
m_remoteStatusFlag2
=
SCIDISCONNECT
;
if
(
m_adapters
>
1
)
{
if
(
getLinkStatus
(
m_StandbyAdapterId
))
*
m_remoteStatusFlag2
=
SCIDISCONNECT
;
}
}
...
...
@@ -981,20 +1015,20 @@ static bool init = false;
bool
SCI_Transporter
::
initSCI
()
{
DBUG_ENTER
(
"SCI_Transporter::initSCI"
);
if
(
!
init
){
sci_error_t
error
;
// Initialize SISCI library
SCIInitialize
(
0
,
&
error
);
if
(
error
!=
SCI_ERR_OK
)
{
#ifdef DEBUG_TRANSPORTER
ndbout_c
(
"
\n
Cannot initialize SISCI library."
);
ndbout_c
(
"
\n
Inconsistency between SISCI library and SISCI driver.Error code 0x%x"
,
error
);
#endif
return
false
;
DBUG_PRINT
(
"error"
,
(
"Cannot initialize SISCI library."
));
DBUG_PRINT
(
"error"
,
(
"Inconsistency between SISCI library and SISCI driver. Error code 0x%x"
,
error
));
DBUG_RETURN
(
false
);
}
init
=
true
;
}
return
true
;
DBUG_RETURN
(
true
);
}
...
...
ndb/src/common/transporter/SCI_Transporter.hpp
View file @
d9051882
...
...
@@ -26,7 +26,7 @@
#include <ndb_types.h>
/**
/**
* The SCI Transporter
*
* The design goal of the SCI transporter is to deliver high performance
...
...
@@ -135,15 +135,17 @@ public:
bool
getConnectionStatus
();
private:
SCI_Transporter
(
Uint32
packetSize
,
SCI_Transporter
(
TransporterRegistry
&
t_reg
,
const
char
*
local_host
,
const
char
*
remote_host
,
int
port
,
Uint32
packetSize
,
Uint32
bufferSize
,
Uint32
nAdapters
,
Uint16
remoteSciNodeId0
,
Uint16
remoteSciNodeId1
,
NodeId
localNodeID
,
NodeId
remoteNodeID
,
int
byteorder
,
bool
compression
,
bool
checksum
,
bool
signalId
,
Uint32
reportFreq
=
4096
);
...
...
@@ -160,7 +162,8 @@ private:
/**
* For statistics on transfered packets
*/
#ifdef DEBUG_TRANSPORTER
//#ifdef DEBUG_TRANSPORTER
#if 1
Uint32
i1024
;
Uint32
i2048
;
Uint32
i2049
;
...
...
@@ -177,10 +180,8 @@ private:
struct
{
Uint32
*
m_buffer
;
// The buffer
Uint32
m_dataSize
;
// No of words in buffer
Uint32
m_
bufferSize
;
// Buffer size
Uint32
m_
sendBufferSize
;
// Buffer size
Uint32
m_forceSendLimit
;
// Send when buffer is this full
bool
full
()
const
{
return
(
m_dataSize
*
4
)
>
m_forceSendLimit
;}
}
m_sendBuffer
;
SHM_Reader
*
reader
;
...
...
@@ -196,7 +197,7 @@ private:
Uint32
m_adapters
;
Uint32
m_numberOfRemoteNodes
;
Uint16
*
m_remoteNodes
;
Uint16
m_remoteNodes
[
2
]
;
typedef
struct
SciAdapter
{
sci_desc_t
scidesc
;
...
...
@@ -297,12 +298,12 @@ private:
bool
sendIsPossible
(
struct
timeval
*
timeout
);
void
getReceivePtr
(
Uint32
**
ptr
,
Uint32
**
eod
){
reader
->
getReadPtr
(
*
ptr
,
*
eod
);
void
getReceivePtr
(
Uint32
**
ptr
,
Uint32
&
size
){
size
=
reader
->
getReadPtr
(
*
ptr
);
}
void
updateReceivePtr
(
Uint32
*
ptr
){
reader
->
updateReadPtr
(
ptr
);
void
updateReceivePtr
(
Uint32
size
){
reader
->
updateReadPtr
(
size
);
}
/**
...
...
@@ -341,7 +342,9 @@ private:
*/
void
failoverShmWriter
();
bool
init_local
();
bool
init_remote
();
protected
:
/** Perform a connection between segment
...
...
@@ -350,7 +353,8 @@ protected:
* retrying.
* @return Returns true on success, otherwize falser
*/
bool
connectImpl
(
Uint32
timeOutMillis
);
bool
connect_server_impl
(
NDB_SOCKET_TYPE
sockfd
);
bool
connect_client_impl
(
NDB_SOCKET_TYPE
sockfd
);
/**
* We will disconnect if:
...
...
ndb/src/common/transporter/SHM_Buffer.hpp
View file @
d9051882
...
...
@@ -42,17 +42,19 @@ public:
Uint32
_sizeOfBuffer
,
Uint32
_slack
,
Uint32
*
_readIndex
,
Uint32
*
_endWriteIndex
,
Uint32
*
_writeIndex
)
:
m_startOfBuffer
(
_startOfBuffer
),
m_totalBufferSize
(
_sizeOfBuffer
),
m_bufferSize
(
_sizeOfBuffer
-
_slack
),
m_sharedReadIndex
(
_readIndex
),
m_sharedEndWriteIndex
(
_endWriteIndex
),
m_sharedWriteIndex
(
_writeIndex
)
{
}
void
clear
()
{
m_readIndex
=
*
m_sharedReadIndex
;
m_readIndex
=
0
;
}
/**
...
...
@@ -66,12 +68,12 @@ public:
* returns ptr - where to start reading
* sz - how much can I read
*/
inline
void
getReadPtr
(
Uint32
*
&
ptr
,
Uint32
*
&
eod
);
inline
Uint32
getReadPtr
(
Uint32
*
&
ptr
);
/**
* Update read ptr
*/
inline
void
updateReadPtr
(
Uint32
*
readPtr
);
inline
void
updateReadPtr
(
Uint32
size
);
private:
char
*
const
m_startOfBuffer
;
...
...
@@ -80,6 +82,7 @@ private:
Uint32
m_readIndex
;
Uint32
*
m_sharedReadIndex
;
Uint32
*
m_sharedEndWriteIndex
;
Uint32
*
m_sharedWriteIndex
;
};
...
...
@@ -97,19 +100,22 @@ SHM_Reader::empty() const{
* sz - how much can I read
*/
inline
void
SHM_Reader
::
getReadPtr
(
Uint32
*
&
ptr
,
Uint32
*
&
eod
){
Uint32
SHM_Reader
::
getReadPtr
(
Uint32
*
&
ptr
)
{
Uint32
*
eod
;
Uint32
tReadIndex
=
m_readIndex
;
Uint32
tWriteIndex
=
*
m_sharedWriteIndex
;
Uint32
tEndWriteIndex
=
*
m_sharedEndWriteIndex
;
ptr
=
(
Uint32
*
)
&
m_startOfBuffer
[
tReadIndex
];
if
(
tReadIndex
<=
tWriteIndex
){
eod
=
(
Uint32
*
)
&
m_startOfBuffer
[
tWriteIndex
];
}
else
{
eod
=
(
Uint32
*
)
&
m_startOfBuffer
[
m_bufferSize
];
eod
=
(
Uint32
*
)
&
m_startOfBuffer
[
tEndWriteIndex
];
}
return
(
Uint32
)((
char
*
)
eod
-
(
char
*
)
ptr
);
}
/**
...
...
@@ -117,14 +123,14 @@ SHM_Reader::getReadPtr(Uint32 * & ptr, Uint32 * & eod){
*/
inline
void
SHM_Reader
::
updateReadPtr
(
Uint32
*
ptr
){
Uint32
tReadIndex
=
((
char
*
)
ptr
)
-
m_startOfBuffer
;
SHM_Reader
::
updateReadPtr
(
Uint32
size
)
{
Uint32
tReadIndex
=
m_readIndex
;
tReadIndex
+=
size
;
assert
(
tReadIndex
<
m_totalBufferSize
);
if
(
tReadIndex
>=
m_bufferSize
){
tReadIndex
=
0
;
//-= m_bufferSize;
tReadIndex
=
0
;
}
m_readIndex
=
tReadIndex
;
...
...
@@ -139,17 +145,19 @@ public:
Uint32
_sizeOfBuffer
,
Uint32
_slack
,
Uint32
*
_readIndex
,
Uint32
*
_endWriteIndex
,
Uint32
*
_writeIndex
)
:
m_startOfBuffer
(
_startOfBuffer
),
m_totalBufferSize
(
_sizeOfBuffer
),
m_bufferSize
(
_sizeOfBuffer
-
_slack
),
m_sharedReadIndex
(
_readIndex
),
m_sharedEndWriteIndex
(
_endWriteIndex
),
m_sharedWriteIndex
(
_writeIndex
)
{
}
void
clear
()
{
m_writeIndex
=
*
m_sharedWriteIndex
;
m_writeIndex
=
0
;
}
inline
char
*
getWritePtr
(
Uint32
sz
);
...
...
@@ -168,6 +176,7 @@ private:
Uint32
m_writeIndex
;
Uint32
*
m_sharedReadIndex
;
Uint32
*
m_sharedEndWriteIndex
;
Uint32
*
m_sharedWriteIndex
;
};
...
...
@@ -206,7 +215,8 @@ SHM_Writer::updateWritePtr(Uint32 sz){
assert
(
tWriteIndex
<
m_totalBufferSize
);
if
(
tWriteIndex
>=
m_bufferSize
){
tWriteIndex
=
0
;
//-= m_bufferSize;
*
m_sharedEndWriteIndex
=
tWriteIndex
;
tWriteIndex
=
0
;
}
m_writeIndex
=
tWriteIndex
;
...
...
ndb/src/common/transporter/SHM_Transporter.cpp
View file @
d9051882
...
...
@@ -32,13 +32,12 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
int
r_port
,
NodeId
lNodeId
,
NodeId
rNodeId
,
bool
compression
,
bool
checksum
,
bool
signalId
,
key_t
_shmKey
,
Uint32
_shmSize
)
:
Transporter
(
t_reg
,
lHostName
,
rHostName
,
r_port
,
lNodeId
,
rNodeId
,
0
,
compression
,
checksum
,
signalId
),
0
,
false
,
checksum
,
signalId
),
shmKey
(
_shmKey
),
shmSize
(
_shmSize
)
{
...
...
@@ -48,7 +47,7 @@ SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
shmBuf
=
0
;
reader
=
0
;
writer
=
0
;
setupBuffersDone
=
false
;
#ifdef DEBUG_TRANSPORTER
printf
(
"shm key (%d - %d) = %d
\n
"
,
lNodeId
,
rNodeId
,
shmKey
);
...
...
@@ -83,36 +82,40 @@ SHM_Transporter::setupBuffers(){
Uint32
*
sharedReadIndex1
=
base1
;
Uint32
*
sharedWriteIndex1
=
base1
+
1
;
Uint32
*
sharedEndWriteIndex1
=
base1
+
2
;
serverStatusFlag
=
base1
+
4
;
char
*
startOfBuf1
=
shmBuf
+
sharedSize
;
Uint32
*
base2
=
(
Uint32
*
)(
shmBuf
+
sizeOfBuffer
+
sharedSize
);
Uint32
*
sharedReadIndex2
=
base2
;
Uint32
*
sharedWriteIndex2
=
base2
+
1
;
Uint32
*
sharedEndWriteIndex2
=
base2
+
2
;
clientStatusFlag
=
base2
+
4
;
char
*
startOfBuf2
=
((
char
*
)
base2
)
+
sharedSize
;
*
sharedReadIndex2
=
*
sharedWriteIndex2
=
0
;
if
(
isServer
){
*
serverStatusFlag
=
0
;
reader
=
new
SHM_Reader
(
startOfBuf1
,
sizeOfBuffer
,
slack
,
sharedReadIndex1
,
sharedEndWriteIndex1
,
sharedWriteIndex1
);
writer
=
new
SHM_Writer
(
startOfBuf2
,
sizeOfBuffer
,
slack
,
sharedReadIndex2
,
sharedEndWriteIndex2
,
sharedWriteIndex2
);
*
sharedReadIndex1
=
0
;
*
sharedWriteIndex2
=
0
;
*
sharedWriteIndex1
=
0
;
*
sharedEndWriteIndex1
=
0
;
*
sharedReadIndex2
=
0
;
*
sharedWriteIndex1
=
0
;
*
sharedWriteIndex2
=
0
;
*
sharedEndWriteIndex2
=
0
;
reader
->
clear
();
writer
->
clear
();
...
...
@@ -145,16 +148,19 @@ SHM_Transporter::setupBuffers(){
sizeOfBuffer
,
slack
,
sharedReadIndex2
,
sharedEndWriteIndex2
,
sharedWriteIndex2
);
writer
=
new
SHM_Writer
(
startOfBuf1
,
sizeOfBuffer
,
slack
,
sharedReadIndex1
,
sharedEndWriteIndex1
,
sharedWriteIndex1
);
*
sharedReadIndex2
=
0
;
*
sharedWriteIndex1
=
0
;
*
sharedEndWriteIndex1
=
0
;
reader
->
clear
();
writer
->
clear
();
...
...
@@ -224,6 +230,7 @@ SHM_Transporter::prepareSend(const SignalHeader * const signalHeader,
bool
SHM_Transporter
::
connect_server_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
DBUG_ENTER
(
"SHM_Transporter::connect_server_impl"
);
SocketOutputStream
s_output
(
sockfd
);
SocketInputStream
s_input
(
sockfd
);
char
buf
[
256
];
...
...
@@ -233,7 +240,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
if
(
!
ndb_shm_create
())
{
report_error
(
TE_SHM_UNABLE_TO_CREATE_SEGMENT
);
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_RETURN
(
false
)
;
}
_shmSegCreated
=
true
;
}
...
...
@@ -243,7 +250,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
if
(
!
ndb_shm_attach
())
{
report_error
(
TE_SHM_UNABLE_TO_ATTACH_SEGMENT
);
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_RETURN
(
false
)
;
}
_attached
=
true
;
}
...
...
@@ -254,7 +261,7 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from client
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_RETURN
(
false
)
;
}
int
r
=
connect_common
(
sockfd
);
...
...
@@ -265,17 +272,20 @@ SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from client
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_RETURN
(
false
)
;
}
DBUG_PRINT
(
"info"
,
(
"Successfully connected server to node %d"
,
remoteNodeId
));
}
NDB_CLOSE_SOCKET
(
sockfd
);
return
r
;
DBUG_RETURN
(
r
)
;
}
bool
SHM_Transporter
::
connect_client_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
DBUG_ENTER
(
"SHM_Transporter::connect_client_impl"
);
SocketInputStream
s_input
(
sockfd
);
SocketOutputStream
s_output
(
sockfd
);
char
buf
[
256
];
...
...
@@ -283,14 +293,18 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
// Wait for server to create and attach
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_PRINT
(
"error"
,
(
"Server id %d did not attach"
,
remoteNodeId
));
DBUG_RETURN
(
false
);
}
// Create
if
(
!
_shmSegCreated
){
if
(
!
ndb_shm_get
())
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_PRINT
(
"error"
,
(
"Failed create of shm seg to node %d"
,
remoteNodeId
));
DBUG_RETURN
(
false
);
}
_shmSegCreated
=
true
;
}
...
...
@@ -300,7 +314,9 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
if
(
!
ndb_shm_attach
())
{
report_error
(
TE_SHM_UNABLE_TO_ATTACH_SEGMENT
);
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_PRINT
(
"error"
,
(
"Failed attach of shm seg to node %d"
,
remoteNodeId
));
DBUG_RETURN
(
false
);
}
_attached
=
true
;
}
...
...
@@ -314,21 +330,28 @@ SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
// Wait for ok from server
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
false
;
DBUG_PRINT
(
"error"
,
(
"No ok from server node %d"
,
remoteNodeId
));
DBUG_RETURN
(
false
);
}
// Send ok to server
s_output
.
println
(
"shm client 2 ok"
);
DBUG_PRINT
(
"info"
,
(
"Successfully connected client to node %d"
,
remoteNodeId
));
}
NDB_CLOSE_SOCKET
(
sockfd
);
return
r
;
DBUG_RETURN
(
r
)
;
}
bool
SHM_Transporter
::
connect_common
(
NDB_SOCKET_TYPE
sockfd
)
{
if
(
!
checkConnected
())
if
(
!
checkConnected
())
{
DBUG_PRINT
(
"error"
,
(
"Already connected to node %d"
,
remoteNodeId
));
return
false
;
}
if
(
!
setupBuffersDone
)
{
setupBuffers
();
...
...
@@ -341,5 +364,7 @@ SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
return
true
;
}
DBUG_PRINT
(
"error"
,
(
"Failed to set up buffers to node %d"
,
remoteNodeId
));
return
false
;
}
ndb/src/common/transporter/SHM_Transporter.hpp
View file @
d9051882
...
...
@@ -38,7 +38,6 @@ public:
int
r_port
,
NodeId
lNodeId
,
NodeId
rNodeId
,
bool
compression
,
bool
checksum
,
bool
signalId
,
key_t
shmKey
,
...
...
@@ -62,12 +61,12 @@ public:
writer
->
updateWritePtr
(
lenBytes
);
}
void
getReceivePtr
(
Uint32
**
ptr
,
Uint32
**
eod
){
reader
->
getReadPtr
(
*
ptr
,
*
eod
);
void
getReceivePtr
(
Uint32
**
ptr
,
Uint32
sz
){
sz
=
reader
->
getReadPtr
(
*
ptr
);
}
void
updateReceivePtr
(
Uint32
*
ptr
){
reader
->
updateReadPtr
(
ptr
);
void
updateReceivePtr
(
Uint32
sz
){
reader
->
updateReadPtr
(
sz
);
}
protected:
...
...
@@ -127,6 +126,7 @@ protected:
private:
bool
_shmSegCreated
;
bool
_attached
;
bool
m_connected
;
key_t
shmKey
;
volatile
Uint32
*
serverStatusFlag
;
...
...
ndb/src/common/transporter/TCP_Transporter.cpp
View file @
d9051882
...
...
@@ -70,11 +70,10 @@ TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
int
r_port
,
NodeId
lNodeId
,
NodeId
rNodeId
,
int
byte_order
,
bool
compr
,
bool
chksm
,
bool
signalId
,
bool
chksm
,
bool
signalId
,
Uint32
_reportFreq
)
:
Transporter
(
t_reg
,
lHostName
,
rHostName
,
r_port
,
lNodeId
,
rNodeId
,
byte_order
,
compr
,
chksm
,
signalId
),
0
,
false
,
chksm
,
signalId
),
m_sendBuffer
(
sendBufSize
)
{
maxReceiveSize
=
maxRecvSize
;
...
...
@@ -106,12 +105,14 @@ TCP_Transporter::~TCP_Transporter() {
bool
TCP_Transporter
::
connect_server_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
return
connect_common
(
sockfd
);
DBUG_ENTER
(
"TCP_Transpporter::connect_server_impl"
);
DBUG_RETURN
(
connect_common
(
sockfd
));
}
bool
TCP_Transporter
::
connect_client_impl
(
NDB_SOCKET_TYPE
sockfd
)
{
return
connect_common
(
sockfd
);
DBUG_ENTER
(
"TCP_Transpporter::connect_client_impl"
);
DBUG_RETURN
(
connect_common
(
sockfd
));
}
bool
TCP_Transporter
::
connect_common
(
NDB_SOCKET_TYPE
sockfd
)
...
...
@@ -119,6 +120,8 @@ bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
theSocket
=
sockfd
;
setSocketOptions
();
setSocketNonBlocking
(
theSocket
);
DBUG_PRINT
(
"info"
,
(
"Successfully set-up TCP transporter to node %d"
,
remoteNodeId
));
return
true
;
}
...
...
ndb/src/common/transporter/TCP_Transporter.hpp
View file @
d9051882
...
...
@@ -52,8 +52,7 @@ private:
int
r_port
,
NodeId
lHostId
,
NodeId
rHostId
,
int
byteorder
,
bool
compression
,
bool
checksum
,
bool
signalId
,
bool
checksum
,
bool
signalId
,
Uint32
reportFreq
=
4096
);
// Disconnect, delete send buffers and receive buffer
...
...
ndb/src/common/transporter/TransporterRegistry.cpp
View file @
d9051882
...
...
@@ -15,6 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <my_pthread.h>
#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"
...
...
@@ -48,9 +49,10 @@
SocketServer
::
Session
*
TransporterService
::
newSession
(
NDB_SOCKET_TYPE
sockfd
)
{
DBUG_ENTER
(
"SocketServer::Session * TransporterService::newSession"
);
if
(
m_auth
&&
!
m_auth
->
server_authenticate
(
sockfd
)){
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
DBUG_RETURN
(
0
)
;
}
{
...
...
@@ -60,27 +62,32 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
char
buf
[
256
];
if
(
s_input
.
gets
(
buf
,
256
)
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
DBUG_PRINT
(
"error"
,
(
"Could not get node id from client"
));
DBUG_RETURN
(
0
);
}
if
(
sscanf
(
buf
,
"%d"
,
&
nodeId
)
!=
1
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
DBUG_PRINT
(
"error"
,
(
"Error in node id from client"
));
DBUG_RETURN
(
0
);
}
//check that nodeid is valid and that there is an allocated transporter
if
(
nodeId
<
0
||
nodeId
>=
m_transporter_registry
->
maxTransporters
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
if
(
nodeId
<
0
||
nodeId
>=
(
int
)
m_transporter_registry
->
maxTransporters
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
DBUG_PRINT
(
"error"
,
(
"Node id out of range from client"
));
DBUG_RETURN
(
0
);
}
if
(
m_transporter_registry
->
theTransporters
[
nodeId
]
==
0
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
DBUG_PRINT
(
"error"
,
(
"No transporter for this node id from client"
));
DBUG_RETURN
(
0
);
}
//check that the transporter should be connected
if
(
m_transporter_registry
->
performStates
[
nodeId
]
!=
TransporterRegistry
::
CONNECTING
)
{
NDB_CLOSE_SOCKET
(
sockfd
);
return
0
;
DBUG_PRINT
(
"error"
,
(
"Transporter in wrong state for this node id from client"
));
DBUG_RETURN
(
0
);
}
Transporter
*
t
=
m_transporter_registry
->
theTransporters
[
nodeId
];
...
...
@@ -93,7 +100,7 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
t
->
connect_server
(
sockfd
);
}
return
0
;
DBUG_RETURN
(
0
)
;
}
TransporterRegistry
::
TransporterRegistry
(
void
*
callback
,
...
...
@@ -209,8 +216,6 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
config
->
port
,
localNodeId
,
config
->
remoteNodeId
,
config
->
byteOrder
,
config
->
compression
,
config
->
checksum
,
config
->
signalId
);
if
(
t
==
NULL
)
...
...
@@ -264,8 +269,6 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
conf
->
localHostName
,
conf
->
remoteNodeId
,
conf
->
remoteHostName
,
conf
->
byteOrder
,
conf
->
compression
,
conf
->
checksum
,
conf
->
signalId
);
if
(
t
==
NULL
)
...
...
@@ -306,15 +309,17 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
if
(
theTransporters
[
config
->
remoteNodeId
]
!=
NULL
)
return
false
;
SCI_Transporter
*
t
=
new
SCI_Transporter
(
config
->
sendLimit
,
SCI_Transporter
*
t
=
new
SCI_Transporter
(
*
this
,
config
->
localHostName
,
config
->
remoteHostName
,
config
->
port
,
config
->
sendLimit
,
config
->
bufferSize
,
config
->
nLocalAdapters
,
config
->
remoteSciNodeId0
,
config
->
remoteSciNodeId1
,
localNodeId
,
config
->
remoteNodeId
,
config
->
byteOrder
,
config
->
compression
,
config
->
checksum
,
config
->
signalId
);
...
...
@@ -357,7 +362,6 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
config
->
port
,
localNodeId
,
config
->
remoteNodeId
,
config
->
compression
,
config
->
checksum
,
config
->
signalId
,
config
->
shmKey
,
...
...
@@ -853,10 +857,11 @@ TransporterRegistry::performReceive(){
const
NodeId
nodeId
=
t
->
getRemoteNodeId
();
if
(
is_connected
(
nodeId
)){
if
(
t
->
isConnected
()
&&
t
->
checkConnected
()){
Uint32
*
readPtr
,
*
eodPtr
;
t
->
getReceivePtr
(
&
readPtr
,
&
eodPtr
);
readPtr
=
unpack
(
readPtr
,
eodPtr
,
nodeId
,
ioStates
[
nodeId
]);
t
->
updateReceivePtr
(
readPtr
);
Uint32
*
readPtr
;
Uint32
sz
=
0
;
t
->
getReceivePtr
(
&
readPtr
,
sz
);
Uint32
szUsed
=
unpack
(
readPtr
,
sz
,
nodeId
,
ioStates
[
nodeId
]);
t
->
updateReceivePtr
(
szUsed
);
}
}
}
...
...
@@ -868,10 +873,11 @@ TransporterRegistry::performReceive(){
const
NodeId
nodeId
=
t
->
getRemoteNodeId
();
if
(
is_connected
(
nodeId
)){
if
(
t
->
isConnected
()
&&
t
->
checkConnected
()){
Uint32
*
readPtr
,
*
eodPtr
;
t
->
getReceivePtr
(
&
readPtr
,
&
eodPtr
);
readPtr
=
unpack
(
readPtr
,
eodPtr
,
nodeId
,
ioStates
[
nodeId
]);
t
->
updateReceivePtr
(
readPtr
);
Uint32
*
readPtr
;
Uint32
sz
=
0
;
t
->
getReceivePtr
(
&
readPtr
,
sz
);
Uint32
szUsed
=
unpack
(
readPtr
,
sz
,
nodeId
,
ioStates
[
nodeId
]);
t
->
updateReceivePtr
(
szUsed
);
}
}
}
...
...
@@ -1023,7 +1029,9 @@ TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
static
void
*
run_start_clients_C
(
void
*
me
)
{
my_thread_init
();
((
TransporterRegistry
*
)
me
)
->
start_clients_thread
();
my_thread_end
();
NdbThread_Exit
(
0
);
return
me
;
}
...
...
@@ -1106,6 +1114,7 @@ TransporterRegistry::update_connections()
void
TransporterRegistry
::
start_clients_thread
()
{
DBUG_ENTER
(
"TransporterRegistry::start_clients_thread"
);
while
(
m_run_start_clients_thread
)
{
NdbSleep_MilliSleep
(
100
);
for
(
int
i
=
0
,
n
=
0
;
n
<
nTransporters
&&
m_run_start_clients_thread
;
i
++
){
...
...
@@ -1129,6 +1138,7 @@ TransporterRegistry::start_clients_thread()
}
}
}
DBUG_VOID_RETURN
;
}
bool
...
...
ndb/src/common/util/SocketServer.cpp
View file @
d9051882
...
...
@@ -16,6 +16,7 @@
#include <ndb_global.h>
#include <my_pthread.h>
#include <SocketServer.hpp>
...
...
@@ -176,9 +177,9 @@ extern "C"
void
*
socketServerThread_C
(
void
*
_ss
){
SocketServer
*
ss
=
(
SocketServer
*
)
_ss
;
my_thread_init
();
ss
->
doRun
();
my_thread_end
();
NdbThread_Exit
(
0
);
return
0
;
}
...
...
@@ -287,8 +288,10 @@ void*
sessionThread_C
(
void
*
_sc
){
SocketServer
::
Session
*
si
=
(
SocketServer
::
Session
*
)
_sc
;
my_thread_init
();
if
(
!
transfer
(
si
->
m_socket
)){
si
->
m_stopped
=
true
;
my_thread_end
();
NdbThread_Exit
(
0
);
return
0
;
}
...
...
@@ -301,6 +304,7 @@ sessionThread_C(void* _sc){
}
si
->
m_stopped
=
true
;
my_thread_end
();
NdbThread_Exit
(
0
);
return
0
;
}
...
...
ndb/src/cw/cpcd/Makefile.am
View file @
d9051882
...
...
@@ -7,7 +7,7 @@ LDADD_LOC = \
$(top_builddir)
/ndb/src/libndbclient.la
\
$(top_builddir)
/dbug/libdbug.a
\
$(top_builddir)
/mysys/libmysys.a
\
$(top_builddir)
/strings/libmystrings.a
$(top_builddir)
/strings/libmystrings.a
@NDB_SCI_LIBS@
include
$(top_srcdir)/ndb/config/common.mk.am
include
$(top_srcdir)/ndb/config/type_util.mk.am
...
...
ndb/src/kernel/Makefile.am
View file @
d9051882
...
...
@@ -55,7 +55,7 @@ LDADD += \
$(top_builddir)
/ndb/src/common/util/libgeneral.la
\
$(top_builddir)
/dbug/libdbug.a
\
$(top_builddir)
/mysys/libmysys.a
\
$(top_builddir)
/strings/libmystrings.a
$(top_builddir)
/strings/libmystrings.a
@NDB_SCI_LIBS@
# Don't update the files from bitkeeper
%
::
SCCS/s.%
ndb/src/kernel/blocks/backup/restore/Makefile.am
View file @
d9051882
...
...
@@ -7,7 +7,7 @@ LDADD_LOC = \
$(top_builddir)
/ndb/src/libndbclient.la
\
$(top_builddir)
/dbug/libdbug.a
\
$(top_builddir)
/mysys/libmysys.a
\
$(top_builddir)
/strings/libmystrings.a
$(top_builddir)
/strings/libmystrings.a
@NDB_SCI_LIBS@
include
$(top_srcdir)/ndb/config/common.mk.am
...
...
ndb/src/mgmsrv/ConfigInfo.cpp
View file @
d9051882
...
...
@@ -129,11 +129,14 @@ ConfigInfo::m_SectionRules[] = {
{
"TCP"
,
fixHostname
,
"HostName1"
},
{
"TCP"
,
fixHostname
,
"HostName2"
},
{
"SCI"
,
fixHostname
,
"HostName1"
},
{
"SCI"
,
fixHostname
,
"HostName2"
},
{
"OSE"
,
fixHostname
,
"HostName1"
},
{
"OSE"
,
fixHostname
,
"HostName2"
},
{
"TCP"
,
fixPortNumber
,
0
},
// has to come after fixHostName
{
"SHM"
,
fixPortNumber
,
0
},
// has to come after fixHostName
{
"SCI"
,
fixPortNumber
,
0
},
// has to come after fixHostName
//{ "SHM", fixShmKey, 0 },
/**
...
...
@@ -163,6 +166,8 @@ ConfigInfo::m_SectionRules[] = {
{
"TCP"
,
checkTCPConstraints
,
"HostName1"
},
{
"TCP"
,
checkTCPConstraints
,
"HostName2"
},
{
"SCI"
,
checkTCPConstraints
,
"HostName1"
},
{
"SCI"
,
checkTCPConstraints
,
"HostName2"
},
{
"*"
,
checkMandatory
,
0
},
...
...
@@ -1808,7 +1813,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"Id of node ("
DB_TOKEN_PRINT
", "
API_TOKEN_PRINT
" or "
MGM_TOKEN_PRINT
") on one side of the connection"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
ConfigInfo
::
STRING
,
MANDATORY
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
...
...
@@ -1820,28 +1825,74 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"Id of node ("
DB_TOKEN_PRINT
", "
API_TOKEN_PRINT
" or "
MGM_TOKEN_PRINT
") on one side of the connection"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
ConfigInfo
::
STRING
,
MANDATORY
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_SCI_ID_0
,
"SciId0"
,
CFG_SCI_HOSTNAME_1
,
"HostName1"
,
"SCI"
,
"Name/IP of computer on one side of the connection"
,
ConfigInfo
::
INTERNAL
,
false
,
ConfigInfo
::
STRING
,
UNDEFINED
,
0
,
0
},
{
CFG_SCI_HOSTNAME_2
,
"HostName2"
,
"SCI"
,
"Name/IP of computer on one side of the connection"
,
ConfigInfo
::
INTERNAL
,
false
,
ConfigInfo
::
STRING
,
UNDEFINED
,
0
,
0
},
{
CFG_CONNECTION_SERVER_PORT
,
"PortNumber"
,
"SCI"
,
"
Local SCI-node id for adapter 0 (a computer can have two adapters)
"
,
"
Port used for this transporter
"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
MANDATORY
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_SCI_HOST1_ID_0
,
"Host1SciId0"
,
"SCI"
,
"SCI-node id for adapter 0 on Host1 (a computer can have two adapters)"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
MANDATORY
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_SCI_HOST1_ID_1
,
"Host1SciId1"
,
"SCI"
,
"SCI-node id for adapter 1 on Host1 (a computer can have two adapters)"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
"0"
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_SCI_
ID_1
,
"
SciId1
"
,
CFG_SCI_
HOST2_ID_0
,
"
Host2SciId0
"
,
"SCI"
,
"
Local SCI-node id for adapter 1
(a computer can have two adapters)"
,
"
SCI-node id for adapter 0 on Host2
(a computer can have two adapters)"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
...
...
@@ -1849,6 +1900,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_SCI_HOST2_ID_1
,
"Host2SciId1"
,
"SCI"
,
"SCI-node id for adapter 1 on Host2 (a computer can have two adapters)"
,
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
"0"
,
"0"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
CFG_CONNECTION_SEND_SIGNAL_ID
,
"SendSignalId"
,
...
...
@@ -1882,8 +1945,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
false
,
ConfigInfo
::
INT
,
"2K"
,
"
512
"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
"
128
"
,
"32K"
},
{
CFG_SCI_BUFFER_MEM
,
...
...
@@ -1893,8 +1956,8 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo
::
USED
,
false
,
ConfigInfo
::
INT
,
"1
M
"
,
"
256
K"
,
"1
92K
"
,
"
64
K"
,
STR_VALUE
(
MAX_INT_RNIL
)
},
{
...
...
ndb/src/mgmsrv/Makefile.am
View file @
d9051882
...
...
@@ -29,7 +29,7 @@ LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la \
$(top_builddir)
/ndb/src/common/editline/libeditline.a
\
$(top_builddir)
/dbug/libdbug.a
\
$(top_builddir)
/mysys/libmysys.a
\
$(top_builddir)
/strings/libmystrings.a
$(top_builddir)
/strings/libmystrings.a
@NDB_SCI_LIBS@
@TERMCAP_LIB@
DEFS_LOC
=
-DDEFAULT_MYSQL_HOME
=
"
\"
$(MYSQLBASEdir)
\"
"
\
...
...
sql/Makefile.am
View file @
d9051882
...
...
@@ -37,7 +37,7 @@ LDADD = @isam_libs@ \
$(top_builddir)
/mysys/libmysys.a
\
$(top_builddir)
/dbug/libdbug.a
\
$(top_builddir)
/regex/libregex.a
\
$(top_builddir)
/strings/libmystrings.a @ZLIB_LIBS@
$(top_builddir)
/strings/libmystrings.a @ZLIB_LIBS@
@NDB_SCI_LIBS@
mysqld_LDADD
=
@MYSQLD_EXTRA_LDFLAGS@
\
@bdb_libs@ @innodb_libs@ @pstack_libs@
\
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment