Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
6bac3b8f
Commit
6bac3b8f
authored
Aug 30, 2006
by
mskold/marty@mysql.com/linux.site
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
NDBAPI cleanup
parent
47d6296b
Changes
17
Hide whitespace changes
Inline
Side-by-side
Showing
17 changed files
with
184 additions
and
118 deletions
+184
-118
sql/ha_ndbcluster.cc
sql/ha_ndbcluster.cc
+16
-0
sql/ha_ndbcluster_binlog.cc
sql/ha_ndbcluster_binlog.cc
+18
-13
storage/ndb/include/ndbapi/Ndb.hpp
storage/ndb/include/ndbapi/Ndb.hpp
+9
-0
storage/ndb/include/ndbapi/NdbTransaction.hpp
storage/ndb/include/ndbapi/NdbTransaction.hpp
+2
-1
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
+2
-0
storage/ndb/include/transporter/TransporterRegistry.hpp
storage/ndb/include/transporter/TransporterRegistry.hpp
+1
-0
storage/ndb/src/common/transporter/Packer.cpp
storage/ndb/src/common/transporter/Packer.cpp
+2
-2
storage/ndb/src/common/transporter/TransporterRegistry.cpp
storage/ndb/src/common/transporter/TransporterRegistry.cpp
+7
-7
storage/ndb/src/common/util/ndb_init.c
storage/ndb/src/common/util/ndb_init.c
+19
-0
storage/ndb/src/mgmapi/mgmapi.cpp
storage/ndb/src/mgmapi/mgmapi.cpp
+1
-1
storage/ndb/src/ndbapi/Ndb.cpp
storage/ndb/src/ndbapi/Ndb.cpp
+6
-0
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
+0
-55
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+6
-21
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+3
-2
storage/ndb/src/ndbapi/NdbTransaction.cpp
storage/ndb/src/ndbapi/NdbTransaction.cpp
+6
-5
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
+76
-11
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
+10
-0
No files found.
sql/ha_ndbcluster.cc
View file @
6bac3b8f
...
...
@@ -48,6 +48,16 @@
extern
my_bool
opt_ndb_optimized_node_selection
;
extern
const
char
*
opt_ndbcluster_connectstring
;
// ndb interface initialization/cleanup
#ifdef __cplusplus
extern
"C"
{
#endif
extern
void
ndb_init_internal
();
extern
void
ndb_end_internal
();
#ifdef __cplusplus
}
#endif
const
char
*
ndb_distribution_names
[]
=
{
"KEYHASH"
,
"LINHASH"
,
NullS
};
TYPELIB
ndb_distribution_typelib
=
{
array_elements
(
ndb_distribution_names
)
-
1
,
""
,
ndb_distribution_names
,
NULL
};
...
...
@@ -6392,6 +6402,9 @@ static int ndbcluster_init()
if
(
have_ndbcluster
!=
SHOW_OPTION_YES
)
DBUG_RETURN
(
0
);
// nothing else to do
// Initialize ndb interface
ndb_init_internal
();
// Set connectstring if specified
if
(
opt_ndbcluster_connectstring
!=
0
)
DBUG_PRINT
(
"connectstring"
,
(
"%s"
,
opt_ndbcluster_connectstring
));
...
...
@@ -6540,6 +6553,9 @@ static int ndbcluster_end(ha_panic_function type)
delete
g_ndb_cluster_connection
;
g_ndb_cluster_connection
=
NULL
;
// cleanup ndb interface
ndb_end_internal
();
pthread_mutex_destroy
(
&
ndbcluster_mutex
);
pthread_mutex_destroy
(
&
LOCK_ndb_util_thread
);
pthread_cond_destroy
(
&
COND_ndb_util_thread
);
...
...
sql/ha_ndbcluster_binlog.cc
View file @
6bac3b8f
...
...
@@ -25,6 +25,7 @@
#include "slave.h"
#include "ha_ndbcluster_binlog.h"
#include "NdbDictionary.hpp"
#include "ndb_cluster_connection.hpp"
#include <util/NdbAutoPtr.hpp>
#ifdef ndb_dynamite
...
...
@@ -111,8 +112,7 @@ static NDB_SCHEMA_OBJECT *ndb_get_schema_object(const char *key,
static
void
ndb_free_schema_object
(
NDB_SCHEMA_OBJECT
**
ndb_schema_object
,
bool
have_lock
);
/* instantiated in storage/ndb/src/ndbapi/Ndbif.cpp */
extern
Uint64
g_latest_trans_gci
;
static
Uint64
*
p_latest_trans_gci
=
0
;
/*
Global variables for holding the binlog_index table reference
...
...
@@ -439,7 +439,7 @@ static void ndbcluster_binlog_wait(THD *thd)
{
DBUG_ENTER
(
"ndbcluster_binlog_wait"
);
const
char
*
save_info
=
thd
?
thd
->
proc_info
:
0
;
ulonglong
wait_epoch
=
g
_latest_trans_gci
;
ulonglong
wait_epoch
=
*
p
_latest_trans_gci
;
int
count
=
30
;
if
(
thd
)
thd
->
proc_info
=
"Waiting for ndbcluster binlog update to "
...
...
@@ -3284,6 +3284,7 @@ static void ndb_free_schema_object(NDB_SCHEMA_OBJECT **ndb_schema_object,
DBUG_VOID_RETURN
;
}
pthread_handler_t
ndb_binlog_thread_func
(
void
*
arg
)
{
THD
*
thd
;
/* needs to be first for thread_stack */
...
...
@@ -3292,6 +3293,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
Thd_ndb
*
thd_ndb
=
0
;
int
ndb_update_binlog_index
=
1
;
injector
*
inj
=
injector
::
instance
();
#ifdef RUN_NDB_BINLOG_TIMER
Timer
main_timer
;
#endif
...
...
@@ -3380,6 +3382,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
*/
injector_thd
=
thd
;
injector_ndb
=
i_ndb
;
p_latest_trans_gci
=
injector_ndb
->
get_ndb_cluster_connection
().
get_latest_trans_gci
();
schema_ndb
=
s_ndb
;
ndb_binlog_thread_running
=
1
;
if
(
opt_bin_log
)
...
...
@@ -3476,7 +3480,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
"ndb_latest_handled_binlog_epoch: %u, while current epoch: %u. "
"RESET MASTER should be issued. Resetting ndb_latest_handled_binlog_epoch."
,
(
unsigned
)
ndb_latest_handled_binlog_epoch
,
(
unsigned
)
schema_gci
);
g
_latest_trans_gci
=
0
;
*
p
_latest_trans_gci
=
0
;
ndb_latest_handled_binlog_epoch
=
0
;
ndb_latest_applied_binlog_epoch
=
0
;
ndb_latest_received_binlog_epoch
=
0
;
...
...
@@ -3503,7 +3507,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
}
do_ndbcluster_binlog_close_connection
=
BCCC_running
;
for
(
;
!
((
abort_loop
||
do_ndbcluster_binlog_close_connection
)
&&
ndb_latest_handled_binlog_epoch
>=
g
_latest_trans_gci
)
&&
ndb_latest_handled_binlog_epoch
>=
*
p
_latest_trans_gci
)
&&
do_ndbcluster_binlog_close_connection
!=
BCCC_restart
;
)
{
#ifndef DBUG_OFF
...
...
@@ -3511,8 +3515,8 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
DBUG_PRINT
(
"info"
,
(
"do_ndbcluster_binlog_close_connection: %d, "
"ndb_latest_handled_binlog_epoch: %llu, "
"
g
_latest_trans_gci: %llu"
,
do_ndbcluster_binlog_close_connection
,
ndb_latest_handled_binlog_epoch
,
g
_latest_trans_gci
));
"
*p
_latest_trans_gci: %llu"
,
do_ndbcluster_binlog_close_connection
,
ndb_latest_handled_binlog_epoch
,
*
p
_latest_trans_gci
));
}
#endif
#ifdef RUN_NDB_BINLOG_TIMER
...
...
@@ -3548,7 +3552,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
}
if
((
abort_loop
||
do_ndbcluster_binlog_close_connection
)
&&
(
ndb_latest_handled_binlog_epoch
>=
g
_latest_trans_gci
||
(
ndb_latest_handled_binlog_epoch
>=
*
p
_latest_trans_gci
||
!
ndb_binlog_running
))
break
;
/* Shutting down server */
...
...
@@ -3598,11 +3602,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
DBUG_PRINT
(
"info"
,
(
"do_ndbcluster_binlog_close_connection= BCCC_restart"
));
do_ndbcluster_binlog_close_connection
=
BCCC_restart
;
if
(
ndb_latest_received_binlog_epoch
<
g
_latest_trans_gci
&&
ndb_binlog_running
)
if
(
ndb_latest_received_binlog_epoch
<
*
p
_latest_trans_gci
&&
ndb_binlog_running
)
{
sql_print_error
(
"NDB Binlog: latest transaction in epoch %lld not in binlog "
"as latest received epoch is %lld"
,
g
_latest_trans_gci
,
ndb_latest_received_binlog_epoch
);
*
p
_latest_trans_gci
,
ndb_latest_received_binlog_epoch
);
}
}
}
...
...
@@ -3784,11 +3788,11 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
{
DBUG_PRINT
(
"info"
,
(
"do_ndbcluster_binlog_close_connection= BCCC_restart"
));
do_ndbcluster_binlog_close_connection
=
BCCC_restart
;
if
(
ndb_latest_received_binlog_epoch
<
g
_latest_trans_gci
&&
ndb_binlog_running
)
if
(
ndb_latest_received_binlog_epoch
<
*
p
_latest_trans_gci
&&
ndb_binlog_running
)
{
sql_print_error
(
"NDB Binlog: latest transaction in epoch %lld not in binlog "
"as latest received epoch is %lld"
,
g
_latest_trans_gci
,
ndb_latest_received_binlog_epoch
);
*
p
_latest_trans_gci
,
ndb_latest_received_binlog_epoch
);
}
}
}
...
...
@@ -3861,6 +3865,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
/* don't mess with the injector_ndb anymore from other threads */
injector_thd
=
0
;
injector_ndb
=
0
;
p_latest_trans_gci
=
0
;
schema_ndb
=
0
;
pthread_mutex_unlock
(
&
injector_mutex
);
thd
->
db
=
0
;
// as not to try to free memory
...
...
@@ -3960,7 +3965,7 @@ ndbcluster_show_status_binlog(THD* thd, stat_print_fn *stat_print,
"latest_handled_binlog_epoch=%s, "
"latest_applied_binlog_epoch=%s"
,
llstr
(
ndb_latest_epoch
,
buff1
),
llstr
(
g
_latest_trans_gci
,
buff2
),
llstr
(
*
p
_latest_trans_gci
,
buff2
),
llstr
(
ndb_latest_received_binlog_epoch
,
buff3
),
llstr
(
ndb_latest_handled_binlog_epoch
,
buff4
),
llstr
(
ndb_latest_applied_binlog_epoch
,
buff5
));
...
...
storage/ndb/include/ndbapi/Ndb.hpp
View file @
6bac3b8f
...
...
@@ -1093,6 +1093,15 @@ public:
~
Ndb
();
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* The current ndb_cluster_connection get_ndb_cluster_connection.
*
* @return the current connection
*/
Ndb_cluster_connection
&
get_ndb_cluster_connection
();
#endif
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* The current catalog name can be fetched by getCatalogName.
...
...
storage/ndb/include/ndbapi/NdbTransaction.hpp
View file @
6bac3b8f
...
...
@@ -735,6 +735,7 @@ private:
Uint32
theTCConPtr
;
// Transaction Co-ordinator connection pointer.
Uint64
theTransactionId
;
// theTransactionId of the transaction
Uint32
theGlobalCheckpointId
;
// The gloabl checkpoint identity of the transaction
Uint64
*
p_latest_trans_gci
;
// Reference to latest gci for connection
ConStatusType
theStatus
;
// The status of the connection
enum
CompletionStatus
{
NotCompleted
,
...
...
@@ -753,7 +754,7 @@ private:
bool
theTransactionIsStarted
;
bool
theInUseState
;
bool
theSimpleState
;
Uint8
m_abortOption
;
// Type of commi
t
Uint8
m_abortOption
;
// Type of commi
enum
ListState
{
NotInList
,
...
...
storage/ndb/include/ndbapi/ndb_cluster_connection.hpp
View file @
6bac3b8f
...
...
@@ -114,6 +114,8 @@ public:
void
init_get_next_node
(
Ndb_cluster_connection_node_iter
&
iter
);
unsigned
int
get_next_node
(
Ndb_cluster_connection_node_iter
&
iter
);
Uint64
*
get_latest_trans_gci
();
#endif
private:
...
...
storage/ndb/include/transporter/TransporterRegistry.hpp
View file @
6bac3b8f
...
...
@@ -361,6 +361,7 @@ private:
Uint32
poll_SHM
(
Uint32
timeOutMillis
);
int
m_shm_own_pid
;
int
m_transp_count
;
};
#endif // Define of TransporterRegistry_H
storage/ndb/src/common/transporter/Packer.cpp
View file @
6bac3b8f
...
...
@@ -213,8 +213,8 @@ TransporterRegistry::unpack(Uint32 * readPtr,
Uint32
*
eodPtr
,
NodeId
remoteNodeId
,
IOState
state
)
{
static
SignalHeader
signalHeader
;
static
LinearSectionPtr
ptr
[
3
];
SignalHeader
signalHeader
;
LinearSectionPtr
ptr
[
3
];
Uint32
loop_count
=
0
;
if
(
state
==
NoHalt
||
state
==
HaltOutput
){
while
((
readPtr
<
eodPtr
)
&&
(
loop_count
<
MAX_RECEIVED_SIGNALS
))
{
...
...
storage/ndb/src/common/transporter/TransporterRegistry.cpp
View file @
6bac3b8f
...
...
@@ -80,14 +80,15 @@ SocketServer::Session * TransporterService::newSession(NDB_SOCKET_TYPE sockfd)
TransporterRegistry
::
TransporterRegistry
(
void
*
callback
,
unsigned
_maxTransporters
,
unsigned
sizeOfLongSignalMemory
)
unsigned
sizeOfLongSignalMemory
)
:
m_mgm_handle
(
0
),
m_transp_count
(
0
)
{
DBUG_ENTER
(
"TransporterRegistry::TransporterRegistry"
);
nodeIdSpecified
=
false
;
maxTransporters
=
_maxTransporters
;
sendCounter
=
1
;
m_mgm_handle
=
0
;
callbackObj
=
callback
;
...
...
@@ -1002,7 +1003,6 @@ TransporterRegistry::performReceive()
#endif
}
static
int
x
=
0
;
void
TransporterRegistry
::
performSend
()
{
...
...
@@ -1070,7 +1070,7 @@ TransporterRegistry::performSend()
}
#endif
#ifdef NDB_TCP_TRANSPORTER
for
(
i
=
x
;
i
<
nTCPTransporters
;
i
++
)
for
(
i
=
m_transp_count
;
i
<
nTCPTransporters
;
i
++
)
{
TCP_Transporter
*
t
=
theTCPTransporters
[
i
];
if
(
t
&&
t
->
hasDataToSend
()
&&
t
->
isConnected
()
&&
...
...
@@ -1079,7 +1079,7 @@ TransporterRegistry::performSend()
t
->
doSend
();
}
}
for
(
i
=
0
;
i
<
x
&&
i
<
nTCPTransporters
;
i
++
)
for
(
i
=
0
;
i
<
m_transp_count
&&
i
<
nTCPTransporters
;
i
++
)
{
TCP_Transporter
*
t
=
theTCPTransporters
[
i
];
if
(
t
&&
t
->
hasDataToSend
()
&&
t
->
isConnected
()
&&
...
...
@@ -1088,8 +1088,8 @@ TransporterRegistry::performSend()
t
->
doSend
();
}
}
x
++
;
if
(
x
==
nTCPTransporters
)
x
=
0
;
m_transp_count
++
;
if
(
m_transp_count
==
nTCPTransporters
)
m_transp_count
=
0
;
#endif
#endif
#ifdef NDB_SCI_TRANSPORTER
...
...
storage/ndb/src/common/util/ndb_init.c
View file @
6bac3b8f
...
...
@@ -16,6 +16,16 @@
#include <ndb_global.h>
#include <my_sys.h>
#include <NdbMutex.h>
NdbMutex
*
g_ndb_connection_mutex
=
NULL
;
void
ndb_init_internal
()
{
if
(
!
g_ndb_connection_mutex
)
g_ndb_connection_mutex
=
NdbMutex_Create
();
}
int
ndb_init
()
...
...
@@ -25,11 +35,20 @@ ndb_init()
write
(
2
,
err
,
strlen
(
err
));
exit
(
1
);
}
ndb_init_internal
();
return
0
;
}
void
ndb_end_internal
()
{
if
(
g_ndb_connection_mutex
)
NdbMutex_Destroy
(
g_ndb_connection_mutex
);
}
void
ndb_end
(
int
flags
)
{
my_end
(
flags
);
ndb_end_internal
();
}
storage/ndb/src/mgmapi/mgmapi.cpp
View file @
6bac3b8f
...
...
@@ -1194,7 +1194,7 @@ const unsigned int *
ndb_mgm_get_clusterlog_severity_filter
(
NdbMgmHandle
handle
)
{
SET_ERROR
(
handle
,
NDB_MGM_NO_ERROR
,
"Executing: ndb_mgm_get_clusterlog_severity_filter"
);
static
unsigned
int
enabled
[(
int
)
NDB_MGM_EVENT_SEVERITY_ALL
]
=
unsigned
int
enabled
[(
int
)
NDB_MGM_EVENT_SEVERITY_ALL
]
=
{
0
,
0
,
0
,
0
,
0
,
0
,
0
};
const
ParserRow
<
ParserDummy
>
getinfo_reply
[]
=
{
MGM_CMD
(
"clusterlog"
,
NULL
,
""
),
...
...
storage/ndb/src/ndbapi/Ndb.cpp
View file @
6bac3b8f
...
...
@@ -1172,6 +1172,12 @@ convertEndian(Uint32 Data)
}
// <internal>
Ndb_cluster_connection
&
Ndb
::
get_ndb_cluster_connection
()
{
return
theImpl
->
m_ndb_cluster_connection
;
}
const
char
*
Ndb
::
getCatalogName
()
const
{
return
theImpl
->
m_dbname
.
c_str
();
...
...
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
View file @
6bac3b8f
...
...
@@ -1303,8 +1303,6 @@ NdbDictionaryImpl::NdbDictionaryImpl(Ndb &ndb,
m_local_table_data_size
=
0
;
}
static
int
f_dictionary_count
=
0
;
NdbDictionaryImpl
::~
NdbDictionaryImpl
()
{
NdbElement_t
<
Ndb_local_table_info
>
*
curr
=
m_localHash
.
m_tableHash
.
getNext
(
0
);
...
...
@@ -1317,33 +1315,6 @@ NdbDictionaryImpl::~NdbDictionaryImpl()
curr
=
m_localHash
.
m_tableHash
.
getNext
(
curr
);
}
m_globalHash
->
lock
();
if
(
--
f_dictionary_count
==
0
){
delete
NdbDictionary
::
Column
::
FRAGMENT
;
delete
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
;
delete
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
;
delete
NdbDictionary
::
Column
::
ROW_COUNT
;
delete
NdbDictionary
::
Column
::
COMMIT_COUNT
;
delete
NdbDictionary
::
Column
::
ROW_SIZE
;
delete
NdbDictionary
::
Column
::
RANGE_NO
;
delete
NdbDictionary
::
Column
::
DISK_REF
;
delete
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
;
delete
NdbDictionary
::
Column
::
ROWID
;
delete
NdbDictionary
::
Column
::
ROW_GCI
;
NdbDictionary
::
Column
::
FRAGMENT
=
0
;
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
=
0
;
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
=
0
;
NdbDictionary
::
Column
::
ROW_COUNT
=
0
;
NdbDictionary
::
Column
::
COMMIT_COUNT
=
0
;
NdbDictionary
::
Column
::
ROW_SIZE
=
0
;
NdbDictionary
::
Column
::
RANGE_NO
=
0
;
NdbDictionary
::
Column
::
DISK_REF
=
0
;
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
=
0
;
NdbDictionary
::
Column
::
ROWID
=
0
;
NdbDictionary
::
Column
::
ROW_GCI
=
0
;
}
m_globalHash
->
unlock
();
}
else
{
assert
(
curr
==
0
);
}
...
...
@@ -1486,32 +1457,6 @@ NdbDictionaryImpl::setTransporter(class Ndb* ndb,
{
m_globalHash
=
&
tf
->
m_globalDictCache
;
if
(
m_receiver
.
setTransporter
(
ndb
,
tf
)){
m_globalHash
->
lock
();
if
(
f_dictionary_count
++
==
0
){
NdbDictionary
::
Column
::
FRAGMENT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT"
);
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT_FIXED_MEMORY"
);
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT_VARSIZED_MEMORY"
);
NdbDictionary
::
Column
::
ROW_COUNT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_COUNT"
);
NdbDictionary
::
Column
::
COMMIT_COUNT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$COMMIT_COUNT"
);
NdbDictionary
::
Column
::
ROW_SIZE
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_SIZE"
);
NdbDictionary
::
Column
::
RANGE_NO
=
NdbColumnImpl
::
create_pseudo
(
"NDB$RANGE_NO"
);
NdbDictionary
::
Column
::
DISK_REF
=
NdbColumnImpl
::
create_pseudo
(
"NDB$DISK_REF"
);
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
=
NdbColumnImpl
::
create_pseudo
(
"NDB$RECORDS_IN_RANGE"
);
NdbDictionary
::
Column
::
ROWID
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROWID"
);
NdbDictionary
::
Column
::
ROW_GCI
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_GCI"
);
}
m_globalHash
->
unlock
();
return
true
;
}
return
false
;
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
6bac3b8f
...
...
@@ -925,8 +925,6 @@ NdbEventOperationImpl::printAll()
* Each Ndb object has a Object.
*/
// ToDo ref count this so it get's destroyed
NdbMutex
*
NdbEventBuffer
::
p_add_drop_mutex
=
0
;
NdbEventBuffer
::
NdbEventBuffer
(
Ndb
*
ndb
)
:
m_system_nodes
(
ndb
->
theImpl
->
theNoOfDBnodes
),
...
...
@@ -938,7 +936,8 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
m_max_free_thresh
(
100
),
m_gci_slip_thresh
(
3
),
m_dropped_ev_op
(
0
),
m_active_op_count
(
0
)
m_active_op_count
(
0
),
m_add_drop_mutex
(
0
)
{
#ifdef VM_TRACE
m_latest_command
=
"NdbEventBuffer::NdbEventBuffer"
;
...
...
@@ -950,16 +949,6 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
exit
(
-
1
);
}
m_mutex
=
ndb
->
theImpl
->
theWaiter
.
m_mutex
;
lock
();
if
(
p_add_drop_mutex
==
0
)
{
if
((
p_add_drop_mutex
=
NdbMutex_Create
())
==
NULL
)
{
ndbout_c
(
"NdbEventBuffer: NdbMutex_Create() failed"
);
exit
(
-
1
);
}
}
unlock
();
// ToDo set event buffer size
// pre allocate event data array
m_sz
=
0
;
...
...
@@ -969,6 +958,10 @@ NdbEventBuffer::NdbEventBuffer(Ndb *ndb) :
m_free_data
=
0
;
m_free_data_sz
=
0
;
// get reference to mutex managed by current connection
m_add_drop_mutex
=
m_ndb
->
theImpl
->
m_ndb_cluster_connection
.
m_event_add_drop_mutex
;
// initialize lists
bzero
(
&
g_empty_gci_container
,
sizeof
(
Gci_container
));
init_gci_containers
();
...
...
@@ -1006,14 +999,6 @@ NdbEventBuffer::~NdbEventBuffer()
}
NdbCondition_Destroy
(
p_cond
);
lock
();
if
(
p_add_drop_mutex
)
{
NdbMutex_Destroy
(
p_add_drop_mutex
);
p_add_drop_mutex
=
0
;
}
unlock
();
}
void
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
6bac3b8f
...
...
@@ -406,8 +406,8 @@ public:
void
dropEventOperation
(
NdbEventOperation
*
);
static
NdbEventOperationImpl
*
getEventOperationImpl
(
NdbEventOperation
*
tOp
);
void
add_drop_lock
()
{
NdbMutex_Lock
(
p
_add_drop_mutex
);
}
void
add_drop_unlock
()
{
NdbMutex_Unlock
(
p
_add_drop_mutex
);
}
void
add_drop_lock
()
{
NdbMutex_Lock
(
m
_add_drop_mutex
);
}
void
add_drop_unlock
()
{
NdbMutex_Unlock
(
m
_add_drop_mutex
);
}
void
lock
()
{
NdbMutex_Lock
(
m_mutex
);
}
void
unlock
()
{
NdbMutex_Unlock
(
m_mutex
);
}
...
...
@@ -509,6 +509,7 @@ private:
NdbEventOperationImpl
*
m_dropped_ev_op
;
Uint32
m_active_op_count
;
NdbMutex
*
m_add_drop_mutex
;
};
inline
...
...
storage/ndb/src/ndbapi/NdbTransaction.cpp
View file @
6bac3b8f
...
...
@@ -32,8 +32,6 @@
#include <signaldata/TcKeyFailConf.hpp>
#include <signaldata/TcHbRep.hpp>
Uint64
g_latest_trans_gci
=
0
;
/*****************************************************************************
NdbTransaction( Ndb* aNdb );
...
...
@@ -64,6 +62,7 @@ NdbTransaction::NdbTransaction( Ndb* aNdb ) :
theTCConPtr
(
0
),
theTransactionId
(
0
),
theGlobalCheckpointId
(
0
),
p_latest_trans_gci
(
0
),
theStatus
(
NotConnected
),
theCompletionStatus
(
NotCompleted
),
theCommitStatus
(
NotStarted
),
...
...
@@ -129,6 +128,8 @@ NdbTransaction::init()
theCompletedLastOp
=
NULL
;
theGlobalCheckpointId
=
0
;
p_latest_trans_gci
=
theNdb
->
theImpl
->
m_ndb_cluster_connection
.
get_latest_trans_gci
();
theCommitStatus
=
Started
;
theCompletionStatus
=
NotCompleted
;
m_abortOption
=
AbortOnError
;
...
...
@@ -1572,7 +1573,7 @@ NdbTransaction::receiveTC_COMMITCONF(const TcCommitConf * commitConf)
theGlobalCheckpointId
=
commitConf
->
gci
;
// theGlobalCheckpointId == 0 if NoOp transaction
if
(
theGlobalCheckpointId
)
g
_latest_trans_gci
=
theGlobalCheckpointId
;
*
p
_latest_trans_gci
=
theGlobalCheckpointId
;
return
0
;
}
else
{
#ifdef NDB_NO_DROPPED_SIGNAL
...
...
@@ -1752,7 +1753,7 @@ from other transactions.
theCommitStatus
=
Committed
;
theGlobalCheckpointId
=
tGCI
;
assert
(
tGCI
);
g
_latest_trans_gci
=
tGCI
;
*
p
_latest_trans_gci
=
tGCI
;
}
else
if
((
tNoComp
>=
tNoSent
)
&&
(
theLastExecOpInList
->
theCommitIndicator
==
1
)){
...
...
@@ -1930,7 +1931,7 @@ NdbTransaction::receiveTCINDXCONF(const TcIndxConf * indxConf,
theCommitStatus
=
Committed
;
theGlobalCheckpointId
=
tGCI
;
assert
(
tGCI
);
g
_latest_trans_gci
=
tGCI
;
*
p
_latest_trans_gci
=
tGCI
;
}
else
if
((
tNoComp
>=
tNoSent
)
&&
(
theLastExecOpInList
->
theCommitIndicator
==
1
)){
/**********************************************************************/
...
...
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp
View file @
6bac3b8f
...
...
@@ -35,8 +35,6 @@
#include <EventLogger.hpp>
EventLogger
g_eventLogger
;
static
int
g_run_connect_thread
=
0
;
#include <NdbMutex.h>
#ifdef VM_TRACE
NdbMutex
*
ndb_print_state_mutex
=
NULL
;
...
...
@@ -87,8 +85,9 @@ const char *Ndb_cluster_connection::get_connectstring(char *buf,
pthread_handler_t
run_ndb_cluster_connection_connect_thread
(
void
*
me
)
{
g_run_connect_thread
=
1
;
((
Ndb_cluster_connection_impl
*
)
me
)
->
connect_thread
();
Ndb_cluster_connection_impl
*
connection
=
(
Ndb_cluster_connection_impl
*
)
me
;
connection
->
m_run_connect_thread
=
1
;
connection
->
connect_thread
();
return
me
;
}
...
...
@@ -258,9 +257,6 @@ unsigned Ndb_cluster_connection::get_connect_count() const
return
m_impl
.
get_connect_count
();
}
/*
* Ndb_cluster_connection_impl
*/
...
...
@@ -269,11 +265,17 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
connect_string
)
:
Ndb_cluster_connection
(
*
this
),
m_optimized_node_selection
(
1
),
m_name
(
0
)
m_name
(
0
),
m_run_connect_thread
(
0
),
m_event_add_drop_mutex
(
0
),
m_latest_trans_gci
(
0
)
{
DBUG_ENTER
(
"Ndb_cluster_connection"
);
DBUG_PRINT
(
"enter"
,(
"Ndb_cluster_connection this=0x%x"
,
this
));
if
(
!
m_event_add_drop_mutex
)
m_event_add_drop_mutex
=
NdbMutex_Create
();
g_eventLogger
.
createConsoleHandler
();
g_eventLogger
.
setCategory
(
"NdbApi"
);
g_eventLogger
.
enable
(
Logger
::
LL_ON
,
Logger
::
LL_ERROR
);
...
...
@@ -301,6 +303,33 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
}
m_transporter_facade
=
new
TransporterFacade
();
NdbMutex_Lock
(
g_ndb_connection_mutex
);
if
(
g_ndb_connection_count
++
==
0
){
NdbDictionary
::
Column
::
FRAGMENT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT"
);
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT_FIXED_MEMORY"
);
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
=
NdbColumnImpl
::
create_pseudo
(
"NDB$FRAGMENT_VARSIZED_MEMORY"
);
NdbDictionary
::
Column
::
ROW_COUNT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_COUNT"
);
NdbDictionary
::
Column
::
COMMIT_COUNT
=
NdbColumnImpl
::
create_pseudo
(
"NDB$COMMIT_COUNT"
);
NdbDictionary
::
Column
::
ROW_SIZE
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_SIZE"
);
NdbDictionary
::
Column
::
RANGE_NO
=
NdbColumnImpl
::
create_pseudo
(
"NDB$RANGE_NO"
);
NdbDictionary
::
Column
::
DISK_REF
=
NdbColumnImpl
::
create_pseudo
(
"NDB$DISK_REF"
);
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
=
NdbColumnImpl
::
create_pseudo
(
"NDB$RECORDS_IN_RANGE"
);
NdbDictionary
::
Column
::
ROWID
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROWID"
);
NdbDictionary
::
Column
::
ROW_GCI
=
NdbColumnImpl
::
create_pseudo
(
"NDB$ROW_GCI"
);
}
NdbMutex_Unlock
(
g_ndb_connection_mutex
);
DBUG_VOID_RETURN
;
}
...
...
@@ -314,7 +343,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
if
(
m_connect_thread
)
{
void
*
status
;
g
_run_connect_thread
=
0
;
m
_run_connect_thread
=
0
;
NdbThread_WaitFor
(
m_connect_thread
,
&
status
);
NdbThread_Destroy
(
&
m_connect_thread
);
m_connect_thread
=
0
;
...
...
@@ -339,6 +368,36 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
if
(
m_name
)
free
(
m_name
);
NdbMutex_Lock
(
g_ndb_connection_mutex
);
if
(
--
g_ndb_connection_count
==
0
){
delete
NdbDictionary
::
Column
::
FRAGMENT
;
delete
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
;
delete
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
;
delete
NdbDictionary
::
Column
::
ROW_COUNT
;
delete
NdbDictionary
::
Column
::
COMMIT_COUNT
;
delete
NdbDictionary
::
Column
::
ROW_SIZE
;
delete
NdbDictionary
::
Column
::
RANGE_NO
;
delete
NdbDictionary
::
Column
::
DISK_REF
;
delete
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
;
delete
NdbDictionary
::
Column
::
ROWID
;
delete
NdbDictionary
::
Column
::
ROW_GCI
;
NdbDictionary
::
Column
::
FRAGMENT
=
0
;
NdbDictionary
::
Column
::
FRAGMENT_FIXED_MEMORY
=
0
;
NdbDictionary
::
Column
::
FRAGMENT_VARSIZED_MEMORY
=
0
;
NdbDictionary
::
Column
::
ROW_COUNT
=
0
;
NdbDictionary
::
Column
::
COMMIT_COUNT
=
0
;
NdbDictionary
::
Column
::
ROW_SIZE
=
0
;
NdbDictionary
::
Column
::
RANGE_NO
=
0
;
NdbDictionary
::
Column
::
DISK_REF
=
0
;
NdbDictionary
::
Column
::
RECORDS_IN_RANGE
=
0
;
NdbDictionary
::
Column
::
ROWID
=
0
;
NdbDictionary
::
Column
::
ROW_GCI
=
0
;
}
NdbMutex_Unlock
(
g_ndb_connection_mutex
);
if
(
m_event_add_drop_mutex
)
NdbMutex_Destroy
(
m_event_add_drop_mutex
);
DBUG_VOID_RETURN
;
}
...
...
@@ -576,17 +635,23 @@ void Ndb_cluster_connection_impl::connect_thread()
if
(
r
==
-
1
)
{
printf
(
"Ndb_cluster_connection::connect_thread error
\n
"
);
DBUG_ASSERT
(
false
);
g
_run_connect_thread
=
0
;
m
_run_connect_thread
=
0
;
}
else
{
// Wait before making a new connect attempt
NdbSleep_SecSleep
(
1
);
}
}
while
(
g
_run_connect_thread
);
}
while
(
m
_run_connect_thread
);
if
(
m_connect_callback
)
(
*
m_connect_callback
)();
DBUG_VOID_RETURN
;
}
Uint64
*
Ndb_cluster_connection
::
get_latest_trans_gci
()
{
m_impl
.
get_latest_trans_gci
();
}
void
Ndb_cluster_connection
::
init_get_next_node
(
Ndb_cluster_connection_node_iter
&
iter
)
{
...
...
storage/ndb/src/ndbapi/ndb_cluster_connection_impl.hpp
View file @
6bac3b8f
...
...
@@ -20,6 +20,10 @@
#include <ndb_cluster_connection.hpp>
#include <Vector.hpp>
#include <NdbMutex.h>
extern
NdbMutex
*
g_ndb_connection_mutex
;
static
int
g_ndb_connection_count
=
0
;
class
TransporterFacade
;
class
ConfigRetriever
;
...
...
@@ -41,6 +45,9 @@ class Ndb_cluster_connection_impl : public Ndb_cluster_connection
Uint32
get_next_node
(
Ndb_cluster_connection_node_iter
&
iter
);
inline
unsigned
get_connect_count
()
const
;
public:
inline
Uint64
*
get_latest_trans_gci
()
{
return
&
m_latest_trans_gci
;
}
private:
friend
class
Ndb
;
friend
class
NdbImpl
;
...
...
@@ -72,6 +79,9 @@ private:
int
m_optimized_node_selection
;
char
*
m_name
;
int
m_run_connect_thread
;
NdbMutex
*
m_event_add_drop_mutex
;
Uint64
m_latest_trans_gci
;
};
#endif
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment