Commit 1860613c authored by tomas@poseidon.ndb.mysql.com's avatar tomas@poseidon.ndb.mysql.com

Merge

parents 146d6887 11bd3eed
......@@ -7,69 +7,30 @@ Discless: CHOOSE_Discless
[COMPUTER]
Id: 1
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_1
[COMPUTER]
Id: 2
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_2
[COMPUTER]
Id: 3
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_3
[COMPUTER]
Id: 4
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_4
[COMPUTER]
Id: 5
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_5
[COMPUTER]
Id: 6
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_6
[COMPUTER]
Id: 7
ByteOrder: Little
HostName: CHOOSE_HOSTNAME_7
[MGM]
Id: 1
[DB]
ExecuteOnComputer: 1
PortNumber: CHOOSE_PORT_MGM
FileSystemPath: CHOOSE_FILESYSTEM_NODE_1
[DB]
Id: 2
ExecuteOnComputer: 2
FileSystemPath: CHOOSE_FILESYSTEM_NODE_2
[DB]
Id: 3
ExecuteOnComputer: 3
FileSystemPath: CHOOSE_FILESYSTEM_NODE_3
[MGM]
PortNumber: CHOOSE_PORT_MGM
[API]
Id: 11
ExecuteOnComputer: 4
[API]
Id: 12
ExecuteOnComputer: 5
[API]
Id: 13
ExecuteOnComputer: 6
[API]
Id: 14
ExecuteOnComputer: 7
[TCP DEFAULT]
PortNumber: CHOOSE_PORT_TRANSPORTER
......@@ -82,11 +82,8 @@ while test $# -gt 0; do
done
fs_ndb=$fsdir/ndbcluster
fs_mgm_1=$fs_ndb/1.ndb_mgm
fs_ndb_2=$fs_ndb/2.ndb_db
fs_ndb_3=$fs_ndb/3.ndb_db
fs_name_1=$fs_ndb/node-1-fs-$port_base
fs_name_2=$fs_ndb/node-2-fs-$port_base
fs_name_3=$fs_ndb/node-3-fs-$port_base
NDB_HOME=
export NDB_CONNECTSTRING
......@@ -111,13 +108,10 @@ NDB_CONNECTSTRING=
if [ $initial_ndb ] ; then
[ -d $fs_ndb ] || mkdir $fs_ndb
[ -d $fs_mgm_1 ] || mkdir $fs_mgm_1
[ -d $fs_ndb_2 ] || mkdir $fs_ndb_2
[ -d $fs_ndb_3 ] || mkdir $fs_ndb_3
[ -d $fs_name_1 ] || mkdir $fs_name_1
[ -d $fs_name_2 ] || mkdir $fs_name_2
[ -d $fs_name_3 ] || mkdir $fs_name_3
fi
if [ -d "$fs_ndb" -a -d "$fs_mgm_1" -a -d "$fs_ndb_2" -a -d "$fs_ndb_3" -a -d "$fs_name_2" -a -d "$fs_name_3" ]; then :; else
if [ -d "$fs_ndb" -a -d "$fs_name_1" -a -d "$fs_name_2" ]; then :; else
echo "$fs_ndb filesystem directory does not exist"
exit 1
fi
......@@ -127,14 +121,11 @@ fi
ndb_host="localhost"
ndb_mgmd_port=$port_base
port_transporter=`expr $ndb_mgmd_port + 2`
NDB_CONNECTSTRING_BASE="host=$ndb_host:$ndb_mgmd_port;nodeid="
export NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port"
# Start management server as deamon
NDB_ID="1"
NDB_CONNECTSTRING=$NDB_CONNECTSTRING_BASE$NDB_ID
# Edit file system path and ports in config file
if [ $initial_ndb ] ; then
......@@ -144,60 +135,54 @@ sed \
-e s,"CHOOSE_IndexMemory",$ndb_imem,g \
-e s,"CHOOSE_Discless",$ndb_discless,g \
-e s,"CHOOSE_HOSTNAME_".*,"$ndb_host",g \
-e s,"CHOOSE_FILESYSTEM_NODE_1","$fs_name_1",g \
-e s,"CHOOSE_FILESYSTEM_NODE_2","$fs_name_2",g \
-e s,"CHOOSE_FILESYSTEM_NODE_3","$fs_name_3",g \
-e s,"CHOOSE_PORT_MGM",$ndb_mgmd_port,g \
-e s,"CHOOSE_PORT_TRANSPORTER",$port_transporter,g \
< ndb/ndb_config_2_node.ini \
> "$fs_mgm_1/config.ini"
> "$fs_ndb/config.ini"
fi
if ( cd $fs_mgm_1 ; echo $NDB_CONNECTSTRING > $cfgfile ; $exec_mgmtsrvr -d -c config.ini ) ; then :; else
rm -f Ndb.cfg
rm -f $fs_ndb/Ndb.cfg
if ( cd $fs_ndb ; $exec_mgmtsrvr -d -c config.ini ) ; then :; else
echo "Unable to start $exec_mgmtsrvr from `pwd`"
exit 1
fi
cat `find $fs_ndb -name 'node*.pid'` > $pidfile
cat `find $fs_ndb -name 'ndb_*.pid'` > $pidfile
# Start database node
NDB_ID="2"
NDB_CONNECTSTRING=$NDB_CONNECTSTRING_BASE$NDB_ID
echo "Starting ndbd connectstring=\""$NDB_CONNECTSTRING\"
( cd $fs_ndb_2 ; echo $NDB_CONNECTSTRING > $cfgfile ; $exec_ndb -d $flags_ndb & )
echo "Starting ndbd"
( cd $fs_ndb ; $exec_ndb -d $flags_ndb & )
cat `find $fs_ndb -name 'node*.pid'` > $pidfile
cat `find $fs_ndb -name 'ndb_*.pid'` > $pidfile
# Start database node
NDB_ID="3"
NDB_CONNECTSTRING=$NDB_CONNECTSTRING_BASE$NDB_ID
echo "Starting ndbd connectstring=\""$NDB_CONNECTSTRING\"
( cd $fs_ndb_3 ; echo $NDB_CONNECTSTRING > $cfgfile ; $exec_ndb -d $flags_ndb & )
echo "Starting ndbd"
( cd $fs_ndb ; $exec_ndb -d $flags_ndb & )
cat `find $fs_ndb -name 'node*.pid'` > $pidfile
cat `find $fs_ndb -name 'ndb_*.pid'` > $pidfile
# test if Ndb Cluster starts properly
echo "Waiting for started..."
NDB_ID="11"
NDB_CONNECTSTRING=$NDB_CONNECTSTRING_BASE$NDB_ID
if ( $exec_waiter ) | grep "NDBT_ProgramExit: 0 - OK"; then :; else
echo "Ndbcluster startup failed"
exit 1
fi
echo $NDB_CONNECTSTRING > $cfgfile
cat `find $fs_ndb -name 'node*.pid'` > $pidfile
cat `find $fs_ndb -name 'ndb_*.pid'` > $pidfile
status_ndbcluster
}
status_ndbcluster() {
# Start management client
echo "show" | $exec_mgmtclient $ndb_host $ndb_mgmd_port
# Start management client
echo "show" | $exec_mgmtclient $ndb_host $ndb_mgmd_port
}
stop_default_ndbcluster() {
......
......@@ -72,13 +72,6 @@ public:
*/
~EventLogger();
/**
* Open/create the eventlog, the default name is 'cluster.log'.
*
* @return true if successful.
*/
bool open();
/**
* Opens/creates the eventlog with the specified filename.
*
......
......@@ -18,6 +18,7 @@
#define NODE_INFO_HPP
#include <NdbOut.hpp>
#include <mgmapi_config_parameters.h>
class NodeInfo {
public:
......@@ -27,10 +28,10 @@ public:
* NodeType
*/
enum NodeType {
DB = 0, ///< Database node
API = 1, ///< NDB API node
MGM = 2, ///< Management node (incl. NDB API)
REP = 3, ///< Replication node (incl. NDB API)
DB = NODE_TYPE_DB, ///< Database node
API = NODE_TYPE_API, ///< NDB API node
MGM = NODE_TYPE_MGM, ///< Management node (incl. NDB API)
REP = NODE_TYPE_REP, ///< Replication node (incl. NDB API)
INVALID = 255 ///< Invalid type
};
NodeType getType() const;
......
......@@ -18,6 +18,7 @@
#define NODE_STATE_HPP
#include <NdbOut.hpp>
#include <NodeBitmask.hpp>
class NodeState {
public:
......@@ -99,7 +100,7 @@ public:
/**
* Length in 32-bit words
*/
static const Uint32 DataLength = 8;
static const Uint32 DataLength = 8 + NdbNodeBitmask::Size;
/**
* Constructor(s)
......@@ -146,6 +147,8 @@ public:
Uint32 singleUserMode;
Uint32 singleUserApi; //the single user node
BitmaskPOD<NdbNodeBitmask::Size> m_connected_nodes;
void setDynamicId(Uint32 dynamic);
void setNodeGroup(Uint32 group);
void setSingleUser(Uint32 s);
......@@ -182,6 +185,7 @@ NodeState::NodeState(){
dynamicId = 0xFFFFFFFF;
singleUserMode = 0;
singleUserApi = 0xFFFFFFFF;
m_connected_nodes.clear();
}
inline
......
......@@ -49,6 +49,8 @@
* @{
*/
#include "mgmapi_config_parameters.h"
#ifdef __cplusplus
extern "C" {
#endif
......@@ -81,10 +83,10 @@ extern "C" {
*/
enum ndb_mgm_node_type {
NDB_MGM_NODE_TYPE_UNKNOWN = -1, /*/< Node type not known*/
NDB_MGM_NODE_TYPE_API = 0, /*/< An application node (API)*/
NDB_MGM_NODE_TYPE_NDB = 1, /*/< A database node (DB)*/
NDB_MGM_NODE_TYPE_MGM = 2, /*/< A management server node (MGM)*/
NDB_MGM_NODE_TYPE_REP = 3, ///< A replication node
NDB_MGM_NODE_TYPE_API = NODE_TYPE_API, /*/< An application node (API)*/
NDB_MGM_NODE_TYPE_NDB = NODE_TYPE_DB, /*/< A database node (DB)*/
NDB_MGM_NODE_TYPE_MGM = NODE_TYPE_MGM, /*/< A management server node (MGM)*/
NDB_MGM_NODE_TYPE_REP = NODE_TYPE_REP, ///< A replication node
NDB_MGM_NODE_TYPE_MIN = 0, /*/< Min valid value*/
NDB_MGM_NODE_TYPE_MAX = 3 /*/< Max valid value*/
......@@ -666,6 +668,11 @@ extern "C" {
*/
struct ndb_mgm_configuration * ndb_mgm_get_configuration(NdbMgmHandle handle,
unsigned version);
int ndb_mgm_alloc_nodeid(NdbMgmHandle handle,
unsigned version,
unsigned *pnodeid,
int nodetype);
/**
* Config iterator
*/
......
......@@ -6,6 +6,7 @@
#define CFG_SYS_PRIMARY_MGM_NODE 1
#define CFG_SYS_CONFIG_GENERATION 2
#define CFG_SYS_REPLICATION_ROLE 7
#define CFG_SYS_PORT_BASE 8
#define CFG_NODE_ID 3
#define CFG_NODE_BYTE_ORDER 4
......
......@@ -77,7 +77,7 @@ public:
* Get config using socket
*/
struct ndb_mgm_configuration * getConfig(const char * mgmhost, short port,
int versionId);
int versionId, int nodetype);
/**
* Get config from file
*/
......@@ -98,7 +98,9 @@ private:
char * m_connectString;
char * m_defaultConnectString;
NdbMgmHandle m_handle;
/**
* Verify config
*/
......
......@@ -21,11 +21,14 @@
extern "C" {
#endif
const char* NdbConfig_HomePath(char* buf, int buflen);
const char* NdbConfig_NdbCfgName(char* buf, int buflen, int with_ndb_home);
const char* NdbConfig_ErrorFileName(char* buf, int buflen);
const char* NdbConfig_ClusterLogFileName(char* buf, int buflen);
char* NdbConfig_NdbCfgName(int with_ndb_home);
char* NdbConfig_ErrorFileName(int node_id);
char* NdbConfig_ClusterLogFileName(int node_id);
char* NdbConfig_SignalLogFileName(int node_id);
char* NdbConfig_TraceFileName(int node_id, int file_no);
char* NdbConfig_NextTraceFileName(int node_id);
char* NdbConfig_PidFileName(int node_id);
char* NdbConfig_StdoutFileName(int node_id);
#ifdef __cplusplus
}
......
......@@ -4,6 +4,8 @@
#include <my_global.h>
#define NDB_BASE_PORT 2200
#if defined(_WIN32) || defined(_WIN64) || defined(__WIN32__) || defined(WIN32)
#define NDB_WIN32
#else
......
......@@ -64,7 +64,7 @@ typedef int socklen_t;
#define NDB_NONBLOCK O_NONBLOCK
#define NDB_SOCKET_TYPE int
#define NDB_INVALID_SOCKET -1
#define NDB_CLOSE_SOCKET(x) close(x)
#define NDB_CLOSE_SOCKET(x) ::close(x)
#define InetErrno errno
......
......@@ -29,20 +29,10 @@
#define TransporterRegistry_H
#include "TransporterDefinitions.hpp"
#include <SocketServer.hpp>
#include <NdbTCP.h>
// A transporter is always in a PerformState.
// PerformIO is used initially and as long as any of the events
// PerformConnect, ...
enum PerformState {
PerformNothing = 4, // Does nothing
PerformIO = 0, // Is connected
PerformConnect = 1, // Is trying to connect
PerformDisconnect = 2, // Trying to disconnect
RemoveTransporter = 3 // Will be removed
};
// A transporter is always in an IOState.
// NoHalt is used initially and as long as it is no restrictions on
// sending or receiving.
......@@ -60,18 +50,45 @@ enum TransporterType {
tt_OSE_TRANSPORTER = 4
};
static const char *performStateString[] =
{ "is connected",
"is trying to connect",
"does nothing",
"is trying to disconnect" };
class Transporter;
class TCP_Transporter;
class SCI_Transporter;
class SHM_Transporter;
class OSE_Transporter;
class TransporterRegistry;
class SocketAuthenticator;
class TransporterService : public SocketServer::Service {
SocketAuthenticator * m_auth;
TransporterRegistry * m_transporter_registry;
public:
TransporterService(SocketAuthenticator *auth= 0)
{
m_auth= auth;
m_transporter_registry= 0;
}
void setTransporterRegistry(TransporterRegistry *t)
{
m_transporter_registry= t;
}
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
};
/**
* @class TransporterRegistry
* @brief ...
*/
class TransporterRegistry {
friend class OSE_Receiver;
friend class Transporter;
friend class TransporterService;
public:
/**
* Constructor
......@@ -98,6 +115,12 @@ public:
*/
~TransporterRegistry();
bool start_service(SocketServer& server);
bool start_clients();
bool stop_clients();
void start_clients_thread();
void update_connections();
/**
* Start/Stop receiving
*/
......@@ -110,16 +133,26 @@ public:
void startSending();
void stopSending();
// A transporter is always in a PerformState.
// PerformIO is used initially and as long as any of the events
// PerformConnect, ...
enum PerformState {
CONNECTED = 0,
CONNECTING = 1,
DISCONNECTED = 2,
DISCONNECTING = 3
};
const char *getPerformStateString(NodeId nodeId) const
{ return performStateString[(unsigned)performStates[nodeId]]; };
/**
* Get and set methods for PerformState
*/
PerformState performState(NodeId nodeId);
void setPerformState(NodeId nodeId, PerformState state);
/**
* Set perform state for all transporters
*/
void setPerformState(PerformState state);
void do_connect(NodeId node_id);
void do_disconnect(NodeId node_id);
bool is_connected(NodeId node_id) { return performStates[node_id] == CONNECTED; };
void report_connect(NodeId node_id);
void report_disconnect(NodeId node_id, int errnum);
/**
* Get and set methods for IOState
......@@ -174,8 +207,6 @@ public:
void performReceive();
void performSend();
void checkConnections();
/**
* Force sending if more than or equal to sendLimit
* number have asked for send. Returns 0 if not sending
......@@ -187,11 +218,18 @@ public:
void printState();
#endif
unsigned short m_service_port;
protected:
private:
void * callbackObj;
TransporterService *m_transporter_service;
char *m_interface_name;
struct NdbThread *m_start_clients_thread;
bool m_run_start_clients_thread;
int sendCounter;
NodeId localNodeId;
bool nodeIdSpecified;
......@@ -202,11 +240,6 @@ private:
int nSHMTransporters;
int nOSETransporters;
int m_ccCount;
int m_ccIndex;
int m_ccStep;
int m_nTransportersPerformConnect;
bool m_ccReady;
/**
* Arrays holding all transporters in the order they are created
*/
......
......@@ -326,7 +326,7 @@ BitmaskImpl::getText(unsigned size, const Uint32 data[], char* buf)
* XXX replace size by length in bits
*/
template <unsigned size>
class Bitmask {
struct BitmaskPOD {
public:
/**
* POD data representation
......@@ -334,7 +334,7 @@ public:
struct Data {
Uint32 data[size];
#if 0
Data & operator=(const Bitmask<size> & src) {
Data & operator=(const BitmaskPOD<size> & src) {
src.copyto(size, data);
return *this;
}
......@@ -348,19 +348,17 @@ public:
STATIC_CONST( NotFound = BitmaskImpl::NotFound );
STATIC_CONST( TextLength = size * 8 );
Bitmask() { clear();}
/**
* assign - Set all bits in <em>dst</em> to corresponding in <em>src/<em>
*/
void assign(const typename Bitmask<size>::Data & src);
void assign(const typename BitmaskPOD<size>::Data & src);
/**
* assign - Set all bits in <em>dst</em> to corresponding in <em>src/<em>
*/
static void assign(Uint32 dst[], const Uint32 src[]);
static void assign(Uint32 dst[], const Bitmask<size> & src);
void assign(const Bitmask<size> & src);
static void assign(Uint32 dst[], const BitmaskPOD<size> & src);
void assign(const BitmaskPOD<size> & src);
/**
* copy this to <em>dst</em>
......@@ -432,43 +430,43 @@ public:
* equal - Bitwise equal.
*/
static bool equal(const Uint32 data[], const Uint32 data2[]);
bool equal(const Bitmask<size>& mask2) const;
bool equal(const BitmaskPOD<size>& mask2) const;
/**
* bitOR - Bitwise (x | y) into first operand.
*/
static void bitOR(Uint32 data[], const Uint32 data2[]);
Bitmask<size>& bitOR(const Bitmask<size>& mask2);
BitmaskPOD<size>& bitOR(const BitmaskPOD<size>& mask2);
/**
* bitAND - Bitwise (x & y) into first operand.
*/
static void bitAND(Uint32 data[], const Uint32 data2[]);
Bitmask<size>& bitAND(const Bitmask<size>& mask2);
BitmaskPOD<size>& bitAND(const BitmaskPOD<size>& mask2);
/**
* bitANDC - Bitwise (x & ~y) into first operand.
*/
static void bitANDC(Uint32 data[], const Uint32 data2[]);
Bitmask<size>& bitANDC(const Bitmask<size>& mask2);
BitmaskPOD<size>& bitANDC(const BitmaskPOD<size>& mask2);
/**
* bitXOR - Bitwise (x ^ y) into first operand.
*/
static void bitXOR(Uint32 data[], const Uint32 data2[]);
Bitmask<size>& bitXOR(const Bitmask<size>& mask2);
BitmaskPOD<size>& bitXOR(const BitmaskPOD<size>& mask2);
/**
* contains - Check if all bits set in data2 (that) are also set in data (this)
*/
static bool contains(Uint32 data[], const Uint32 data2[]);
bool contains(Bitmask<size> that);
bool contains(BitmaskPOD<size> that);
/**
* overlaps - Check if any bit set in this Bitmask (data) is also set in that (data2)
* overlaps - Check if any bit set in this BitmaskPOD (data) is also set in that (data2)
*/
static bool overlaps(Uint32 data[], const Uint32 data2[]);
bool overlaps(Bitmask<size> that);
bool overlaps(BitmaskPOD<size> that);
/**
* getText - Return as hex-digits (only for debug routines).
......@@ -479,196 +477,196 @@ public:
template <unsigned size>
inline void
Bitmask<size>::assign(Uint32 dst[], const Uint32 src[])
BitmaskPOD<size>::assign(Uint32 dst[], const Uint32 src[])
{
BitmaskImpl::assign(size, dst, src);
}
template <unsigned size>
inline void
Bitmask<size>::assign(Uint32 dst[], const Bitmask<size> & src)
BitmaskPOD<size>::assign(Uint32 dst[], const BitmaskPOD<size> & src)
{
BitmaskImpl::assign(size, dst, src.rep.data);
}
template <unsigned size>
inline void
Bitmask<size>::assign(const typename Bitmask<size>::Data & src)
BitmaskPOD<size>::assign(const typename BitmaskPOD<size>::Data & src)
{
assign(rep.data, src.data);
}
template <unsigned size>
inline void
Bitmask<size>::assign(const Bitmask<size> & src)
BitmaskPOD<size>::assign(const BitmaskPOD<size> & src)
{
assign(rep.data, src.rep.data);
}
template <unsigned size>
inline void
Bitmask<size>::copyto(unsigned sz, Uint32 dst[]) const
BitmaskPOD<size>::copyto(unsigned sz, Uint32 dst[]) const
{
BitmaskImpl::assign(sz, dst, rep.data);
}
template <unsigned size>
inline void
Bitmask<size>::assign(unsigned sz, const Uint32 src[])
BitmaskPOD<size>::assign(unsigned sz, const Uint32 src[])
{
BitmaskImpl::assign(sz, rep.data, src);
}
template <unsigned size>
inline bool
Bitmask<size>::get(const Uint32 data[], unsigned n)
BitmaskPOD<size>::get(const Uint32 data[], unsigned n)
{
return BitmaskImpl::get(size, data, n);
}
template <unsigned size>
inline bool
Bitmask<size>::get(unsigned n) const
BitmaskPOD<size>::get(unsigned n) const
{
return get(rep.data, n);
}
template <unsigned size>
inline void
Bitmask<size>::set(Uint32 data[], unsigned n, bool value)
BitmaskPOD<size>::set(Uint32 data[], unsigned n, bool value)
{
BitmaskImpl::set(size, data, n, value);
}
template <unsigned size>
inline void
Bitmask<size>::set(unsigned n, bool value)
BitmaskPOD<size>::set(unsigned n, bool value)
{
set(rep.data, n, value);
}
template <unsigned size>
inline void
Bitmask<size>::set(Uint32 data[], unsigned n)
BitmaskPOD<size>::set(Uint32 data[], unsigned n)
{
BitmaskImpl::set(size, data, n);
}
template <unsigned size>
inline void
Bitmask<size>::set(unsigned n)
BitmaskPOD<size>::set(unsigned n)
{
set(rep.data, n);
}
template <unsigned size>
inline void
Bitmask<size>::set(Uint32 data[])
BitmaskPOD<size>::set(Uint32 data[])
{
BitmaskImpl::set(size, data);
}
template <unsigned size>
inline void
Bitmask<size>::set()
BitmaskPOD<size>::set()
{
set(rep.data);
}
template <unsigned size>
inline void
Bitmask<size>::clear(Uint32 data[], unsigned n)
BitmaskPOD<size>::clear(Uint32 data[], unsigned n)
{
BitmaskImpl::clear(size, data, n);
}
template <unsigned size>
inline void
Bitmask<size>::clear(unsigned n)
BitmaskPOD<size>::clear(unsigned n)
{
clear(rep.data, n);
}
template <unsigned size>
inline void
Bitmask<size>::clear(Uint32 data[])
BitmaskPOD<size>::clear(Uint32 data[])
{
BitmaskImpl::clear(size, data);
}
template <unsigned size>
inline void
Bitmask<size>::clear()
BitmaskPOD<size>::clear()
{
clear(rep.data);
}
template <unsigned size>
inline bool
Bitmask<size>::isclear(const Uint32 data[])
BitmaskPOD<size>::isclear(const Uint32 data[])
{
return BitmaskImpl::isclear(size, data);
}
template <unsigned size>
inline bool
Bitmask<size>::isclear() const
BitmaskPOD<size>::isclear() const
{
return isclear(rep.data);
}
template <unsigned size>
unsigned
Bitmask<size>::count(const Uint32 data[])
BitmaskPOD<size>::count(const Uint32 data[])
{
return BitmaskImpl::count(size, data);
}
template <unsigned size>
inline unsigned
Bitmask<size>::count() const
BitmaskPOD<size>::count() const
{
return count(rep.data);
}
template <unsigned size>
unsigned
Bitmask<size>::find(const Uint32 data[], unsigned n)
BitmaskPOD<size>::find(const Uint32 data[], unsigned n)
{
return BitmaskImpl::find(size, data, n);
}
template <unsigned size>
inline unsigned
Bitmask<size>::find(unsigned n) const
BitmaskPOD<size>::find(unsigned n) const
{
return find(rep.data, n);
}
template <unsigned size>
inline bool
Bitmask<size>::equal(const Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::equal(const Uint32 data[], const Uint32 data2[])
{
return BitmaskImpl::equal(size, data, data2);
}
template <unsigned size>
inline bool
Bitmask<size>::equal(const Bitmask<size>& mask2) const
BitmaskPOD<size>::equal(const BitmaskPOD<size>& mask2) const
{
return equal(rep.data, mask2.rep.data);
}
template <unsigned size>
inline void
Bitmask<size>::bitOR(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::bitOR(Uint32 data[], const Uint32 data2[])
{
BitmaskImpl::bitOR(size,data, data2);
}
template <unsigned size>
inline Bitmask<size>&
Bitmask<size>::bitOR(const Bitmask<size>& mask2)
inline BitmaskPOD<size>&
BitmaskPOD<size>::bitOR(const BitmaskPOD<size>& mask2)
{
bitOR(rep.data, mask2.rep.data);
return *this;
......@@ -676,14 +674,14 @@ Bitmask<size>::bitOR(const Bitmask<size>& mask2)
template <unsigned size>
inline void
Bitmask<size>::bitAND(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::bitAND(Uint32 data[], const Uint32 data2[])
{
BitmaskImpl::bitAND(size,data, data2);
}
template <unsigned size>
inline Bitmask<size>&
Bitmask<size>::bitAND(const Bitmask<size>& mask2)
inline BitmaskPOD<size>&
BitmaskPOD<size>::bitAND(const BitmaskPOD<size>& mask2)
{
bitAND(rep.data, mask2.rep.data);
return *this;
......@@ -691,14 +689,14 @@ Bitmask<size>::bitAND(const Bitmask<size>& mask2)
template <unsigned size>
inline void
Bitmask<size>::bitANDC(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::bitANDC(Uint32 data[], const Uint32 data2[])
{
BitmaskImpl::bitANDC(size,data, data2);
}
template <unsigned size>
inline Bitmask<size>&
Bitmask<size>::bitANDC(const Bitmask<size>& mask2)
inline BitmaskPOD<size>&
BitmaskPOD<size>::bitANDC(const BitmaskPOD<size>& mask2)
{
bitANDC(rep.data, mask2.rep.data);
return *this;
......@@ -706,14 +704,14 @@ Bitmask<size>::bitANDC(const Bitmask<size>& mask2)
template <unsigned size>
inline void
Bitmask<size>::bitXOR(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::bitXOR(Uint32 data[], const Uint32 data2[])
{
BitmaskImpl::bitXOR(size,data, data2);
}
template <unsigned size>
inline Bitmask<size>&
Bitmask<size>::bitXOR(const Bitmask<size>& mask2)
inline BitmaskPOD<size>&
BitmaskPOD<size>::bitXOR(const BitmaskPOD<size>& mask2)
{
bitXOR(rep.data, mask2.rep.data);
return *this;
......@@ -721,44 +719,50 @@ Bitmask<size>::bitXOR(const Bitmask<size>& mask2)
template <unsigned size>
char *
Bitmask<size>::getText(const Uint32 data[], char* buf)
BitmaskPOD<size>::getText(const Uint32 data[], char* buf)
{
return BitmaskImpl::getText(size, data, buf);
}
template <unsigned size>
inline char *
Bitmask<size>::getText(char* buf) const
BitmaskPOD<size>::getText(char* buf) const
{
return getText(rep.data, buf);
}
template <unsigned size>
inline bool
Bitmask<size>::contains(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::contains(Uint32 data[], const Uint32 data2[])
{
return BitmaskImpl::contains(size, data, data2);
}
template <unsigned size>
inline bool
Bitmask<size>::contains(Bitmask<size> that)
BitmaskPOD<size>::contains(BitmaskPOD<size> that)
{
return contains(this->rep.data, that.rep.data);
}
template <unsigned size>
inline bool
Bitmask<size>::overlaps(Uint32 data[], const Uint32 data2[])
BitmaskPOD<size>::overlaps(Uint32 data[], const Uint32 data2[])
{
return BitmaskImpl::overlaps(size, data, data2);
}
template <unsigned size>
inline bool
Bitmask<size>::overlaps(Bitmask<size> that)
BitmaskPOD<size>::overlaps(BitmaskPOD<size> that)
{
return overlaps(this->rep.data, that.rep.data);
}
template <unsigned size>
class Bitmask : public BitmaskPOD<size> {
public:
Bitmask() { clear();}
};
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_AUTHENTICATOR_HPP
#define SOCKET_AUTHENTICATOR_HPP
class SocketAuthenticator
{
public:
virtual ~SocketAuthenticator() {};
virtual bool client_authenticate(int sockfd) = 0;
virtual bool server_authenticate(int sockfd) = 0;
};
class SocketAuthSimple : public SocketAuthenticator
{
const char *m_passwd;
const char *m_username;
public:
SocketAuthSimple(const char *username, const char *passwd);
virtual ~SocketAuthSimple();
virtual bool client_authenticate(int sockfd);
virtual bool server_authenticate(int sockfd);
};
#endif // SOCKET_AUTHENTICATOR_HPP
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef SOCKET_CLIENT_HPP
#define SOCKET_CLIENT_HPP
#include <NdbTCP.h>
class SocketAuthenticator;
class SocketClient
{
NDB_SOCKET_TYPE m_sockfd;
struct sockaddr_in m_servaddr;
unsigned short m_port;
char *m_server_name;
SocketAuthenticator *m_auth;
public:
SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa = 0);
~SocketClient();
bool init();
NDB_SOCKET_TYPE connect();
bool close();
};
#endif // SOCKET_ClIENT_HPP
......@@ -1350,15 +1350,6 @@ EventLogger::EventLogger() : Logger(), m_logLevel(), m_filterLevel(15)
EventLogger::~EventLogger()
{
}
bool
EventLogger::open()
{
char clusterLog[128];
NdbConfig_ClusterLogFileName(clusterLog, 128);
return open(clusterLog);
}
bool
......
......@@ -83,9 +83,6 @@ ConfigInfo::m_SectionRules[] = {
{ "SCI", transformConnection, 0 },
{ "OSE", transformConnection, 0 },
{ "TCP", fixPortNumber, 0 },
//{ "SHM", fixShmKey, 0 },
{ "DB", fixNodeHostname, 0 },
{ "API", fixNodeHostname, 0 },
{ "MGM", fixNodeHostname, 0 },
......@@ -106,6 +103,9 @@ ConfigInfo::m_SectionRules[] = {
{ "OSE", fixHostname, "HostName1" },
{ "OSE", fixHostname, "HostName2" },
{ "TCP", fixPortNumber, 0 }, // has to come after fixHostName
//{ "SHM", fixShmKey, 0 },
/**
* fixExtConnection must be after fixNodeId
*/
......@@ -146,13 +146,17 @@ const int ConfigInfo::m_NoOfRules = sizeof(m_SectionRules)/sizeof(SectionRule);
/****************************************************************************
* Config Rules declarations
****************************************************************************/
bool addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * ruleData);
bool add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data);
const ConfigInfo::ConfigRule
ConfigInfo::m_ConfigRules[] = {
{ addNodeConnections, 0 },
{ add_node_connections, 0 },
{ add_server_ports, 0 },
{ 0, 0 }
};
......@@ -325,6 +329,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0,
0x7FFFFFFF },
{
CFG_SYS_PORT_BASE,
"PortBase",
"SYSTEM",
"Base port for system",
ConfigInfo::USED,
false,
ConfigInfo::INT,
NDB_BASE_PORT+2,
0,
0x7FFFFFFF },
/***************************************************************************
* DB
***************************************************************************/
......@@ -376,6 +392,18 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
1,
(MAX_NODES - 1) },
{
KEY_INTERNAL,
"ServerPort",
"DB",
"Port used to setup transporter",
ConfigInfo::USED,
false,
ConfigInfo::INT,
UNDEFINED,
1,
65535 },
{
CFG_DB_NO_REPLICAS,
"NoOfReplicas",
......@@ -1231,7 +1259,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0,
0,
0x7FFFFFFF },
......@@ -1330,7 +1358,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::STRING,
MANDATORY,
0,
0,
0x7FFFFFFF },
......@@ -1354,7 +1382,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
2200,
NDB_BASE_PORT,
0,
0x7FFFFFFF },
......@@ -1538,7 +1566,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::USED,
false,
ConfigInfo::INT,
2202,
NDB_BASE_PORT+2,
0,
0x7FFFFFFF },
......@@ -2489,11 +2517,27 @@ transformNode(InitConfigFileParser::Context & ctx, const char * data){
Uint32 id;
if(!ctx.m_currentSection->get("Id", &id)){
Uint32 nextNodeId= 1;
ctx.m_userProperties.get("NextNodeId", &nextNodeId);
id= nextNodeId;
while (ctx.m_userProperties.get("AllocatedNodeId_", id, &id))
id++;
ctx.m_userProperties.put("NextNodeId", id+1, true);
ctx.m_currentSection->put("Id", id);
#if 0
ctx.reportError("Mandatory parameter Id missing from section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
#endif
} else if(ctx.m_userProperties.get("AllocatedNodeId_", id, &id)) {
ctx.reportError("Duplicate Id in section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
}
ctx.m_userProperties.put("AllocatedNodeId_", id, id);
snprintf(ctx.pname, sizeof(ctx.pname), "Node_%d", id);
ctx.m_currentSection->put("Type", ctx.fname);
......@@ -2510,10 +2554,25 @@ fixNodeHostname(InitConfigFileParser::Context & ctx, const char * data){
const char * compId;
if(!ctx.m_currentSection->get("ExecuteOnComputer", &compId)){
require(ctx.m_currentSection->put("HostName", ""));
const char * type;
if(ctx.m_currentSection->get("Type", &type) &&
strcmp(type,"DB") == 0)
{
ctx.reportError("Parameter \"ExecuteOnComputer\" missing from DB section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
}
return true;
#if 0
ctx.reportError("Parameter \"ExecuteOnComputer\" missing from section "
"[%s] starting at line: %d",
ctx.fname, ctx.m_sectionLineno);
return false;
#endif
}
const Properties * computer;
......@@ -2870,18 +2929,44 @@ fixHostname(InitConfigFileParser::Context & ctx, const char * data){
bool
fixPortNumber(InitConfigFileParser::Context & ctx, const char * data){
if(!ctx.m_currentSection->contains("PortNumber")){
Uint32 adder = 0;
ctx.m_userProperties.get("PortNumberAdder", &adder);
Uint32 id1= 0, id2= 0;
require(ctx.m_currentSection->get("NodeId1", &id1));
require(ctx.m_currentSection->get("NodeId2", &id2));
id1 = id1 < id2 ? id1 : id2;
const Properties * node;
require(ctx.m_config->get("Node", id1, &node));
BaseString hostname;
require(node->get("HostName", hostname));
if (hostname.c_str()[0] == 0) {
ctx.reportError("Hostname required on nodeid %d since it will act as server.", id1);
return false;
}
Uint32 port= 0;
if (!node->get("ServerPort", &port) && !ctx.m_userProperties.get("ServerPort_", id1, &port)) {
hostname.append("_ServerPortAdder");
Uint32 adder= 0;
ctx.m_userProperties.get(hostname.c_str(), &adder);
ctx.m_userProperties.put(hostname.c_str(), adder+1, true);
Uint32 base = 0;
if(!(ctx.m_userDefaults && ctx.m_userDefaults->get("PortNumber", &base)) &&
!ctx.m_systemDefaults->get("PortNumber", &base)){
return false;
}
ctx.m_currentSection->put("PortNumber", base + adder);
adder++;
ctx.m_userProperties.put("PortNumberAdder", adder, true);
port= base + adder;
ctx.m_userProperties.put("ServerPort_", id1, port);
}
if(ctx.m_currentSection->contains("PortNumber")) {
ndbout << "PortNumber should no longer be specificied per connection, please remove from config. Will be changed to " << port << endl;
}
ctx.m_currentSection->put("PortNumber", port);
return true;
}
......@@ -3158,9 +3243,9 @@ saveInConfigValues(InitConfigFileParser::Context & ctx, const char * data){
}
bool
addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
add_node_connections(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * ruleData)
const char * rule_data)
{
Properties * props= ctx.m_config;
Properties p_connections;
......@@ -3241,3 +3326,42 @@ addNodeConnections(Vector<ConfigInfo::ConfigRuleSection>&sections,
return true;
}
bool add_server_ports(Vector<ConfigInfo::ConfigRuleSection>&sections,
struct InitConfigFileParser::Context &ctx,
const char * rule_data)
{
#if 0
Properties * props= ctx.m_config;
Properties computers;
Uint32 port_base = NDB_BASE_PORT+2;
Uint32 nNodes;
ctx.m_userProperties.get("NoOfNodes", &nNodes);
for (Uint32 i= 0, n= 0; n < nNodes; i++){
Properties * tmp;
if(!props->get("Node", i, &tmp)) continue;
n++;
const char * type;
if(!tmp->get("Type", &type)) continue;
Uint32 port;
if (tmp->get("ServerPort", &port)) continue;
Uint32 computer;
if (!tmp->get("ExecuteOnComputer", &computer)) continue;
Uint32 adder= 0;
computers.get("",computer, &adder);
if (strcmp(type,"DB") == 0) {
adder++;
tmp->put("ServerPort", port_base+adder);
computers.put("",computer, adder);
}
}
#endif
return true;
}
......@@ -45,13 +45,15 @@
ConfigRetriever::ConfigRetriever() {
_localConfigFileName = NULL;
m_defaultConnectString = NULL;
_localConfigFileName = 0;
m_defaultConnectString = 0;
errorString = 0;
_localConfig = new LocalConfig();
m_connectString = NULL;
m_connectString = 0;
m_handle= 0;
}
ConfigRetriever::~ConfigRetriever(){
......@@ -68,6 +70,11 @@ ConfigRetriever::~ConfigRetriever(){
free(errorString);
delete _localConfig;
if (m_handle) {
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
}
}
......@@ -114,7 +121,8 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
struct ndb_mgm_configuration * p = 0;
switch(m->type){
case MgmId_TCP:
p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port, verId);
p = getConfig(m->data.tcp.remoteHost, m->data.tcp.port,
verId, nodeType);
break;
case MgmId_File:
p = getConfig(m->data.file.filename, verId);
......@@ -155,30 +163,52 @@ ConfigRetriever::getConfig(int verId, int nodeType) {
ndb_mgm_configuration *
ConfigRetriever::getConfig(const char * mgmhost,
short port,
int versionId){
NdbMgmHandle h;
h = ndb_mgm_create_handle();
if (h == NULL) {
int versionId,
int nodetype){
if (m_handle) {
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
}
m_handle = ndb_mgm_create_handle();
if (m_handle == 0) {
setError(CR_ERROR, "Unable to allocate mgm handle");
return 0;
}
BaseString tmp;
tmp.assfmt("%s:%d", mgmhost, port);
if (ndb_mgm_connect(h, tmp.c_str()) != 0) {
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(h));
ndb_mgm_destroy_handle(&h);
if (ndb_mgm_connect(m_handle, tmp.c_str()) != 0) {
setError(CR_RETRY, ndb_mgm_get_latest_error_desc(m_handle));
ndb_mgm_destroy_handle(&m_handle);
m_handle= 0;
return 0;
}
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(h, versionId);
ndb_mgm_configuration * conf = ndb_mgm_get_configuration(m_handle, versionId);
if(conf == 0){
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(h));
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
m_handle= 0;
return 0;
}
ndb_mgm_disconnect(h);
ndb_mgm_destroy_handle(&h);
{
unsigned nodeid= getOwnNodeId();
int res= ndb_mgm_alloc_nodeid(m_handle, versionId, &nodeid, nodetype);
if(res != 0) {
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
ndb_mgm_disconnect(m_handle);
ndb_mgm_destroy_handle(&m_handle);
m_handle= 0;
return 0;
}
_ownNodeId= nodeid;
}
return conf;
#if 0
......@@ -329,6 +359,9 @@ ConfigRetriever::verifyConfig(const struct ndb_mgm_configuration * conf,
}
do {
if(strlen(hostname) == 0)
break;
if(strcasecmp(hostname, localhost) == 0)
break;
......
......@@ -339,12 +339,13 @@ IPCConfig::getNodeType(NodeId id) const {
return out;
}
#include <mgmapi.h>
Uint32
IPCConfig::configureTransporters(Uint32 nodeId,
const class ndb_mgm_configuration & config,
class TransporterRegistry & tr){
Uint32 noOfTransportersCreated = 0;
Uint32 noOfTransportersCreated= 0, server_port= 0;
ndb_mgm_configuration_iterator iter(config, CFG_SECTION_CONNECTION);
for(iter.first(); iter.valid(); iter.next()){
......@@ -440,6 +441,14 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
}
if (nodeId <= nodeId1 && nodeId <= nodeId2) {
if (server_port && server_port != conf.port) {
ndbout << "internal error in config setup of server ports line= " << __LINE__ << endl;
exit(-1);
}
server_port= conf.port;
}
conf.localNodeId = nodeId;
conf.remoteNodeId = remoteNodeId;
conf.localHostName = (nodeId == nodeId1 ? host1 : host2);
......@@ -490,6 +499,8 @@ IPCConfig::configureTransporters(Uint32 nodeId,
}
}
tr.m_service_port= server_port;
return noOfTransportersCreated;
}
......@@ -17,10 +17,12 @@
#include "LocalConfig.hpp"
#include <NdbEnv.h>
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
LocalConfig::LocalConfig(){
ids = 0; size = 0; items = 0;
error_line = 0; error_msg[0] = 0;
_ownNodeId= 0;
}
bool
......@@ -68,10 +70,10 @@ LocalConfig::init(bool onlyNodeId,
//4. Check Ndb.cfg in NDB_HOME
{
bool fopenError;
char buf[256];
if(readFile(NdbConfig_NdbCfgName(buf, sizeof(buf), 1 /*true*/), fopenError, onlyNodeId)){
char *buf= NdbConfig_NdbCfgName(1 /*true*/);
NdbAutoPtr<char> tmp_aptr(buf);
if(readFile(buf, fopenError, onlyNodeId))
return true;
}
if (!fopenError)
return false;
}
......@@ -79,22 +81,29 @@ LocalConfig::init(bool onlyNodeId,
//5. Check Ndb.cfg in cwd
{
bool fopenError;
char buf[256];
if(readFile(NdbConfig_NdbCfgName(buf, sizeof(buf), 0 /*false*/), fopenError, onlyNodeId)){
char *buf= NdbConfig_NdbCfgName(0 /*false*/);
NdbAutoPtr<char> tmp_aptr(buf);
if(readFile(buf, fopenError, onlyNodeId))
return true;
}
if (!fopenError)
return false;
}
//6. Check defaultConnectString
if(defaultConnectString != 0) {
if(readConnectString(defaultConnectString, onlyNodeId)){
if(readConnectString(defaultConnectString, onlyNodeId))
return true;
}
return false;
}
//7. Check
{
char buf[256];
snprintf(buf, sizeof(buf), "host=localhost:%u", NDB_BASE_PORT);
if(readConnectString(buf, onlyNodeId))
return true;
}
setError(0, "");
return false;
......@@ -144,12 +153,12 @@ void LocalConfig::printUsage() const {
ndbout << "1. Put a Ndb.cfg file in the directory where you start"<<endl
<< " the node. "<< endl
<< " Ex: Ndb.cfg" << endl
<< " | nodeid=11;host=localhost:2200"<<endl<<endl;
<< " | host=localhost:"<<NDB_BASE_PORT<<endl;
ndbout << "2. Use the environment variable NDB_CONNECTSTRING to "<<endl
<< " provide this information." <<endl
<< " Ex: " << endl
<< " >export NDB_CONNECTSTRING=\"nodeid=11;host=localhost:2200\""
<< " >export NDB_CONNECTSTRING=\"host=localhost:"<<NDB_BASE_PORT<<"\""
<<endl<<endl;
}
......
......@@ -18,43 +18,92 @@
#include <NdbConfig.h>
#include <NdbEnv.h>
const char*
NdbConfig_HomePath(char* buf, int buflen){
const char* p;
p = NdbEnv_GetEnv("NDB_HOME", buf, buflen);
if (p == NULL){
strlcpy(buf, "", buflen);
p = buf;
} else {
const int len = strlen(buf);
if(len != 0 && buf[len-1] != '/'){
buf[len] = '/';
buf[len+1] = 0;
}
}
return p;
}
const char*
NdbConfig_NdbCfgName(char* buf, int buflen, int with_ndb_home){
if (with_ndb_home)
NdbConfig_HomePath(buf, buflen);
static char*
NdbConfig_AllocHomePath(int _len)
{
const char *path= NdbEnv_GetEnv("NDB_HOME", 0, 0);
int len= _len;
int path_len= 0;
if (path)
path_len= strlen(path);
len+= path_len;
char *buf= malloc(len);
if (path_len > 0)
snprintf(buf, len, "%s%c", path, DIR_SEPARATOR);
else
buf[0] = 0;
strlcat(buf, "Ndb.cfg", buflen);
buf[0]= 0;
return buf;
}
char*
NdbConfig_NdbCfgName(int with_ndb_home){
char *buf;
int len= 0;
if (with_ndb_home) {
buf= NdbConfig_AllocHomePath(128);
len= strlen(buf);
} else
buf= malloc(128);
snprintf(buf+len, 128, "Ndb.cfg");
return buf;
}
char*
NdbConfig_ErrorFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_error.log", node_id);
return buf;
}
char*
NdbConfig_ClusterLogFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_cluster.log", node_id);
return buf;
}
char*
NdbConfig_SignalLogFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_signal.log", node_id);
return buf;
}
char*
NdbConfig_TraceFileName(int node_id, int file_no){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_trace.log.%u", node_id, file_no);
return buf;
}
char*
NdbConfig_NextTraceFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_trace.log.next", node_id);
return buf;
}
const char*
NdbConfig_ErrorFileName(char* buf, int buflen){
NdbConfig_HomePath(buf, buflen);
strlcat(buf, "error.log", buflen);
char*
NdbConfig_PidFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u.pid", node_id);
return buf;
}
const char*
NdbConfig_ClusterLogFileName(char* buf, int buflen){
NdbConfig_HomePath(buf, buflen);
strlcat(buf, "cluster.log", buflen);
char*
NdbConfig_StdoutFileName(int node_id){
char *buf= NdbConfig_AllocHomePath(128);
int len= strlen(buf);
snprintf(buf+len, 128, "ndb_%u_out.log", node_id);
return buf;
}
......@@ -63,27 +63,23 @@ ndbstrerror::~ndbstrerror(void)
#define ndbstrerror strerror
#endif
TCP_Transporter::TCP_Transporter(int sendBufSize, int maxRecvSize,
int portNo,
const char *rHostName,
TCP_Transporter::TCP_Transporter(TransporterRegistry &t_reg,
int sendBufSize, int maxRecvSize,
const char *lHostName,
NodeId rNodeId, NodeId lNodeId,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byte_order,
bool compr, bool chksm, bool signalId,
Uint32 _reportFreq) :
Transporter(lNodeId, rNodeId, byte_order, compr, chksm, signalId),
m_sendBuffer(sendBufSize),
isServer(lNodeId < rNodeId),
port(portNo)
Transporter(t_reg, lHostName, rHostName, r_port, lNodeId, rNodeId,
byte_order, compr, chksm, signalId),
m_sendBuffer(sendBufSize)
{
maxReceiveSize = maxRecvSize;
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
// Initialize member variables
Ndb_getInAddr(&remoteHostAddress, rHostName);
Ndb_getInAddr(&localHostAddress, lHostName);
theSocket = NDB_INVALID_SOCKET;
sendCount = receiveCount = 0;
......@@ -108,6 +104,24 @@ TCP_Transporter::~TCP_Transporter() {
receiveBuffer.destroy();
}
bool TCP_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
{
return connect_common(sockfd);
}
bool TCP_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
{
theSocket = sockfd;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
bool
TCP_Transporter::initTransporter() {
......@@ -316,7 +330,7 @@ TCP_Transporter::doSend() {
sendCount ++;
sendSize += nBytesSent;
if(sendCount == reportFreq){
reportSendLen(callbackObj,remoteNodeId, sendCount, sendSize);
reportSendLen(get_callback_obj(), remoteNodeId, sendCount, sendSize);
sendCount = 0;
sendSize = 0;
}
......@@ -331,7 +345,7 @@ TCP_Transporter::doSend() {
#endif
if(DISCONNECT_ERRNO(InetErrno, nBytesSent)){
doDisconnect();
reportDisconnect(callbackObj, remoteNodeId, InetErrno);
report_disconnect(InetErrno);
}
return false;
......@@ -361,14 +375,15 @@ TCP_Transporter::doReceive() {
#endif
ndbout_c("receiveBuffer.sizeOfData(%d) > receiveBuffer.sizeOfBuffer(%d)",
receiveBuffer.sizeOfData, receiveBuffer.sizeOfBuffer);
reportError(callbackObj, remoteNodeId, TE_INVALID_MESSAGE_LENGTH);
report_error(TE_INVALID_MESSAGE_LENGTH);
return 0;
}
receiveCount ++;
receiveSize += nBytesRead;
if(receiveCount == reportFreq){
reportReceiveLen(callbackObj, remoteNodeId, receiveCount, receiveSize);
reportReceiveLen(get_callback_obj(), remoteNodeId, receiveCount, receiveSize);
receiveCount = 0;
receiveSize = 0;
}
......@@ -384,60 +399,17 @@ TCP_Transporter::doReceive() {
if(DISCONNECT_ERRNO(InetErrno, nBytesRead)){
// The remote node has closed down
doDisconnect();
reportDisconnect(callbackObj, remoteNodeId,InetErrno);
report_disconnect(InetErrno);
}
}
return nBytesRead;
}
bool
TCP_Transporter::connectImpl(Uint32 timeOutMillis){
struct timeval timeout = {0, 0};
timeout.tv_sec = timeOutMillis / 1000;
timeout.tv_usec = (timeOutMillis % 1000)*1000;
bool retVal = false;
if(isServer){
if(theSocket == NDB_INVALID_SOCKET){
startTCPServer();
}
if(theSocket == NDB_INVALID_SOCKET)
{
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
retVal = acceptClient(&timeout);
} else {
// Is client
retVal = connectClient(&timeout);
}
if(!retVal) {
NdbSleep_MilliSleep(timeOutMillis);
return false;
}
#if defined NDB_OSE || defined NDB_SOFTOSE
if(setsockopt(theSocket, SOL_SOCKET, SO_OSEOWNER,
&theReceiverPid, sizeof(PROCESS)) != 0){
ndbout << "Failed to transfer ownership of socket" << endl;
NDB_CLOSE_SOCKET(theSocket);
theSocket = -1;
return false;
}
#endif
return true;
}
void
TCP_Transporter::disconnectImpl() {
TCP_Transporter::disconnectImpl() {
if(theSocket != NDB_INVALID_SOCKET){
if(NDB_CLOSE_SOCKET(theSocket) < 0){
reportError(callbackObj, remoteNodeId, TE_ERROR_CLOSING_SOCKET);
report_error(TE_ERROR_CLOSING_SOCKET);
}
}
......@@ -447,155 +419,3 @@ TCP_Transporter::disconnectImpl() {
theSocket = NDB_INVALID_SOCKET;
}
bool
TCP_Transporter::startTCPServer() {
int bindResult, listenResult;
// The server variable is the remote server when we are a client
// htonl and htons returns the parameter in network byte order
// INADDR_ANY tells the OS kernel to choose the IP address
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr.s_addr = localHostAddress.s_addr;
server.sin_port = htons(port);
if (theSocket != NDB_INVALID_SOCKET) {
return true; // Server socket is already initialized
}
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
// Set the socket reuse addr to true, so we are sure we can bind the
// socket
int reuseAddr = 1;
setsockopt(theSocket, SOL_SOCKET, SO_REUSEADDR,
(char*)&reuseAddr, sizeof(reuseAddr));
// Set the TCP_NODELAY option so also small packets are sent
// as soon as possible
int nodelay = 1;
setsockopt(theSocket, IPPROTO_TCP, TCP_NODELAY,
(char*)&nodelay, sizeof(nodelay));
// Bind the socket
bindResult = bind(theSocket, (struct sockaddr *) &server,
sizeof(server));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
// Perform listen.
listenResult = listen(theSocket, 1);
if (listenResult == 1) {
reportThreadError(remoteNodeId, TE_LISTEN_FAILED);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
return true;
}
bool
TCP_Transporter::acceptClient (struct timeval * timeout){
struct sockaddr_in clientAddress;
fd_set readset;
FD_ZERO(&readset);
FD_SET(theSocket, &readset);
const int res = select(theSocket + 1, &readset, 0, 0, timeout);
if(res == 0)
return false;
if(res < 0){
reportThreadError(remoteNodeId, TE_ERROR_IN_SELECT_BEFORE_ACCEPT);
return false;
}
NDB_SOCKLEN_T clientAddressLen = sizeof(clientAddress);
const NDB_SOCKET_TYPE clientSocket = accept(theSocket,
(struct sockaddr*)&clientAddress,
&clientAddressLen);
if (clientSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_ACCEPT_RETURN_ERROR);
return false;
}
if (clientAddress.sin_addr.s_addr != remoteHostAddress.s_addr) {
ndbout_c("Wrong client connecting!");
ndbout_c("connecting address: %s", inet_ntoa(clientAddress.sin_addr));
ndbout_c("expecting address: %s", inet_ntoa(remoteHostAddress));
// The newly connected host is not the remote host
// we wanted to connect to. Disconnect it.
// XXX This is not valid. We cannot disconnect it.
NDB_CLOSE_SOCKET(clientSocket);
return false;
} else {
NDB_CLOSE_SOCKET(theSocket);
theSocket = clientSocket;
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
}
bool
TCP_Transporter::connectClient (struct timeval * timeout){
// Create the socket
theSocket = socket(AF_INET, SOCK_STREAM, 0);
if (theSocket == NDB_INVALID_SOCKET) {
reportThreadError(remoteNodeId, TE_COULD_NOT_CREATE_SOCKET);
return false;
}
struct sockaddr_in server;
memset((void*)&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_addr = remoteHostAddress;
server.sin_port = htons(port);
struct sockaddr_in client;
memset((void*)&client, 0, sizeof(client));
client.sin_family = AF_INET;
client.sin_addr = localHostAddress;
client.sin_port = 0; // Any port
// Bind the socket
const int bindResult = bind(theSocket, (struct sockaddr *) &client,
sizeof(client));
if (bindResult < 0) {
reportThreadError(remoteNodeId, TE_COULD_NOT_BIND_SOCKET);
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
const int connectRes = ::connect(theSocket, (struct sockaddr *) &server,
sizeof(server));
if(connectRes == 0){
setSocketOptions();
setSocketNonBlocking(theSocket);
return true;
}
NDB_CLOSE_SOCKET(theSocket);
theSocket = NDB_INVALID_SOCKET;
return false;
}
......@@ -14,24 +14,8 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//****************************************************************************
//
// AUTHOR
// sa Fransson
//
// NAME
// TCP_Transporter
//
// DESCRIPTION
// A TCP_Transporter instance is created when TCP/IP-communication
// shall be used (user specified). It handles connect, disconnect,
// send and receive.
//
//
//
//***************************************************************************/
#ifndef TCP_Transporter_H
#define TCP_Transporter_H
#ifndef TCP_TRANSPORTER_HPP
#define TCP_TRANSPORTER_HPP
#include "Transporter.hpp"
#include "SendBuffer.hpp"
......@@ -61,11 +45,13 @@ class TCP_Transporter : public Transporter {
friend class TransporterRegistry;
private:
// Initialize member variables
TCP_Transporter(int sendBufferSize, int maxReceiveSize,
int port,
const char *rHostName,
TCP_Transporter(TransporterRegistry&,
int sendBufferSize, int maxReceiveSize,
const char *lHostName,
NodeId rHostId, NodeId lHostId,
const char *rHostName,
int r_port,
NodeId lHostId,
NodeId rHostId,
int byteorder,
bool compression, bool checksum, bool signalId,
Uint32 reportFreq = 4096);
......@@ -121,12 +107,14 @@ protected:
* A client connects to the remote server
* A server accepts any new connections
*/
bool connectImpl(Uint32 timeOutMillis);
virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd);
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd);
bool connect_common(NDB_SOCKET_TYPE sockfd);
/**
* Disconnects a TCP/IP node. Empty send and receivebuffer.
*/
void disconnectImpl();
virtual void disconnectImpl();
private:
/**
......@@ -134,21 +122,11 @@ private:
*/
SendBuffer m_sendBuffer;
const bool isServer;
const unsigned int port;
// Sending/Receiving socket used by both client and server
NDB_SOCKET_TYPE theSocket;
Uint32 maxReceiveSize;
/**
* Remote host name/and address
*/
char remoteHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
/**
* Socket options
*/
......@@ -163,43 +141,6 @@ private:
bool sendIsPossible(struct timeval * timeout);
/**
* startTCPServer - None blocking
*
* create a server socket
* bind
* listen
*
* Note: Does not call accept
*/
bool startTCPServer();
/**
* acceptClient - Blocking
*
* Accept a connection
* checks if "right" client has connected
* if so
* close server socket
* else
* close newly created socket and goto begin
*/
bool acceptClient(struct timeval * timeout);
/**
* Creates a client socket
*
* Note does not call connect
*/
bool createClientSocket();
/**
* connectClient - Blocking
*
* connects to remote host
*/
bool connectClient(struct timeval * timeout);
/**
* Statistics
*/
......
......@@ -15,132 +15,122 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "Transporter.hpp"
#include "TransporterInternalDefinitions.hpp"
#include <NdbSleep.h>
Transporter::Transporter(NodeId lNodeId, NodeId rNodeId,
#include <SocketAuthenticator.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
Transporter::Transporter(TransporterRegistry &t_reg,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int _byteorder,
bool _compression, bool _checksum, bool _signalId)
: localNodeId(lNodeId), remoteNodeId(rNodeId),
m_packer(_signalId, _checksum)
: m_r_port(r_port), localNodeId(lNodeId), remoteNodeId(rNodeId),
isServer(lNodeId < rNodeId),
m_packer(_signalId, _checksum),
m_transporter_registry(t_reg)
{
if (rHostName && strlen(rHostName) > 0){
strncpy(remoteHostName, rHostName, sizeof(remoteHostName));
Ndb_getInAddr(&remoteHostAddress, rHostName);
}
else
{
if (!isServer) {
ndbout << "Unable to setup transporter. Node " << rNodeId
<< " must have hostname. Update configuration." << endl;
exit(-1);
}
remoteHostName[0]= 0;
}
strncpy(localHostName, lHostName, sizeof(localHostName));
if (strlen(lHostName) > 0)
Ndb_getInAddr(&localHostAddress, lHostName);
byteOrder = _byteorder;
compressionUsed = _compression;
checksumUsed = _checksum;
signalIdUsed = _signalId;
_threadError = TE_NO_ERROR;
m_connected = false;
m_timeOutMillis = 1000;
_connecting = false;
_disconnecting = false;
_connected = false;
_timeOutMillis = 1000;
theThreadPtr = NULL;
theMutexPtr = NdbMutex_Create();
if (isServer)
m_socket_client= 0;
else
m_socket_client= new SocketClient(remoteHostName, r_port,
new SocketAuthSimple("ndbd", "ndbd passwd"));
}
Transporter::~Transporter(){
NdbMutex_Destroy(theMutexPtr);
if(theThreadPtr != 0){
void * retVal;
NdbThread_WaitFor(theThreadPtr, &retVal);
NdbThread_Destroy(&theThreadPtr);
}
}
extern "C"
void *
runConnect_C(void * me)
{
runConnect(me);
NdbThread_Exit(0);
return NULL;
}
void *
runConnect(void * me){
Transporter * t = (Transporter *) me;
DEBUG("Connect thread to " << t->remoteNodeId << " started");
while(true){
NdbMutex_Lock(t->theMutexPtr);
if(t->_disconnecting){
t->_connecting = false;
NdbMutex_Unlock(t->theMutexPtr);
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to disconnect");
return 0;
}
NdbMutex_Unlock(t->theMutexPtr);
bool res = t->connectImpl(t->_timeOutMillis); // 1000 ms
DEBUG("Waiting for " << t->remoteNodeId << "...");
if(res){
t->_connected = true;
t->_connecting = false;
t->_errorCount = 0;
t->_threadError = TE_NO_ERROR;
DEBUG("Connect Thread " << t->remoteNodeId << " stop due to connect");
return 0;
}
}
if (m_socket_client)
delete m_socket_client;
}
void
Transporter::doConnect() {
bool
Transporter::connect_server(NDB_SOCKET_TYPE sockfd) {
if(m_connected)
return true; // TODO assert(0);
NdbMutex_Lock(theMutexPtr);
if(_connecting || _disconnecting || _connected){
NdbMutex_Unlock(theMutexPtr);
return;
bool res = connect_server_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
_connecting = true;
_threadError = TE_NO_ERROR;
return res;
}
// Start thread
bool
Transporter::connect_client() {
if(m_connected)
return true;
NDB_SOCKET_TYPE sockfd = m_socket_client->connect();
char buf[16];
snprintf(buf, sizeof(buf), "ndb_con_%d", remoteNodeId);
if (sockfd < 0)
return false;
// send info about own id
SocketOutputStream s_output(sockfd);
s_output.println("%d", localNodeId);
// get remote id
int nodeId;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return false;
}
if(theThreadPtr != 0){
void * retVal;
NdbThread_WaitFor(theThreadPtr, &retVal);
NdbThread_Destroy(&theThreadPtr);
bool res = connect_client_impl(sockfd);
if(res){
m_connected = true;
m_errorCount = 0;
}
theThreadPtr = NdbThread_Create(runConnect_C,
(void**)this,
32768,
buf,
NDB_THREAD_PRIO_LOW);
NdbSleep_MilliSleep(100); // Let thread start
NdbMutex_Unlock(theMutexPtr);
return res;
}
void
Transporter::doDisconnect() {
NdbMutex_Lock(theMutexPtr);
_disconnecting = true;
while(_connecting){
DEBUG("Waiting for connect to finish...");
NdbMutex_Unlock(theMutexPtr);
NdbSleep_MilliSleep(500);
NdbMutex_Lock(theMutexPtr);
}
_connected = false;
Transporter::doDisconnect() {
if(!m_connected)
return; //assert(0); TODO will fail
disconnectImpl();
_threadError = TE_NO_ERROR;
_disconnecting = false;
NdbMutex_Unlock(theMutexPtr);
m_connected= false;
}
......@@ -19,6 +19,9 @@
#include <ndb_global.h>
#include <SocketClient.hpp>
#include <TransporterRegistry.hpp>
#include <TransporterCallback.hpp>
#include "TransporterDefinitions.hpp"
#include "Packer.hpp"
......@@ -40,8 +43,9 @@ public:
* None blocking
* Use isConnected() to check status
*/
virtual void doConnect();
bool connect_client();
bool connect_server(NDB_SOCKET_TYPE socket);
/**
* Blocking
*/
......@@ -60,14 +64,17 @@ public:
*/
NodeId getRemoteNodeId() const;
/**
* Set callback object
* Local (own) Node Id
*/
void setCallbackObject(void * callback);
NodeId getLocalNodeId() const;
protected:
Transporter(NodeId lNodeId,
Transporter(TransporterRegistry &,
const char *lHostName,
const char *rHostName,
int r_port,
NodeId lNodeId,
NodeId rNodeId,
int byteorder,
bool compression,
......@@ -78,58 +85,59 @@ protected:
* Blocking, for max timeOut milli seconds
* Returns true if connect succeded
*/
virtual bool connectImpl(Uint32 timeOut) = 0;
virtual bool connect_server_impl(NDB_SOCKET_TYPE sockfd) = 0;
virtual bool connect_client_impl(NDB_SOCKET_TYPE sockfd) = 0;
/**
* Blocking
*/
virtual void disconnectImpl() = 0;
const NodeId localNodeId;
/**
* Remote host name/and address
*/
char remoteHostName[256];
char localHostName[256];
struct in_addr remoteHostAddress;
struct in_addr localHostAddress;
const unsigned int m_r_port;
const NodeId remoteNodeId;
const NodeId localNodeId;
const bool isServer;
unsigned createIndex;
int byteOrder;
bool compressionUsed;
bool checksumUsed;
bool signalIdUsed;
Packer m_packer;
Packer m_packer;
private:
/**
* Thread and mutex for connect
*/
NdbThread* theThreadPtr;
friend void* runConnect(void * me);
SocketClient *m_socket_client;
protected:
/**
* Error reporting from connect thread(s)
*/
void reportThreadError(NodeId nodeId,
TransporterError errorCode);
Uint32 getErrorCount();
TransporterError getThreadError();
void resetThreadError();
TransporterError _threadError;
Uint32 _timeOutMillis;
Uint32 _errorCount;
protected:
NdbMutex* theMutexPtr;
bool _connected; // Are we connected
bool _connecting; // Connect thread is running
bool _disconnecting; // We are disconnecting
void * callbackObj;
Uint32 m_errorCount;
Uint32 m_timeOutMillis;
protected:
bool m_connected; // Are we connected
TransporterRegistry &m_transporter_registry;
void *get_callback_obj() { return m_transporter_registry.callbackObj; };
void report_disconnect(int err){m_transporter_registry.report_disconnect(remoteNodeId,err);};
void report_error(enum TransporterError err){reportError(get_callback_obj(),remoteNodeId,err);};
};
inline
bool
Transporter::isConnected() const {
return _connected;
return m_connected;
}
inline
......@@ -138,42 +146,17 @@ Transporter::getRemoteNodeId() const {
return remoteNodeId;
}
inline
void
Transporter::reportThreadError(NodeId nodeId, TransporterError errorCode)
{
#if 0
ndbout_c("Transporter::reportThreadError (NodeId: %d, Error code: %d)",
nodeId, errorCode);
#endif
_threadError = errorCode;
_errorCount++;
}
inline
TransporterError
Transporter::getThreadError(){
return _threadError;
NodeId
Transporter::getLocalNodeId() const {
return remoteNodeId;
}
inline
Uint32
Transporter::getErrorCount()
{
return _errorCount;
}
inline
void
Transporter::resetThreadError()
{
_threadError = TE_NO_ERROR;
}
inline
void
Transporter::setCallbackObject(void * callback) {
callbackObj = callback;
return m_errorCount;
}
#endif // Define of Transporter_H
......@@ -16,10 +16,11 @@
#include <ndb_global.h>
#include "TransporterRegistry.hpp"
#include <TransporterRegistry.hpp>
#include "TransporterInternalDefinitions.hpp"
#include "Transporter.hpp"
#include <SocketAuthenticator.hpp>
#ifdef NDB_TCP_TRANSPORTER
#include "TCP_Transporter.hpp"
......@@ -42,20 +43,67 @@
#include "NdbOut.hpp"
#include <NdbSleep.h>
#include <NdbTick.h>
#define STEPPING 1
#include <InputStream.hpp>
#include <OutputStream.hpp>
SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
{
if (m_auth && !m_auth->server_authenticate(sockfd)){
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
{
// read node id from client
int nodeId;
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
if (sscanf(buf, "%d", &nodeId) != 1) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
//check that nodeid is valid and that there is an allocated transporter
if ( nodeId < 0 || nodeId >= m_transporter_registry->maxTransporters) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
if (m_transporter_registry->theTransporters[nodeId] == 0) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
//check that the transporter should be connected
if (m_transporter_registry->performStates[nodeId] != TransporterRegistry::CONNECTING) {
NDB_CLOSE_SOCKET(sockfd);
return 0;
}
Transporter *t= m_transporter_registry->theTransporters[nodeId];
// send info about own id (just as response to acknowledge connection)
SocketOutputStream s_output(sockfd);
s_output.println("%d", t->getLocalNodeId());
// setup transporter (transporter responsible for closing sockfd)
t->connect_server(sockfd);
}
return 0;
}
TransporterRegistry::TransporterRegistry(void * callback,
unsigned _maxTransporters,
unsigned sizeOfLongSignalMemory) {
m_transporter_service= 0;
nodeIdSpecified = false;
maxTransporters = _maxTransporters;
sendCounter = 1;
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = STEPPING;
m_ccReady = false;
m_nTransportersPerformConnect=0;
callbackObj=callback;
......@@ -82,7 +130,7 @@ TransporterRegistry::TransporterRegistry(void * callback,
theSHMTransporters[i] = NULL;
theOSETransporters[i] = NULL;
theTransporters[i] = NULL;
performStates[i] = PerformNothing;
performStates[i] = DISCONNECTED;
ioStates[i] = NoHalt;
}
theOSEReceiver = 0;
......@@ -154,13 +202,14 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
TCP_Transporter * t = new TCP_Transporter(config->sendBufferSize,
config->maxReceiveSize,
config->port,
config->remoteHostName,
TCP_Transporter * t = new TCP_Transporter(*this,
config->sendBufferSize,
config->maxReceiveSize,
config->localHostName,
config->remoteNodeId,
config->remoteHostName,
config->port,
localNodeId,
config->remoteNodeId,
config->byteOrder,
config->compression,
config->checksum,
......@@ -172,13 +221,11 @@ TransporterRegistry::createTransporter(TCP_TransporterConfiguration *config) {
return false;
}
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theTCPTransporters[nTCPTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_TCP_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing;
performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nTCPTransporters++;
......@@ -228,12 +275,11 @@ TransporterRegistry::createTransporter(OSE_TransporterConfiguration *conf) {
delete t;
return false;
}
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theOSETransporters[nOSETransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_OSE_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing;
performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nOSETransporters++;
......@@ -279,12 +325,11 @@ TransporterRegistry::createTransporter(SCI_TransporterConfiguration *config) {
delete t;
return false;
}
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theSCITransporters[nSCITransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SCI_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing;
performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nSCITransporters++;
......@@ -321,12 +366,11 @@ TransporterRegistry::createTransporter(SHM_TransporterConfiguration *config) {
delete t;
return false;
}
t->setCallbackObject(callbackObj);
// Put the transporter in the transporter arrays
theSHMTransporters[nSHMTransporters] = t;
theTransporters[t->getRemoteNodeId()] = t;
theTransporterTypes[t->getRemoteNodeId()] = tt_SHM_TRANSPORTER;
performStates[t->getRemoteNodeId()] = PerformNothing;
performStates[t->getRemoteNodeId()] = DISCONNECTED;
nTransporters++;
nSHMTransporters++;
......@@ -781,7 +825,7 @@ TransporterRegistry::performReceive(){
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const NDB_SOCKET_TYPE socket = t->getSocket();
if(performStates[nodeId] == PerformIO){
if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &tcpReadset)) {
const int receiveSize = t->doReceive();
if(receiveSize > 0){
......@@ -804,7 +848,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer();
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
......@@ -819,7 +863,7 @@ TransporterRegistry::performReceive(){
checkJobBuffer();
SHM_Transporter *t = theSHMTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){
if(is_connected(nodeId)){
if(t->isConnected() && t->checkConnected()){
Uint32 * readPtr, * eodPtr;
t->getReceivePtr(&readPtr, &eodPtr);
......@@ -840,7 +884,7 @@ TransporterRegistry::performSend(){
#ifdef NDB_OSE_TRANSPORTER
for (int i = 0; i < nOSETransporters; i++){
OSE_Transporter *t = theOSETransporters[i];
if((performStates[t->getRemoteNodeId()] == PerformIO) &&
if((is_connected(t->getRemoteNodeId()) &&
(t->isConnected())) {
t->doSend();
}//if
......@@ -887,7 +931,7 @@ TransporterRegistry::performSend(){
TCP_Transporter *t = theTCPTransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
const int socket = t->getSocket();
if(performStates[nodeId] == PerformIO){
if(is_connected(nodeId)){
if(t->isConnected() && FD_ISSET(socket, &writeset)) {
t->doSend();
}//if
......@@ -901,7 +945,7 @@ TransporterRegistry::performSend(){
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
(performStates[t->getRemoteNodeId()] == PerformIO)) {
(is_connected(t->getRemoteNodeId()))) {
t->doSend();
}//if
}//for
......@@ -910,7 +954,7 @@ TransporterRegistry::performSend(){
if (t &&
(t->hasDataToSend()) &&
(t->isConnected()) &&
(performStates[t->getRemoteNodeId()] == PerformIO)) {
(is_connected(t->getRemoteNodeId()))) {
t->doSend();
}//if
}//for
......@@ -925,7 +969,7 @@ TransporterRegistry::performSend(){
SCI_Transporter *t = theSCITransporters[i];
const NodeId nodeId = t->getRemoteNodeId();
if(performStates[nodeId] == PerformIO){
if(is_connected(nodeId)){
if(t->isConnected() && t->hasDataToSend()) {
t->doSend();
} //if
......@@ -961,70 +1005,212 @@ TransporterRegistry::printState(){
}
#endif
PerformState
TransporterRegistry::performState(NodeId nodeId) {
return performStates[nodeId];
IOState
TransporterRegistry::ioState(NodeId nodeId) {
return ioStates[nodeId];
}
#ifdef DEBUG_TRANSPORTER
const char *
performStateString(PerformState state){
switch(state){
case PerformNothing:
return "PerformNothing";
break;
case PerformIO:
return "PerformIO";
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
DEBUG("TransporterRegistry::setIOState("
<< nodeId << ", " << state << ")");
ioStates[nodeId] = state;
}
static void *
run_start_clients_C(void * me)
{
((TransporterRegistry*) me)->start_clients_thread();
NdbThread_Exit(0);
return me;
}
// Run by kernel thread
void
TransporterRegistry::do_connect(NodeId node_id)
{
PerformState &curr_state = performStates[node_id];
switch(curr_state){
case DISCONNECTED:
break;
case PerformConnect:
return "PerformConnect";
case CONNECTED:
return;
case CONNECTING:
return;
case DISCONNECTING:
break;
case PerformDisconnect:
return "PerformDisconnect";
}
curr_state= CONNECTING;
}
void
TransporterRegistry::do_disconnect(NodeId node_id)
{
PerformState &curr_state = performStates[node_id];
switch(curr_state){
case DISCONNECTED:
return;
case CONNECTED:
break;
case RemoveTransporter:
return "RemoveTransporter";
case CONNECTING:
break;
case DISCONNECTING:
return;
}
return "Unknown";
curr_state= DISCONNECTING;
}
#endif
void
TransporterRegistry::setPerformState(NodeId nodeId, PerformState state) {
DEBUG("TransporterRegistry::setPerformState("
<< nodeId << ", " << performStateString(state) << ")");
performStates[nodeId] = state;
TransporterRegistry::report_connect(NodeId node_id)
{
performStates[node_id] = CONNECTED;
reportConnect(callbackObj, node_id);
}
void
TransporterRegistry::setPerformState(PerformState state) {
int count = 0;
int index = 0;
while(count < nTransporters){
if(theTransporters[index] != 0){
setPerformState(theTransporters[index]->getRemoteNodeId(), state);
count ++;
TransporterRegistry::report_disconnect(NodeId node_id, int errnum)
{
performStates[node_id] = DISCONNECTED;
reportDisconnect(callbackObj, node_id, errnum);
}
void
TransporterRegistry::update_connections()
{
for (int i= 0, n= 0; n < nTransporters; i++){
Transporter * t = theTransporters[i];
if (!t)
continue;
n++;
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTED:
case DISCONNECTED:
break;
case CONNECTING:
if(t->isConnected())
report_connect(nodeId);
break;
case DISCONNECTING:
if(!t->isConnected())
report_disconnect(nodeId, 0);
break;
}
index ++;
}
}
IOState
TransporterRegistry::ioState(NodeId nodeId) {
return ioStates[nodeId];
// run as own thread
void
TransporterRegistry::start_clients_thread()
{
while (m_run_start_clients_thread) {
NdbSleep_MilliSleep(100);
for (int i= 0, n= 0; n < nTransporters && m_run_start_clients_thread; i++){
Transporter * t = theTransporters[i];
if (!t)
continue;
n++;
const NodeId nodeId = t->getRemoteNodeId();
switch(performStates[nodeId]){
case CONNECTING:
if(!t->isConnected() && !t->isServer)
t->connect_client();
break;
case DISCONNECTING:
if(t->isConnected())
t->doDisconnect();
break;
default:
break;
}
}
}
}
void
TransporterRegistry::setIOState(NodeId nodeId, IOState state) {
DEBUG("TransporterRegistry::setIOState("
<< nodeId << ", " << state << ")");
ioStates[nodeId] = state;
bool
TransporterRegistry::start_clients()
{
m_run_start_clients_thread= true;
m_start_clients_thread= NdbThread_Create(run_start_clients_C,
(void**)this,
32768,
"ndb_start_clients",
NDB_THREAD_PRIO_LOW);
if (m_start_clients_thread == 0) {
m_run_start_clients_thread= false;
return false;
}
return true;
}
bool
TransporterRegistry::stop_clients()
{
if (m_start_clients_thread) {
m_run_start_clients_thread= false;
void* status;
int r= NdbThread_WaitFor(m_start_clients_thread, &status);
NdbThread_Destroy(&m_start_clients_thread);
}
return true;
}
bool
TransporterRegistry::start_service(SocketServer& socket_server)
{
#if 0
for (int i= 0, n= 0; n < nTransporters; i++){
Transporter * t = theTransporters[i];
if (!t)
continue;
n++;
if (t->isServer) {
t->m_service = new TransporterService(new SocketAuthSimple("ndbd passwd"));
if(!socket_server.setup(t->m_service, t->m_r_port, 0))
{
ndbout_c("Unable to setup transporter service port: %d!\n"
"Please check if the port is already used,\n"
"(perhaps a mgmtsrvrserver is already running)",
m_service_port);
delete t->m_service;
return false;
}
}
}
#endif
if (m_service_port != 0) {
m_transporter_service = new TransporterService(new SocketAuthSimple("ndbd", "ndbd passwd"));
if (nodeIdSpecified != true) {
ndbout_c("TransporterRegistry::startReceiving: localNodeId not specified");
return false;
}
//m_interface_name = "ndbd";
m_interface_name = 0;
if(!socket_server.setup(m_transporter_service, m_service_port, m_interface_name))
{
ndbout_c("Unable to setup transporter service port: %d!\n"
"Please check if the port is already used,\n"
"(perhaps a mgmtsrvrserver is already running)",
m_service_port);
delete m_transporter_service;
return false;
}
m_transporter_service->setTransporterRegistry(this);
} else
m_transporter_service= 0;
return true;
}
void
TransporterRegistry::startReceiving(){
TransporterRegistry::startReceiving()
{
#ifdef NDB_OSE_TRANSPORTER
if(theOSEReceiver != NULL){
theOSEReceiver->createPhantom();
......@@ -1081,99 +1267,6 @@ TransporterRegistry::stopSending(){
#endif
}
/**
* The old implementation did not scale with a large
* number of nodes. (Watchdog killed NDB because
* it took too long time to allocated threads in
* doConnect.
*
* The new implementation only checks the connection
* for a number of transporters (STEPPING), until to
* the point where all transporters has executed
* doConnect once. After that, the behaviour is as
* in the old implemenation, i.e, checking the connection
* for all transporters.
* @todo: instead of STEPPING, maybe we should only
* allow checkConnections to execute for a certain
* time that somehow factors in heartbeat times and
* watchdog times.
*
*/
void
TransporterRegistry::checkConnections(){
if(m_ccStep > nTransporters)
m_ccStep = nTransporters;
while(m_ccCount < m_ccStep){
if(theTransporters[m_ccIndex] != 0){
Transporter * t = theTransporters[m_ccIndex];
const NodeId nodeId = t->getRemoteNodeId();
if(t->getThreadError() != 0) {
reportError(callbackObj, nodeId, t->getThreadError());
t->resetThreadError();
}
switch(performStates[nodeId]){
case PerformConnect:
if(!t->isConnected()){
t->doConnect();
if(m_nTransportersPerformConnect!=nTransporters)
m_nTransportersPerformConnect++;
} else {
performStates[nodeId] = PerformIO;
reportConnect(callbackObj, nodeId);
}
break;
case PerformDisconnect:
{
bool wasConnected = t->isConnected();
t->doDisconnect();
performStates[nodeId] = PerformNothing;
if(wasConnected){
reportDisconnect(callbackObj, nodeId,0);
}
}
break;
case RemoveTransporter:
removeTransporter(nodeId);
break;
case PerformNothing:
case PerformIO:
break;
}
m_ccCount ++;
}
m_ccIndex ++;
}
if(!m_ccReady) {
if(m_ccCount < nTransporters) {
if(nTransporters - m_ccStep < STEPPING)
m_ccStep += nTransporters-m_ccStep;
else
m_ccStep += STEPPING;
// ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
}
else {
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = STEPPING;
// ndbout_c("count %d step %d ", m_ccCount, m_ccStep);
}
}
if((nTransporters == m_nTransportersPerformConnect) || m_ccReady) {
m_ccReady = true;
m_ccCount = 0;
m_ccIndex = 0;
m_ccStep = nTransporters;
// ndbout_c("alla count %d step %d ", m_ccCount, m_ccStep);
}
}//TransporterRegistry::checkConnections()
NdbOut & operator <<(NdbOut & out, SignalHeader & sh){
out << "-- Signal Header --" << endl;
out << "theLength: " << sh.theLength << endl;
......
......@@ -3,7 +3,8 @@ noinst_LTLIBRARIES = libgeneral.la
libgeneral_la_SOURCES = \
File.cpp md5_hash.cpp Properties.cpp socket_io.cpp \
SimpleProperties.cpp Parser.cpp InputStream.cpp SocketServer.cpp \
SimpleProperties.cpp Parser.cpp InputStream.cpp \
SocketServer.cpp SocketClient.cpp SocketAuthenticator.cpp\
OutputStream.cpp NdbOut.cpp BaseString.cpp Base64.cpp \
NdbSqlUtil.cpp new.cpp \
uucode.c random.c getarg.c version.c \
......
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
#include <InputStream.hpp>
#include <OutputStream.hpp>
#include <NdbOut.hpp>
SocketAuthSimple::SocketAuthSimple(const char *username, const char *passwd) {
if (username)
m_username= strdup(username);
else
m_username= 0;
if (passwd)
m_passwd= strdup(passwd);
else
m_passwd= 0;
}
SocketAuthSimple::~SocketAuthSimple()
{
if (m_passwd)
free((void*)m_passwd);
if (m_username)
free((void*)m_username);
}
bool SocketAuthSimple::client_authenticate(int sockfd)
{
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
if (m_username)
s_output.println("%s", m_username);
else
s_output.println("");
if (m_passwd)
s_output.println("%s", m_passwd);
else
s_output.println("");
char buf[16];
if (s_input.gets(buf, 16) == 0) return false;
if (strncmp("ok", buf, 2) == 0)
return true;
return false;
}
bool SocketAuthSimple::server_authenticate(int sockfd)
{
SocketOutputStream s_output(sockfd);
SocketInputStream s_input(sockfd);
char buf[256];
if (s_input.gets(buf, 256) == 0) return false;
buf[255]= 0;
if (m_username)
free((void*)m_username);
m_username= strdup(buf);
if (s_input.gets(buf, 256) == 0) return false;
buf[255]= 0;
if (m_passwd)
free((void*)m_passwd);
m_passwd= strdup(buf);
s_output.println("ok");
return true;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <ndb_global.h>
#include <NdbOut.hpp>
#include <SocketClient.hpp>
#include <SocketAuthenticator.hpp>
SocketClient::SocketClient(const char *server_name, unsigned short port, SocketAuthenticator *sa)
{
m_auth= sa;
m_port= port;
m_server_name= strdup(server_name);
m_sockfd= -1;
}
SocketClient::~SocketClient()
{
if (m_server_name)
free(m_server_name);
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
if (m_auth)
delete m_auth;
}
bool
SocketClient::init()
{
if (m_sockfd >= 0)
NDB_CLOSE_SOCKET(m_sockfd);
memset(&m_servaddr, 0, sizeof(m_servaddr));
m_servaddr.sin_family = AF_INET;
m_servaddr.sin_port = htons(m_port);
// Convert ip address presentation format to numeric format
if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
return false;
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
}
return true;
}
NDB_SOCKET_TYPE
SocketClient::connect()
{
if (m_sockfd < 0)
{
if (!init()) {
ndbout << "SocketClient::connect() failed " << m_server_name << " " << m_port << endl;
return -1;
}
}
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1)
return -1;
if (m_auth)
if (!m_auth->client_authenticate(m_sockfd))
{
NDB_CLOSE_SOCKET(m_sockfd);
m_sockfd= -1;
return -1;
}
NDB_SOCKET_TYPE sockfd= m_sockfd;
m_sockfd= -1;
return sockfd;
}
......@@ -17,7 +17,7 @@
#include <ndb_global.h>
#include "SocketServer.hpp"
#include <SocketServer.hpp>
#include <NdbTCP.h>
#include <NdbOut.hpp>
......
......@@ -4,7 +4,7 @@ include $(top_srcdir)/ndb/config/common.mk.am
ndbbin_PROGRAMS = ndbd
ndbd_SOURCES = Main.cpp SimBlockList.cpp
ndbd_SOURCES = main.cpp SimBlockList.cpp
include $(top_srcdir)/ndb/config/type_kernel.mk.am
......
......@@ -362,7 +362,7 @@ void Cmvmi::execCLOSE_COMREQ(Signal* signal)
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
globalTransporterRegistry.setIOState(i, HaltIO);
globalTransporterRegistry.setPerformState(i, PerformDisconnect);
globalTransporterRegistry.do_disconnect(i);
/**
* Cancel possible event subscription
......@@ -390,7 +390,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
const Uint32 len = signal->getLength();
if(len == 2){
globalTransporterRegistry.setPerformState(tStartingNode, PerformConnect);
globalTransporterRegistry.do_connect(tStartingNode);
globalTransporterRegistry.setIOState(tStartingNode, HaltIO);
//-----------------------------------------------------
......@@ -405,7 +405,7 @@ void Cmvmi::execOPEN_COMREQ(Signal* signal)
jam();
if (i != getOwnNodeId() && getNodeInfo(i).m_type == tData2){
jam();
globalTransporterRegistry.setPerformState(i, PerformConnect);
globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, HaltIO);
signal->theData[0] = EventReport::CommunicationOpened;
......@@ -456,34 +456,21 @@ void Cmvmi::execDISCONNECT_REP(Signal *signal)
const NodeInfo::NodeType type = getNodeInfo(hostId).getType();
ndbrequire(type != NodeInfo::INVALID);
if (globalTransporterRegistry.performState(hostId) != PerformDisconnect) {
if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){
jam();
// -------------------------------------------------------------------
// We do not report the disconnection when disconnection is already ongoing.
// This reporting should be looked into but this secures that we avoid
// crashes due to too quick re-reporting of disconnection.
// -------------------------------------------------------------------
if(type == NodeInfo::DB || globalData.theStartLevel == NodeState::SL_STARTED){
jam();
DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
rep->nodeId = hostId;
rep->err = errNo;
sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
DisconnectRep::SignalLength, JBA);
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
} else if(globalData.theStartLevel == NodeState::SL_CMVMI ||
globalData.theStartLevel == NodeState::SL_STARTING) {
/**
* Someone disconnected during cmvmi period
*/
if(type == NodeInfo::MGM){
jam();
globalTransporterRegistry.setPerformState(hostId, PerformConnect);
} else {
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
}
}
DisconnectRep * const rep = (DisconnectRep *)&signal->theData[0];
rep->nodeId = hostId;
rep->err = errNo;
sendSignal(QMGR_REF, GSN_DISCONNECT_REP, signal,
DisconnectRep::SignalLength, JBA);
} else if((globalData.theStartLevel == NodeState::SL_CMVMI ||
globalData.theStartLevel == NodeState::SL_STARTING)
&& type == NodeInfo::MGM) {
/**
* Someone disconnected during cmvmi period
*/
jam();
globalTransporterRegistry.do_connect(hostId);
}
signal->theData[0] = EventReport::Disconnected;
......@@ -522,7 +509,8 @@ void Cmvmi::execCONNECT_REP(Signal *signal){
/**
* Dont allow api nodes to connect
*/
globalTransporterRegistry.setPerformState(hostId, PerformDisconnect);
abort();
globalTransporterRegistry.do_disconnect(hostId);
}
}
......@@ -756,8 +744,8 @@ Cmvmi::execSTART_ORD(Signal* signal) {
*/
for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (getNodeInfo(i).m_type == NodeInfo::MGM){
if(globalTransporterRegistry.performState(i) != PerformIO){
globalTransporterRegistry.setPerformState(i, PerformConnect);
if(!globalTransporterRegistry.is_connected(i)){
globalTransporterRegistry.do_connect(i);
globalTransporterRegistry.setIOState(i, NoHalt);
}
}
......@@ -783,7 +771,7 @@ Cmvmi::execSTART_ORD(Signal* signal) {
// without any connected nodes.
for(unsigned int i = 1; i < MAX_NODES; i++ ){
if (i != getOwnNodeId() && getNodeInfo(i).m_type != NodeInfo::MGM){
globalTransporterRegistry.setPerformState(i, PerformDisconnect);
globalTransporterRegistry.do_disconnect(i);
globalTransporterRegistry.setIOState(i, HaltIO);
}
}
......@@ -1062,29 +1050,10 @@ Cmvmi::execDUMP_STATE_ORD(Signal* signal)
if(nodeTypeStr == 0)
continue;
const char* actionStr = "";
switch (globalTransporterRegistry.performState(i)){
case PerformNothing:
actionStr = "does nothing";
break;
case PerformIO:
actionStr = "is connected";
break;
case PerformConnect:
actionStr = "is trying to connect";
break;
case PerformDisconnect:
actionStr = "is trying to disconnect";
break;
case RemoveTransporter:
actionStr = "will be removed";
break;
}
infoEvent("Connection to %d (%s) %s",
i,
nodeTypeStr,
actionStr);
globalTransporterRegistry.getPerformStateString(i));
}
}
......
......@@ -1704,6 +1704,7 @@ void Qmgr::sendApiFailReq(Signal* signal, Uint16 failedNodeNo)
sendSignal(DBTC_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(DBDICT_REF, GSN_API_FAILREQ, signal, 2, JBA);
sendSignal(SUMA_REF, GSN_API_FAILREQ, signal, 2, JBA);
/**
* GREP also need the information that an API node
* (actually a REP node) has failed.
......@@ -1978,8 +1979,10 @@ void Qmgr::execAPI_REGREQ(Signal* signal)
apiRegConf->nodeState.dynamicId = -dynamicId;
}
}
apiRegConf->nodeState.m_connected_nodes.assign(c_connectedNodes);
sendSignal(ref, GSN_API_REGCONF, signal, ApiRegConf::SignalLength, JBB);
if ((getNodeState().startLevel == NodeState::SL_STARTED ||
getNodeState().getSingleUserMode())
&& apiNodePtr.p->phase == ZAPI_INACTIVE) {
......
......@@ -27,6 +27,8 @@
#include <NdbConfig.h>
#include <Configuration.hpp>
#include <NdbAutoPtr.hpp>
#define MESSAGE_LENGTH 400
const char* errorType[] = {
......@@ -66,23 +68,23 @@ ErrorReporter::formatTimeStampString(){
return (const char *)&theDateTimeString;
}
void
ErrorReporter::formatTraceFileName(char* theName, int maxLen){
int
ErrorReporter::get_trace_no(){
FILE *stream;
unsigned int traceFileNo;
char fileNameBuf[255];
char buf[255];
char *file_name= NdbConfig_NextTraceFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr(file_name);
NdbConfig_HomePath(fileNameBuf, 255);
strncat(fileNameBuf, "NextTraceFileNo.log", 255);
/*
* Read last number from tracefile
*/
stream = fopen(fileNameBuf, "r+");
stream = fopen(file_name, "r+");
if (stream == NULL){
traceFileNo = 1;
} else {
char buf[255];
fgets(buf, 255, stream);
const int scan = sscanf(buf, "%u", &traceFileNo);
if(scan != 1){
......@@ -103,16 +105,13 @@ ErrorReporter::formatTraceFileName(char* theName, int maxLen){
/**
* Save new number to the file
*/
stream = fopen(fileNameBuf, "w");
stream = fopen(file_name, "w");
if(stream != NULL){
fprintf(stream, "%u", traceFileNo);
fclose(stream);
}
/**
* Format trace file name
*/
snprintf(theName, maxLen, "%sNDB_TraceFile_%u.trace",
NdbConfig_HomePath(fileNameBuf, 255), traceFileNo);
return traceFileNo;
}
......@@ -214,16 +213,22 @@ WriteMessage(ErrorCategory thrdType, int thrdMessageID,
unsigned offset;
unsigned long maxOffset; // Maximum size of file.
char theMessage[MESSAGE_LENGTH];
char theTraceFileName[255];
char theErrorFileName[255];
ErrorReporter::formatTraceFileName(theTraceFileName, 255);
/**
* Format trace file name
*/
int file_no= ErrorReporter::get_trace_no();
char *theTraceFileName= NdbConfig_TraceFileName(globalData.ownId, file_no);
NdbAutoPtr<char> tmp_aptr1(theTraceFileName);
// The first 69 bytes is info about the current offset
Uint32 noMsg = globalEmulatorData.theConfiguration->maxNoOfErrorLogs();
maxOffset = (69 + (noMsg * MESSAGE_LENGTH));
NdbConfig_ErrorFileName(theErrorFileName, 255);
char *theErrorFileName= (char *)NdbConfig_ErrorFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr2(theErrorFileName);
stream = fopen(theErrorFileName, "r+");
if (stream == NULL) { /* If the file could not be opened. */
......
......@@ -81,7 +81,7 @@ public:
const char* theNameOfTheTraceFile,
char* messptr);
static void formatTraceFileName(char* theName, int maxLen);
static int get_trace_no();
static const char* formatTimeStampString();
......
......@@ -20,7 +20,7 @@
#include "Configuration.hpp"
#include <TransporterRegistry.hpp>
#include "SimBlockList.hpp"
#include "vm/SimBlockList.hpp"
#include "ThreadConfig.hpp"
#include <SignalLoggerManager.hpp>
#include <NdbOut.hpp>
......@@ -31,7 +31,8 @@
#include <LogLevel.hpp>
#include <EventLogger.hpp>
#include <NodeState.hpp>
#include <NdbAutoPtr.hpp>
#if defined NDB_SOLARIS // ok
#include <sys/processor.h> // For system informatio
......@@ -71,15 +72,12 @@ NDB_MAIN(ndb_kernel){
theConfig->setupConfiguration();
}
// Get NDB_HOME path
char homePath[255];
NdbConfig_HomePath(homePath, 255);
if (theConfig->getDaemonMode()) {
// Become a daemon
char lockfile[255], logfile[255];
snprintf(lockfile, 255, "%snode%d.pid", homePath, globalData.ownId);
snprintf(logfile, 255, "%snode%d.out", homePath, globalData.ownId);
char *lockfile= NdbConfig_PidFileName(globalData.ownId);
char *logfile= NdbConfig_StdoutFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile);
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
ndbout << "Cannot become daemon: " << NdbDaemon_ErrorText << endl;
return 1;
......@@ -90,6 +88,8 @@ NDB_MAIN(ndb_kernel){
/**
* Parent
*/
theConfig->closeConfiguration();
catchsigs(true);
int status = 0;
......@@ -147,9 +147,9 @@ NDB_MAIN(ndb_kernel){
#ifdef VM_TRACE
// Create a signal logger
char buf[255];
strcpy(buf, homePath);
FILE * signalLog = fopen(strncat(buf,"Signal.log", 255), "a");
char *buf= NdbConfig_SignalLogFileName(globalData.ownId);
NdbAutoPtr<char> tmp_aptr(buf);
FILE * signalLog = fopen(buf, "a");
globalSignalLoggers.setOwnNodeId(globalData.ownId);
globalSignalLoggers.setOutputStream(signalLog);
#endif
......@@ -171,13 +171,31 @@ NDB_MAIN(ndb_kernel){
NDB_ASSERT(0, "Illegal state globalData.theRestartFlag");
}
SocketServer socket_server;
globalTransporterRegistry.startSending();
globalTransporterRegistry.startReceiving();
if (!globalTransporterRegistry.start_service(socket_server))
NDB_ASSERT(0, "globalTransporterRegistry.start_service() failed");
if (!globalTransporterRegistry.start_clients())
NDB_ASSERT(0, "globalTransporterRegistry.start_clients() failed");
globalEmulatorData.theWatchDog->doStart();
socket_server.startServer();
// theConfig->closeConfiguration();
globalEmulatorData.theThreadConfig->ipControlLoop();
NdbShutdown(NST_Normal);
socket_server.stopServer();
socket_server.stopSessions();
globalTransporterRegistry.stop_clients();
return NRT_Default;
}
......
......@@ -138,6 +138,7 @@ Configuration::Configuration()
_fsPath = 0;
_initialStart = false;
_daemonMode = false;
m_config_retriever= 0;
}
Configuration::~Configuration(){
......@@ -146,6 +147,18 @@ Configuration::~Configuration(){
if(_fsPath != NULL)
free(_fsPath);
if (m_config_retriever) {
delete m_config_retriever;
}
}
void
Configuration::closeConfiguration(){
if (m_config_retriever) {
delete m_config_retriever;
}
m_config_retriever= 0;
}
void
......@@ -153,7 +166,12 @@ Configuration::setupConfiguration(){
/**
* Fetch configuration from management server
*/
ConfigRetriever cr;
if (m_config_retriever) {
delete m_config_retriever;
}
m_config_retriever= new ConfigRetriever();
ConfigRetriever &cr= *m_config_retriever;
cr.setConnectString(_connectString);
stopOnError(true);
ndb_mgm_configuration * p = cr.getConfig(NDB_VERSION, NODE_TYPE_DB);
......
......@@ -20,6 +20,8 @@
#include <mgmapi.h>
#include <ndb_types.h>
class ConfigRetriever;
class Configuration {
public:
Configuration();
......@@ -31,6 +33,7 @@ public:
bool init(int argc, const char** argv);
void setupConfiguration();
void closeConfiguration();
bool lockPagesInMainMemory() const;
......@@ -78,6 +81,8 @@ private:
ndb_mgm_configuration_iterator * m_clusterConfigIter;
ndb_mgm_configuration_iterator * m_ownConfigIterator;
ConfigRetriever *m_config_retriever;
/**
* arguments to NDB process
*/
......
......@@ -147,8 +147,8 @@ void ThreadConfig::ipControlLoop()
// plus checking for any received messages.
//--------------------------------------------------------------------
if (i++ >= 20) {
globalTransporterRegistry.update_connections();
globalData.incrementWatchDogCounter(5);
globalTransporterRegistry.checkConnections();
i = 0;
}//if
......
......@@ -24,6 +24,7 @@
#include <NdbOut.hpp>
#include <SocketServer.hpp>
#include <SocketClient.hpp>
#include <Parser.hpp>
#include <OutputStream.hpp>
#include <InputStream.hpp>
......@@ -318,8 +319,8 @@ ndb_mgm_call(NdbMgmHandle handle, const ParserRow<ParserDummy> *command_reply,
/**
* Print some info about why the parser returns NULL
*/
// ndbout << " status=" << ctx.m_status << ", curr="
// << ctx.m_currentToken << endl;
//ndbout << " status=" << ctx.m_status << ", curr="
//<< ctx.m_currentToken << endl;
}
#ifdef MGMAPI_LOG
else {
......@@ -362,30 +363,11 @@ ndb_mgm_connect(NdbMgmHandle handle, const char * mgmsrv)
/**
* Do connect
*/
const NDB_SOCKET_TYPE sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd == NDB_INVALID_SOCKET) {
SET_ERROR(handle, NDB_MGM_ILLEGAL_SOCKET, "");
return -1;
}
struct sockaddr_in servaddr;
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(handle->port);
// Convert ip address presentation format to numeric format
const int res1 = Ndb_getInAddr(&servaddr.sin_addr, handle->hostname);
if (res1 != 0) {
DEBUG("Ndb_getInAddr(...) == -1");
setError(handle, EINVAL, __LINE__, "Invalid hostname/address");
return -1;
}
const int res2 = connect(sockfd, (struct sockaddr*) &servaddr,
sizeof(servaddr));
if (res2 == -1) {
NDB_CLOSE_SOCKET(sockfd);
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, "Unable to connect to %s",
mgmsrv);
SocketClient s(handle->hostname, handle->port);
const NDB_SOCKET_TYPE sockfd = s.connect();
if (sockfd < 0) {
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect to %s", mgmsrv);
return -1;
}
......@@ -1523,6 +1505,55 @@ ndb_mgm_get_configuration(NdbMgmHandle handle, unsigned int version) {
return 0;
}
extern "C"
int
ndb_mgm_alloc_nodeid(NdbMgmHandle handle, unsigned int version, unsigned *pnodeid, int nodetype)
{
CHECK_HANDLE(handle, 0);
CHECK_CONNECTED(handle, 0);
Properties args;
args.put("version", version);
args.put("nodetype", nodetype);
args.put("nodeid", *pnodeid);
args.put("user", "mysqld");
args.put("password", "mysqld");
args.put("public key", "a public key");
const ParserRow<ParserDummy> reply[]= {
MGM_CMD("get nodeid reply", NULL, ""),
MGM_ARG("nodeid", Int, Optional, "Error message"),
MGM_ARG("result", String, Mandatory, "Error message"),
MGM_END()
};
const Properties *prop;
prop= ndb_mgm_call(handle, reply, "get nodeid", &args);
if(prop == NULL) {
SET_ERROR(handle, EIO, "Unable to alloc nodeid");
return -1;
}
int res= -1;
do {
const char * buf;
if(!prop->get("result", &buf) || strcmp(buf, "Ok") != 0){
ndbout_c("ERROR Message: %s\n", buf);
break;
}
if(!prop->get("nodeid", pnodeid) != 0){
ndbout_c("ERROR Message: <nodeid Unspecified>\n");
break;
}
res= 0;
}while(0);
delete prop;
return res;
}
/*****************************************************************************
* Global Replication
******************************************************************************/
......
......@@ -26,7 +26,7 @@
#include <signal.h>
const char *progname = "mgmtclient";
const char *progname = "ndb_mgm";
static CommandInterpreter* com;
......@@ -47,7 +47,10 @@ handler(int sig){
int main(int argc, const char** argv){
int optind = 0;
const char *_default_connectstring = "host=localhost:2200;nodeid=0";
char _default_connectstring_buf[256];
snprintf(_default_connectstring_buf, sizeof(_default_connectstring_buf),
"host=localhost:%u", NDB_BASE_PORT);
const char *_default_connectstring= _default_connectstring_buf;
const char *_host = 0;
int _port = 0;
int _help = 0;
......
......@@ -43,10 +43,12 @@
#include <DebuggerNames.hpp>
#include <ndb_version.h>
#include "SocketServer.hpp"
#include <SocketServer.hpp>
#include "NodeLogLevel.hpp"
#include <NdbConfig.h>
#include <NdbAutoPtr.hpp>
#include <mgmapi.h>
#include <mgmapi_configuration.hpp>
#include <mgmapi_config_parameters.h>
......@@ -240,10 +242,9 @@ MgmtSrvr::startEventLog()
const char * tmp;
BaseString logdest;
char clusterLog[MAXPATHLEN];
NdbConfig_ClusterLogFileName(clusterLog, sizeof(clusterLog));
char *clusterLog= NdbConfig_ClusterLogFileName(_ownNodeId);
NdbAutoPtr<char> tmp_aptr(clusterLog);
if(ndb_mgm_get_string_parameter(iter, CFG_LOG_DESTINATION, &tmp) == 0){
logdest.assign(tmp);
}
......@@ -390,6 +391,95 @@ MgmtSrvr::getNodeCount(enum ndb_mgm_node_type type) const
return count;
}
int
MgmtSrvr::getPort() const {
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return 0;
if(ndb_mgm_find(iter, CFG_NODE_ID, getOwnNodeId()) != 0){
ndbout << "Could not retrieve configuration for Node "
<< getOwnNodeId() << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return 0;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << getOwnNodeId()
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return 0;
}
Uint32 port = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &port) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return 0;
}
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
return port;
}
int
MgmtSrvr::getStatPort() const {
#if 0
......@@ -417,9 +507,9 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_ownReference(0),
theSignalIdleList(NULL),
theWaitState(WAIT_SUBSCRIBE_CONF),
theConfCount(0) {
theConfCount(0),
m_allocated_resources(*this) {
_ownNodeId = nodeId;
_config = NULL;
_isStatPortActive = false;
_isClusterLogStatActive = false;
......@@ -429,6 +519,8 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_logLevelThreadSleep = 500;
_startedNodeId = 0;
theFacade = 0;
m_newConfig = NULL;
m_configFilename = configFilename;
setCallback(CmdBackupCallback);
......@@ -486,6 +578,19 @@ MgmtSrvr::MgmtSrvr(NodeId nodeId,
_clusterLogLevelList = new NodeLogLevelList();
_props = NULL;
_ownNodeId= 0;
NodeId tmp= nodeId;
if (getFreeNodeId(&tmp, NDB_MGM_NODE_TYPE_MGM, 0, 0)){
_ownNodeId= tmp;
if (nodeId != 0 && nodeId != tmp) {
ndbout << "Unable to obtain requested nodeid " << nodeId
<< " nodeid " << tmp << " available\n";
_ownNodeId= 0; // did not get nodeid requested
}
m_allocated_resources.reserve_node(_ownNodeId);
} else
NDB_ASSERT(0, "Unable to retrieve own node id");
}
......@@ -510,8 +615,7 @@ MgmtSrvr::start()
return false;
}
theFacade = TransporterFacade::start_instance
(_ownNodeId,
(ndb_mgm_configuration*)_config->m_configValues);
(_ownNodeId,(ndb_mgm_configuration*)_config->m_configValues);
if(theFacade == 0) {
DEBUG("MgmtSrvr.cpp: theFacade is NULL.");
......@@ -573,8 +677,7 @@ MgmtSrvr::~MgmtSrvr()
stopEventLog();
NdbCondition_Destroy(theMgmtWaitForResponseCondPtr);
NdbMutex_Destroy(m_configMutex);
NdbCondition_Destroy(theMgmtWaitForResponseCondPtr); NdbMutex_Destroy(m_configMutex);
if(m_newConfig != NULL)
free(m_newConfig);
......@@ -818,7 +921,7 @@ MgmtSrvr::restart(bool nostart, bool initalStart, bool abort,
return 0;
}
TransporterFacade::instance()->lock_mutex();
theFacade->lock_mutex();
int waitTime = timeOut/m_stopRec.sentCount;
if (receiveOptimisedResponse(waitTime) != 0) {
m_stopRec.inUse = false;
......@@ -993,8 +1096,7 @@ MgmtSrvr::version(int * stopCount, bool abort,
}
for(Uint32 i = 0; i<MAX_NODES; i++) {
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) {
node =
TransporterFacade::instance()->theClusterMgr->getNodeInfo(i);
node = theFacade->theClusterMgr->getNodeInfo(i);
version = node.m_info.m_version;
if(theFacade->theClusterMgr->getNodeInfo(i).connected)
m_versionRec.callback(i, version, this,0);
......@@ -1148,7 +1250,7 @@ MgmtSrvr::stop(int * stopCount, bool abort, StopCallback callback,
if(m_stopRec.sentCount > 0){
if(callback == 0){
TransporterFacade::instance()->lock_mutex();
theFacade->lock_mutex();
receiveOptimisedResponse(timeOut / m_stopRec.sentCount);
} else {
return 0;
......@@ -1178,7 +1280,7 @@ MgmtSrvr::enterSingleUser(int * stopCount, Uint32 singleUserNodeId,
for(Uint32 i = 0; i<MAX_NODES; i++) {
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) {
node = TransporterFacade::instance()->theClusterMgr->getNodeInfo(i);
node = theFacade->theClusterMgr->getNodeInfo(i);
if((node.m_state.startLevel != NodeState::SL_STARTED) &&
(node.m_state.startLevel != NodeState::SL_NOTHING)) {
return 5063;
......@@ -1337,7 +1439,7 @@ MgmtSrvr::status(int processId,
}
const ClusterMgr::Node node =
TransporterFacade::instance()->theClusterMgr->getNodeInfo(processId);
theFacade->theClusterMgr->getNodeInfo(processId);
if(!node.connected){
* _status = NDB_MGM_NODE_STATUS_NO_CONTACT;
......@@ -1896,6 +1998,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
int returnCode;
int gsn = signal->readSignalNumber();
switch (gsn) {
case GSN_API_VERSION_CONF: {
if (theWaitState == WAIT_VERSION) {
......@@ -2000,8 +2103,7 @@ MgmtSrvr::handleReceivedSignal(NdbApiSignal* signal)
req->senderData = 19;
req->backupDataLen = 0;
int i = TransporterFacade::instance()->sendSignalUnCond(&aSignal,
aNodeId);
int i = theFacade->sendSignalUnCond(&aSignal, aNodeId);
if(i == 0){
return;
}
......@@ -2083,7 +2185,7 @@ MgmtSrvr::handleStopReply(NodeId nodeId, Uint32 errCode)
bool failure = true;
for(Uint32 i = 0; i<MAX_NODES; i++) {
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) {
node = TransporterFacade::instance()->theClusterMgr->getNodeInfo(i);
node = theFacade->theClusterMgr->getNodeInfo(i);
if((node.m_state.startLevel == NodeState::SL_NOTHING))
failure = true;
else
......@@ -2187,6 +2289,66 @@ MgmtSrvr::getNodeType(NodeId nodeId) const
return nodeTypes[nodeId];
}
bool
MgmtSrvr::getFreeNodeId(NodeId * nodeId, enum ndb_mgm_node_type type,
struct sockaddr *client_addr, socklen_t *client_addr_len) const
{
#if 0
ndbout << "MgmtSrvr::getFreeNodeId type=" << type
<< " *nodeid=" << *nodeId << endl;
#endif
NodeBitmask connected_nodes(m_reserved_nodes);
if (theFacade && theFacade->theClusterMgr) {
for(Uint32 i = 0; i < MAX_NODES; i++)
if (getNodeType(i) == NDB_MGM_NODE_TYPE_NDB) {
const ClusterMgr::Node &node= theFacade->theClusterMgr->getNodeInfo(i);
if (node.connected)
connected_nodes.bitOR(node.m_state.m_connected_nodes);
}
}
ndb_mgm_configuration_iterator iter(*(ndb_mgm_configuration *)_config->m_configValues,
CFG_SECTION_NODE);
for(iter.first(); iter.valid(); iter.next()) {
unsigned tmp= 0;
if(iter.get(CFG_NODE_ID, &tmp)) abort();
if (connected_nodes.get(tmp))
continue;
if (*nodeId && *nodeId != tmp)
continue;
unsigned type_c;
if(iter.get(CFG_TYPE_OF_SECTION, &type_c)) abort();
if(type_c != type)
continue;
const char *config_hostname = 0;
if(iter.get(CFG_NODE_HOST, &config_hostname)) abort();
// getsockname(int s, struct sockaddr *name, socklen_t *namelen);
if (config_hostname && config_hostname[0] != 0 && client_addr) {
// check hostname compatability
struct in_addr config_addr;
if(Ndb_getInAddr(&config_addr, config_hostname) != 0
|| memcmp(&config_addr, &(((sockaddr_in*)client_addr)->sin_addr),
sizeof(config_addr)) != 0) {
#if 0
ndbout << "MgmtSrvr::getFreeNodeId compare failed for \"" << config_hostname
<< "\" id=" << tmp << endl;
#endif
continue;
}
}
*nodeId= tmp;
#if 0
ndbout << "MgmtSrvr::getFreeNodeId found type=" << type
<< " *nodeid=" << *nodeId << endl;
#endif
return true;
}
return false;
}
bool
MgmtSrvr::getNextNodeId(NodeId * nodeId, enum ndb_mgm_node_type type) const
{
......@@ -2573,3 +2735,22 @@ MgmtSrvr::getPrimaryNode() const {
return 0;
#endif
}
MgmtSrvr::Allocated_resources::Allocated_resources(MgmtSrvr &m)
: m_mgmsrv(m)
{
}
MgmtSrvr::Allocated_resources::~Allocated_resources()
{
m_mgmsrv.m_reserved_nodes.bitANDC(m_reserved_nodes);
}
void
MgmtSrvr::Allocated_resources::reserve_node(NodeId id)
{
m_reserved_nodes.set(id);
m_mgmsrv.m_reserved_nodes.set(id);
}
......@@ -68,6 +68,18 @@ public:
virtual void println_statistics(const BaseString &s) = 0;
};
class Allocated_resources {
public:
Allocated_resources(class MgmtSrvr &m);
~Allocated_resources();
// methods to reserve/allocate resources which
// will be freed when running destructor
void reserve_node(NodeId id);
private:
MgmtSrvr &m_mgmsrv;
NodeBitmask m_reserved_nodes;
};
/**
* Set a reference to the socket server.
*/
......@@ -150,10 +162,12 @@ public:
enum LogMode {In, Out, InOut, Off};
/* Constructor */
MgmtSrvr(NodeId nodeId, /* Local nodeid */
const BaseString &config_filename, /* Where to save config */
const BaseString &ndb_config_filename, /* Ndb.cfg filename */
Config * config);
NodeId getOwnNodeId() const {return _ownNodeId;};
/**
* Read (initial) config file, create TransporterFacade,
......@@ -448,6 +462,8 @@ public:
* @return false if none found
*/
bool getNextNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type) const ;
bool getFreeNodeId(NodeId * _nodeId, enum ndb_mgm_node_type type,
struct sockaddr *client_addr, socklen_t *client_addr_len) const ;
/**
*
......@@ -492,7 +508,11 @@ public:
* @return statistic port number.
*/
int getStatPort() const;
/**
* Returns the port number.
* @return port number.
*/
int getPort() const;
//**************************************************************************
private:
......@@ -530,13 +550,14 @@ private:
BaseString m_configFilename;
BaseString m_localNdbConfigFilename;
Uint32 m_nextConfigGenerationNumber;
NodeBitmask m_reserved_nodes;
Allocated_resources m_allocated_resources;
int _setVarReqResult; // The result of the SET_VAR_REQ response
Statistics _statistics; // handleSTATISTICS_CONF store the result here,
// and getStatistics reads it.
//**************************************************************************
// Specific signal handling methods
//**************************************************************************
......
......@@ -121,6 +121,15 @@ ParserRow<MgmApiSession> commands[] = {
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("node", Int, Optional, "Node ID"),
MGM_CMD("get nodeid", &MgmApiSession::get_nodeid, ""),
MGM_ARG("version", Int, Mandatory, "Configuration version number"),
MGM_ARG("nodetype", Int, Mandatory, "Node type"),
MGM_ARG("transporter", String, Optional, "Transporter type"),
MGM_ARG("nodeid", Int, Optional, "Node ID"),
MGM_ARG("user", String, Mandatory, "Password"),
MGM_ARG("password", String, Mandatory, "Password"),
MGM_ARG("public key", String, Mandatory, "Public key"),
MGM_CMD("get version", &MgmApiSession::getVersion, ""),
MGM_CMD("get status", &MgmApiSession::getStatus, ""),
......@@ -224,6 +233,19 @@ MgmApiSession::MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock)
m_input = new SocketInputStream(sock);
m_output = new SocketOutputStream(sock);
m_parser = new Parser_t(commands, *m_input, true, true, true);
m_allocated_resources= new MgmtSrvr::Allocated_resources(m_mgmsrv);
}
MgmApiSession::~MgmApiSession()
{
if (m_input)
delete m_input;
if (m_output)
delete m_output;
if (m_parser)
delete m_parser;
if (m_allocated_resources)
delete m_allocated_resources;
}
void
......@@ -332,6 +354,91 @@ backward(const char * base, const Properties* reply){
return ret;
}
void
MgmApiSession::get_nodeid(Parser_t::Context &,
const class Properties &args)
{
const char *cmd= "get nodeid reply";
Uint32 version, nodeid= 0, nodetype= 0xff;
const char * transporter;
const char * user;
const char * password;
const char * public_key;
args.get("version", &version);
args.get("nodetype", &nodetype);
args.get("transporter", &transporter);
args.get("nodeid", &nodeid);
args.get("user", &user);
args.get("password", &password);
args.get("public key", &public_key);
bool compatible;
switch (nodetype) {
case NODE_TYPE_MGM:
case NODE_TYPE_API:
compatible = ndbCompatible_mgmt_api(NDB_VERSION, version);
break;
case NODE_TYPE_DB:
compatible = ndbCompatible_mgmt_ndb(NDB_VERSION, version);
break;
default:
m_output->println(cmd);
m_output->println("result: unknown nodetype %d", nodetype);
m_output->println("");
return;
}
struct sockaddr addr;
socklen_t addrlen;
int r;
if (r= getsockname(m_socket, &addr, &addrlen)) {
m_output->println(cmd);
m_output->println("result: getsockname(%d) failed, err= %d", m_socket, r);
m_output->println("");
return;
}
NodeId free_id= 0;
NodeId tmp= nodeid;
if (m_mgmsrv.getFreeNodeId(&tmp, (enum ndb_mgm_node_type)nodetype, &addr, &addrlen))
free_id= tmp;
if (nodeid != 0 && free_id != nodeid){
m_output->println(cmd);
m_output->println("result: no free nodeid %d for nodetype %d",
nodeid, nodetype);
m_output->println("");
return;
}
if (free_id == 0){
m_output->println(cmd);
m_output->println("result: no free nodeid for nodetype %d", nodetype);
m_output->println("");
return;
}
#if 0
if (!compatible){
m_output->println(cmd);
m_output->println("result: incompatible version mgmt 0x%x and node 0x%x",
NDB_VERSION, version);
m_output->println("");
return;
}
#endif
m_output->println(cmd);
m_output->println("nodeid: %u", free_id);
m_output->println("result: Ok");
m_output->println("");
m_allocated_resources->reserve_node(free_id);
return;
}
void
MgmApiSession::getConfig_common(Parser_t::Context &,
const class Properties &args,
......@@ -432,7 +539,6 @@ MgmApiSession::getConfig_common(Parser_t::Context &,
m_output->println("Content-Transfer-Encoding: base64");
m_output->println("");
m_output->println(str.c_str());
m_output->println("");
return;
}
......
......@@ -36,6 +36,7 @@ private:
InputStream *m_input;
OutputStream *m_output;
Parser_t *m_parser;
MgmtSrvr::Allocated_resources *m_allocated_resources;
void getConfig_common(Parser_t::Context &ctx,
const class Properties &args,
......@@ -43,6 +44,7 @@ private:
public:
MgmApiSession(class MgmtSrvr & mgm, NDB_SOCKET_TYPE sock);
virtual ~MgmApiSession();
void runSession();
void getStatPort(Parser_t::Context &ctx, const class Properties &args);
......@@ -51,6 +53,7 @@ public:
void getConfig_old(Parser_t::Context &ctx);
#endif /* MGM_GET_CONFIG_BACKWARDS_COMPAT */
void get_nodeid(Parser_t::Context &ctx, const class Properties &args);
void getVersion(Parser_t::Context &ctx, const class Properties &args);
void getStatus(Parser_t::Context &ctx, const class Properties &args);
void getInfoClusterLog(Parser_t::Context &ctx, const class Properties &args);
......
......@@ -20,7 +20,7 @@
#include "MgmtSrvr.hpp"
#include "EventLogger.hpp"
#include "Config.hpp"
#include <Config.hpp>
#include "InitConfigFileParser.hpp"
#include <SocketServer.hpp>
#include "Services.hpp"
......@@ -37,6 +37,8 @@
#include <mgmapi_config_parameters.h>
#include <getarg.h>
#include <NdbAutoPtr.hpp>
#if defined NDB_OSE || defined NDB_SOFTOSE
#include <efs.h>
#else
......@@ -88,7 +90,6 @@ static MgmGlobals glob;
******************************************************************************/
static bool readLocalConfig();
static bool readGlobalConfig();
static bool setPortNo();
/**
* Global variables
......@@ -146,7 +147,9 @@ NDB_MAIN(mgmsrv){
exit(1);
}
glob.socketServer = new SocketServer();
MgmApiService * mapi = new MgmApiService();
MgmStatService * mstat = new MgmStatService();
/****************************
......@@ -157,9 +160,27 @@ NDB_MAIN(mgmsrv){
if (!readGlobalConfig())
goto error_end;
if (!setPortNo())
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ?
"" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
glob.localNodeId= glob.mgmObject->getOwnNodeId();
if (glob.localNodeId == 0) {
goto error_end;
}
glob.port= glob.mgmObject->getPort();
if (glob.port == 0)
goto error_end;
glob.interface_name = 0;
glob.use_specific_ip = false;
if(!glob.use_specific_ip){
if(!glob.socketServer->tryBind(glob.port, glob.interface_name)){
ndbout_c("Unable to setup port: %s:%d!\n"
......@@ -190,25 +211,18 @@ NDB_MAIN(mgmsrv){
goto error_end;
}
glob.mgmObject = new MgmtSrvr(glob.localNodeId,
BaseString(glob.config_filename),
BaseString(glob.local_config_filename == 0 ? "" : glob.local_config_filename),
glob.cluster_config);
glob.cluster_config = 0;
if(!glob.mgmObject->check_start()){
ndbout_c("Unable to start management server.");
ndbout_c("Unable to check start management server.");
ndbout_c("Probably caused by illegal initial configuration file.");
goto error_end;
}
if (glob.daemon) {
// Become a daemon
char homePath[255],lockfile[255], logfile[255];
NdbConfig_HomePath(homePath, 255);
snprintf(lockfile, 255, "%snode%d.pid", homePath, glob.localNodeId);
snprintf(logfile, 255, "%snode%d.out", homePath, glob.localNodeId);
char *lockfile= NdbConfig_PidFileName(glob.localNodeId);
char *logfile= NdbConfig_StdoutFileName(glob.localNodeId);
NdbAutoPtr<char> tmp_aptr1(lockfile), tmp_aptr2(logfile);
if (NdbDaemon_Make(lockfile, logfile, 0) == -1) {
ndbout << "Cannot become daemon: " << NdbDaemon_ErrorText << endl;
return 1;
......@@ -233,8 +247,8 @@ NDB_MAIN(mgmsrv){
ndbout_c(msg);
g_EventLogger.info(msg);
snprintf(msg, 256, "Command port: %d, Statistics port: %d",
glob.port, glob.port_stats);
snprintf(msg, 256, "Id: %d, Command port: %d, Statistics port: %d",
glob.localNodeId, glob.port, glob.port_stats);
ndbout_c(msg);
g_EventLogger.info(msg);
......@@ -343,108 +357,3 @@ readGlobalConfig() {
}
return true;
}
/**
* @fn setPortNo
* @param glob : Global variables
* @return true if success, false otherwise.
*
* Port number:
* 2. Use port number from global configuration file
* 4. Use port number for statistics from global configuration file
*/
static bool
setPortNo(){
const Properties *mgmProps;
ndb_mgm_configuration_iterator * iter =
ndb_mgm_create_configuration_iterator(glob.cluster_config->m_configValues,
CFG_SECTION_NODE);
if(iter == 0)
return false;
if(ndb_mgm_find(iter, CFG_NODE_ID, glob.localNodeId) != 0){
ndbout << "Could not retrieve configuration for Node "
<< glob.localNodeId << " in config file." << endl
<< "Have you set correct NodeId for this node?" << endl;
ndb_mgm_destroy_iterator(iter);
return false;
}
unsigned type;
if(ndb_mgm_get_int_parameter(iter, CFG_TYPE_OF_SECTION, &type) != 0 ||
type != NODE_TYPE_MGM){
ndbout << "Local node id " << glob.localNodeId
<< " is not defined as management server" << endl
<< "Have you set correct NodeId for this node?" << endl;
return false;
}
/************
* Set Port *
************/
Uint32 tmp = 0;
if(ndb_mgm_get_int_parameter(iter, CFG_MGM_PORT, &tmp) != 0){
ndbout << "Could not find PortNumber in the configuration file." << endl;
return false;
}
glob.port = tmp;
/*****************
* Set Stat Port *
*****************/
#if 0
if (!mgmProps->get("PortNumberStats", &tmp)){
ndbout << "Could not find PortNumberStats in the configuration file."
<< endl;
return false;
}
glob.port_stats = tmp;
#endif
#if 0
const char * host;
if(ndb_mgm_get_string_parameter(iter, mgmProps->get("ExecuteOnComputer", host)){
ndbout << "Failed to find \"ExecuteOnComputer\" for my node" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
const char * hostname;
{
const Properties * p;
char buf[255];
snprintf(buf, sizeof(buf), "Computer_%s", host.c_str());
if(!glob.cluster_config->get(buf, &p)){
ndbout << "Failed to find computer " << host << " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(!p->get("HostName", &hostname)){
ndbout << "Failed to find \"HostName\" for computer " << host
<< " in config" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
if(NdbHost_GetHostName(buf) != 0){
ndbout << "Unable to get own hostname" << endl;
ndbout << "Unable to verify own hostname" << endl;
return false;
}
}
const char * ip_address;
if(mgmProps->get("IpAddress", &ip_address)){
glob.use_specific_ip = true;
glob.interface_name = strdup(ip_address);
return true;
}
glob.interface_name = strdup(hostname);
#endif
glob.interface_name = 0;
glob.use_specific_ip = false;
return true;
}
......@@ -295,6 +295,7 @@ ClusterMgr::execAPI_REGREQ(const Uint32 * theData){
}
int global_mgmt_server_check = 0; // set to one in mgmtsrvr main;
void
ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
const ApiRegConf * const apiRegConf = (ApiRegConf *)&theData[0];
......@@ -309,6 +310,7 @@ ClusterMgr::execAPI_REGCONF(const Uint32 * theData){
Node & node = theNodes[nodeId];
assert(node.defined == true);
assert(node.connected == true);
if(node.m_info.m_version != apiRegConf->version){
node.m_info.m_version = apiRegConf->version;
if (global_mgmt_server_check == 1)
......
......@@ -39,6 +39,7 @@
#endif
//#define REPORT_TRANSPORTER
//#define API_TRACE;
#if defined DEBUG_TRANSPORTER
#define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
......@@ -47,7 +48,7 @@
#endif
TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
ConfigRetriever *TransporterFacade::s_config_retriever= 0;
/*****************************************************************************
......@@ -332,11 +333,15 @@ atexit_stop_instance(){
*
* Which is protected by a mutex
*/
TransporterFacade*
TransporterFacade::start_instance(const char * connectString){
// TransporterFacade used from API get config from mgmt srvr
ConfigRetriever configRetriever;
s_config_retriever= new ConfigRetriever;
ConfigRetriever &configRetriever= *s_config_retriever;
configRetriever.setConnectString(connectString);
ndb_mgm_configuration * props = configRetriever.getConfig(NDB_VERSION,
NODE_TYPE_API);
......@@ -389,6 +394,14 @@ TransporterFacade::start_instance(int nodeId,
return tf;
}
void
TransporterFacade::close_configuration(){
if (s_config_retriever) {
delete s_config_retriever;
s_config_retriever= 0;
}
}
/**
* Note that this function need no locking since its
* only called from the destructor of Ndb (the NdbObject)
......@@ -397,6 +410,9 @@ TransporterFacade::start_instance(int nodeId,
*/
void
TransporterFacade::stop_instance(){
close_configuration();
if(theFacadeInstance == NULL){
/**
* We are called from atexit function
......@@ -440,7 +456,17 @@ runSendRequest_C(void * me)
void TransporterFacade::threadMainSend(void)
{
SocketServer socket_server;
theTransporterRegistry->startSending();
if (!theTransporterRegistry->start_service(socket_server))
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_service");
if (!theTransporterRegistry->start_clients())
NDB_ASSERT(0, "Unable to start theTransporterRegistry->start_clients");
socket_server.startServer();
while(!theStopReceive) {
NdbSleep_MilliSleep(10);
NdbMutex_Lock(theMutexPtr);
......@@ -451,6 +477,11 @@ void TransporterFacade::threadMainSend(void)
NdbMutex_Unlock(theMutexPtr);
}
theTransporterRegistry->stopSending();
socket_server.stopServer();
socket_server.stopSessions();
theTransporterRegistry->stop_clients();
}
extern "C"
......@@ -466,7 +497,7 @@ void TransporterFacade::threadMainReceive(void)
{
theTransporterRegistry->startReceiving();
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections();
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
while(!theStopReceive) {
for(int i = 0; i<10; i++){
......@@ -478,7 +509,7 @@ void TransporterFacade::threadMainReceive(void)
}
}
NdbMutex_Lock(theMutexPtr);
theTransporterRegistry->checkConnections();
theTransporterRegistry->update_connections();
NdbMutex_Unlock(theMutexPtr);
}//while
theTransporterRegistry->stopReceiving();
......@@ -875,13 +906,13 @@ TransporterFacade::sendFragmentedSignalUnCond(NdbApiSignal* aSignal,
void
TransporterFacade::doConnect(int aNodeId){
theTransporterRegistry->setIOState(aNodeId, NoHalt);
theTransporterRegistry->setPerformState(aNodeId, PerformConnect);
theTransporterRegistry->do_connect(aNodeId);
}
void
TransporterFacade::doDisconnect(int aNodeId)
{
theTransporterRegistry->setPerformState(aNodeId, PerformDisconnect);
theTransporterRegistry->do_disconnect(aNodeId);
}
void
......@@ -906,7 +937,7 @@ TransporterFacade::ownId() const
bool
TransporterFacade::isConnected(NodeId aNodeId){
return theTransporterRegistry->performState(aNodeId) == PerformIO;
return theTransporterRegistry->is_connected(aNodeId);
}
NodeId
......
......@@ -29,6 +29,7 @@ class ClusterMgr;
class ArbitMgr;
class IPCConfig;
struct ndb_mgm_configuration;
class ConfigRetriever;
class Ndb;
class NdbApiSignal;
......@@ -56,6 +57,7 @@ public:
static TransporterFacade* instance();
static TransporterFacade* start_instance(int, const ndb_mgm_configuration*);
static TransporterFacade* start_instance(const char *connectString);
static void close_configuration();
static void stop_instance();
/**
......@@ -110,7 +112,6 @@ public:
// Close this block number
int close_local(BlockNumber blockNumber);
void setState(Uint32 aNodeId, PerformState aState);
private:
/**
......@@ -219,6 +220,7 @@ public:
NdbMutex* theMutexPtr;
private:
static TransporterFacade* theFacadeInstance;
static ConfigRetriever *s_config_retriever;
public:
GlobalDictCache m_globalDictCache;
......
......@@ -71,7 +71,7 @@ NdbBackup::getFileSystemPathForNode(int _node_id){
*/
ConfigRetriever cr;
ndb_mgm_configuration * p = cr.getConfig(host, port, 0);
ndb_mgm_configuration * p = cr.getConfig(host, port, 0, NODE_TYPE_API);
if(p == 0){
const char * s = cr.getErrorString();
if(s == 0)
......@@ -154,7 +154,7 @@ NdbBackup::execRestore(bool _restore_data,
#endif
snprintf(buf, 255, "ndb_restore -c \"nodeid=%d;host=%s\" -n %d -b %d %s %s .",
snprintf(buf, 255, "valgrind --leak-check=yes -v ndb_restore -c \"nodeid=%d;host=%s\" -n %d -b %d %s %s .",
ownNodeId,
addr,
_node_id,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment