Commit a959459c authored by unknown's avatar unknown

Merge whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-new-maint

into  whalegate.ndb.mysql.com:/home/tomas/mysql-5.1-single-user

parents 604c12b4 b6a83383
......@@ -2641,6 +2641,16 @@ storage/ndb/lib/libNEWTON_BASICTEST_COMMON.so
storage/ndb/lib/libREP_API.so
storage/ndb/lib/libndbclient.so
storage/ndb/lib/libndbclient_extra.so
storage/ndb/ndbapi-examples/mgmapi_logevent/mgmapi_logevent
storage/ndb/ndbapi-examples/mgmapi_logevent2/mgmapi_logevent2
storage/ndb/ndbapi-examples/ndbapi_async/ndbapi_async
storage/ndb/ndbapi-examples/ndbapi_async1/ndbapi_async1
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event
storage/ndb/ndbapi-examples/ndbapi_retries/ndbapi_retries
storage/ndb/ndbapi-examples/ndbapi_scan/ndbapi_scan
storage/ndb/ndbapi-examples/ndbapi_simple/ndbapi_simple
storage/ndb/ndbapi-examples/ndbapi_simple_dual/ndbapi_simple_dual
storage/ndb/ndbapi-examples/ndbapi_simple_index/ndbapi_simple_index
storage/ndb/src/common/debugger/libtrace.dsp
storage/ndb/src/common/debugger/signaldata/libsignaldataprint.dsp
storage/ndb/src/common/logger/liblogger.dsp
......@@ -2718,6 +2728,8 @@ storage/ndb/test/ndbapi/testDataBuffers
storage/ndb/test/ndbapi/testDeadlock
storage/ndb/test/ndbapi/testDict
storage/ndb/test/ndbapi/testIndex
storage/ndb/test/ndbapi/testIndexStat
storage/ndb/test/ndbapi/testInterpreter
storage/ndb/test/ndbapi/testLcp
storage/ndb/test/ndbapi/testMgm
storage/ndb/test/ndbapi/testNdbApi
......@@ -2753,6 +2765,7 @@ storage/ndb/test/tools/hugoScanRead
storage/ndb/test/tools/hugoScanUpdate
storage/ndb/test/tools/listen_event
storage/ndb/test/tools/ndb_cpcc
storage/ndb/test/tools/rep_latency
storage/ndb/test/tools/restart
storage/ndb/test/tools/verify_index
storage/ndb/tools/ndb_config
......
......@@ -45,7 +45,7 @@ libmysqlsources = errmsg.c get_password.c libmysql.c client.c pack.c \
noinst_HEADERS = embedded_priv.h emb_qcache.h
sqlsources = derror.cc field.cc field_conv.cc strfunc.cc filesort.cc \
ha_ndbcluster.cc \
ha_ndbcluster.cc ha_ndbcluster_cond.cc \
ha_ndbcluster_binlog.cc ha_partition.cc \
handler.cc sql_handler.cc \
hostname.cc init.cc password.c \
......@@ -107,6 +107,9 @@ endif
ha_ndbcluster.o:ha_ndbcluster.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster_cond.o:ha_ndbcluster_cond.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster_binlog.o: ha_ndbcluster_binlog.cc
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
......
......@@ -48,9 +48,9 @@ noinst_HEADERS = item.h item_func.h item_sum.h item_cmpfunc.h \
procedure.h sql_class.h sql_lex.h sql_list.h \
sql_map.h sql_string.h unireg.h \
sql_error.h field.h handler.h mysqld_suffix.h \
ha_partition.h \
ha_ndbcluster.h ha_ndbcluster_binlog.h \
ha_ndbcluster_tables.h rpl_constants.h \
ha_ndbcluster.h ha_ndbcluster_cond.h \
ha_ndbcluster_binlog.h ha_ndbcluster_tables.h \
ha_partition.h rpl_constants.h \
opt_range.h protocol.h rpl_tblmap.h rpl_utility.h \
log.h sql_show.h rpl_rli.h rpl_mi.h \
sql_select.h structs.h table.h sql_udf.h hash_filo.h \
......@@ -92,8 +92,8 @@ mysqld_SOURCES = sql_lex.cc sql_handler.cc sql_partition.cc \
log_event_old.cc rpl_record_old.cc \
discover.cc time.cc opt_range.cc opt_sum.cc \
records.cc filesort.cc handler.cc \
ha_partition.cc \
ha_ndbcluster.cc ha_ndbcluster_binlog.cc \
ha_ndbcluster.cc ha_ndbcluster_cond.cc \
ha_ndbcluster_binlog.cc ha_partition.cc \
sql_db.cc sql_table.cc sql_rename.cc sql_crypt.cc \
sql_load.cc mf_iocache.cc field_conv.cc sql_show.cc \
sql_udf.cc sql_analyse.cc sql_analyse.h sql_cache.cc \
......@@ -156,10 +156,13 @@ lex_hash.h: gen_lex_hash.cc lex.h
./gen_lex_hash$(EXEEXT) > $@-t
$(MV) $@-t $@
# the following three should eventually be moved out of this directory
# the following four should eventually be moved out of this directory
ha_ndbcluster.o:ha_ndbcluster.cc ha_ndbcluster.h
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster_cond.o:ha_ndbcluster_cond.cc ha_ndbcluster_cond.h
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
ha_ndbcluster_binlog.o:ha_ndbcluster_binlog.cc ha_ndbcluster_binlog.h
$(CXXCOMPILE) @ndbcluster_includes@ $(LM_CFLAGS) -c $<
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -2917,11 +2917,10 @@ Dbtup::nr_update_gci(Uint32 fragPtrI, const Local_key* key, Uint32 gci)
int ret;
if (tablePtr.p->m_attributes[MM].m_no_of_varsize)
{
tablePtr.p->m_offsets[MM].m_fix_header_size +=
Tuple_header::HeaderSize+1;
const Uint32 XXX = Tuple_header::HeaderSize+Var_part_ref::SZ32;
tablePtr.p->m_offsets[MM].m_fix_header_size += XXX;
ret = alloc_page(tablePtr.p, fragPtr.p, &page_ptr, tmp.m_page_no);
tablePtr.p->m_offsets[MM].m_fix_header_size -=
Tuple_header::HeaderSize+1;
tablePtr.p->m_offsets[MM].m_fix_header_size -= XXX;
}
else
{
......
......@@ -27,15 +27,34 @@ public:
int getDbNodeId(int _i);
enum RestartFlags {
NRRF_INITIAL = 0x1,
NRRF_NOSTART = 0x2,
NRRF_ABORT = 0x4
};
int restartOneDbNode(int _nodeId,
bool initial = false,
bool nostart = false,
bool abort = false);
int restartOneDbNode2(int _nodeId, Uint32 flags){
return restartOneDbNode(_nodeId,
flags & NRRF_INITIAL,
flags & NRRF_NOSTART,
flags & NRRF_ABORT);
}
int restartAll(bool initial = false,
bool nostart = false,
bool abort = false);
int restartAll2(Uint32 flags){
return restartAll(flags & NRRF_INITIAL,
flags & NRRF_NOSTART,
flags & NRRF_ABORT);
}
int startAll();
int startNodes(const int * _nodes, int _num_nodes);
int waitClusterStarted(unsigned int _timeout = 120);
......
......@@ -1567,6 +1567,72 @@ runBug27466(NDBT_Context* ctx, NDBT_Step* step)
return NDBT_OK;
}
int
runBug28023(NDBT_Context* ctx, NDBT_Step* step)
{
int result = NDBT_OK;
int loops = ctx->getNumLoops();
int records = ctx->getNumRecords();
Ndb* pNdb = GETNDB(step);
NdbRestarter res;
if (res.getNumDbNodes() < 2)
{
return NDBT_OK;
}
HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(pNdb, records) != 0){
return NDBT_FAILED;
}
if (hugoTrans.clearTable(pNdb, records) != 0)
{
return NDBT_FAILED;
}
for (Uint32 i = 0; i<loops; i++)
{
int node1 = res.getDbNodeId(rand() % res.getNumDbNodes());
if (res.restartOneDbNode2(node1,
NdbRestarter::NRRF_ABORT |
NdbRestarter::NRRF_NOSTART))
return NDBT_FAILED;
if (res.waitNodesNoStart(&node1, 1))
return NDBT_FAILED;
if (hugoTrans.loadTable(pNdb, records) != 0){
return NDBT_FAILED;
}
if (hugoTrans.clearTable(pNdb, records) != 0)
{
return NDBT_FAILED;
}
res.startNodes(&node1, 1);
if (res.waitClusterStarted())
return NDBT_FAILED;
if (hugoTrans.loadTable(pNdb, records) != 0){
return NDBT_FAILED;
}
if (hugoTrans.scanUpdateRecords(pNdb, records) != 0)
return NDBT_FAILED;
if (hugoTrans.clearTable(pNdb, records) != 0)
{
return NDBT_FAILED;
}
}
return NDBT_OK;
}
NDBT_TESTSUITE(testNodeRestart);
TESTCASE("NoLoad",
"Test that one node at a time can be stopped and then restarted "\
......@@ -1924,6 +1990,9 @@ TESTCASE("Bug27283", ""){
TESTCASE("Bug27466", ""){
INITIALIZER(runBug27466);
}
TESTCASE("Bug28023", ""){
INITIALIZER(runBug28023);
}
NDBT_TESTSUITE_END(testNodeRestart);
int main(int argc, const char** argv){
......
......@@ -549,6 +549,10 @@ max-time: 1000
cmd: testNodeRestart
args: -n Bug26481 T1
max-time: 1000
cmd: testNodeRestart
args: -n Bug28023 T7 D2
#
# DICT TESTS
max-time: 1500
......
......@@ -1146,7 +1146,7 @@ HugoTransactions::pkInterpretedUpdateRecords(Ndb* pNdb,
}
// PKs
if (equalForRow(pOp, r) != 0)
if (equalForRow(pUpdOp, r) != 0)
{
closeTransaction(pNdb);
return NDBT_FAILED;
......@@ -1714,7 +1714,7 @@ HugoTransactions::indexUpdateRecords(Ndb* pNdb,
if(!ordered)
{
if (equalForRow(pOp, r+b) != 0)
if (equalForRow(pUpdOp, r+b) != 0)
{
closeTransaction(pNdb);
return NDBT_FAILED;
......
......@@ -13,7 +13,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog
ndbtest_PROGRAMS = hugoLoad hugoFill hugoLockRecords hugoPkDelete hugoPkRead hugoPkReadRecord hugoPkUpdate hugoScanRead hugoScanUpdate restart verify_index copy_tab create_index ndb_cpcc listen_event eventlog rep_latency
# transproxy
......@@ -34,6 +34,7 @@ create_index_SOURCES = create_index.cpp
ndb_cpcc_SOURCES = cpcc.cpp
listen_event_SOURCES = listen.cpp
eventlog_SOURCES = log_listner.cpp
rep_latency_SOURCES = rep_latency.cpp
include $(top_srcdir)/storage/ndb/config/common.mk.am
include $(top_srcdir)/storage/ndb/config/type_ndbapitest.mk.am
......
......@@ -22,6 +22,128 @@
#include <getarg.h>
#define BATCH_SIZE 128
struct Table_info
{
Uint32 id;
};
struct Trans_arg
{
Ndb *ndb;
NdbTransaction *trans;
Uint32 bytes_batched;
};
Vector< Vector<NdbRecAttr*> > event_values;
Vector< Vector<NdbRecAttr*> > event_pre_values;
Vector<struct Table_info> table_infos;
static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
{
trans_arg.ndb = ndb;
trans_arg.trans = ndb->startTransaction();
trans_arg.bytes_batched = 0;
}
static void do_equal(NdbOperation *op,
NdbEventOperation *pOp)
{
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
Vector<NdbRecAttr*> &ev = event_values[ti->id];
const NdbDictionary::Table *tab= pOp->getTable();
unsigned i, n_columns = tab->getNoOfColumns();
for (i= 0; i < n_columns; i++)
{
if (tab->getColumn(i)->getPrimaryKey() &&
op->equal(i, ev[i]->aRef()))
{
abort();
}
}
}
static void do_set_value(NdbOperation *op,
NdbEventOperation *pOp)
{
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
Vector<NdbRecAttr*> &ev = event_values[ti->id];
const NdbDictionary::Table *tab= pOp->getTable();
unsigned i, n_columns = tab->getNoOfColumns();
for (i= 0; i < n_columns; i++)
{
if (!tab->getColumn(i)->getPrimaryKey() &&
op->setValue(i, ev[i]->aRef()))
{
abort();
}
}
}
static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->writeTuple();
do_equal(op, pOp);
do_set_value(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->writeTuple();
do_equal(op, pOp);
do_set_value(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->deleteTuple();
do_equal(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_commit(struct Trans_arg &trans_arg)
{
if (!trans_arg.trans)
return;
trans_arg.trans->execute(NdbTransaction::Commit);
trans_arg.ndb->closeTransaction(trans_arg.trans);
}
int
main(int argc, const char** argv){
ndb_init();
......@@ -29,8 +151,14 @@ main(int argc, const char** argv){
int _help = 0;
const char* db = 0;
const char* connectstring1 = 0;
const char* connectstring2 = 0;
struct getargs args[] = {
{ "connectstring1", 'c',
arg_string, &connectstring1, "connectstring1", "" },
{ "connectstring2", 'C',
arg_string, &connectstring2, "connectstring2", "" },
{ "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
......@@ -46,7 +174,7 @@ main(int argc, const char** argv){
}
// Connect to Ndb
Ndb_cluster_connection con;
Ndb_cluster_connection con(connectstring1);
if(con.connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
......@@ -61,12 +189,35 @@ main(int argc, const char** argv){
// Connect to Ndb and wait for it to become ready
while(MyNdb.waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl;
Ndb_cluster_connection *con2 = NULL;
Ndb *ndb2 = NULL;
if (connectstring2)
{
con2 = new Ndb_cluster_connection(connectstring2);
if(con2->connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
if(ndb2->init() != 0){
ERR(ndb2->getNdbError());
return NDBT_ProgramExit(NDBT_FAILED);
}
// Connect to Ndb and wait for it to become ready
while(ndb2->waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl;
}
int result = 0;
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
Vector<NdbDictionary::Event*> events;
Vector<NdbEventOperation*> event_ops;
int sz = 0;
for(i= optind; i<argc; i++)
{
const NdbDictionary::Table* table= myDict->getTable(argv[i]);
......@@ -121,12 +272,23 @@ main(int argc, const char** argv){
goto end;
}
event_values.push_back(Vector<NdbRecAttr *>());
event_pre_values.push_back(Vector<NdbRecAttr *>());
for (int a = 0; a < table->getNoOfColumns(); a++)
{
pOp->getValue(table->getColumn(a)->getName());
pOp->getPreValue(table->getColumn(a)->getName());
event_values[sz].
push_back(pOp->getValue(table->getColumn(a)->getName()));
event_pre_values[sz].
push_back(pOp->getPreValue(table->getColumn(a)->getName()));
}
event_ops.push_back(pOp);
{
struct Table_info ti;
ti.id = sz;
table_infos.push_back(ti);
}
pOp->setCustomData((void *)&table_infos[sz]);
sz++;
}
for(i= 0; i<(int)event_ops.size(); i++)
......@@ -140,6 +302,7 @@ main(int argc, const char** argv){
}
}
struct Trans_arg trans_arg;
while(true)
{
while(MyNdb.pollEvents(100) == 0);
......@@ -149,18 +312,26 @@ main(int argc, const char** argv){
{
Uint64 gci= pOp->getGCI();
Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
if (ndb2)
do_begin(ndb2, trans_arg);
do
{
switch(pOp->getEventType())
{
case NdbDictionary::Event::TE_INSERT:
cnt_i++;
if (ndb2)
do_insert(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_DELETE:
cnt_d++;
if (ndb2)
do_delete(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_UPDATE:
cnt_u++;
if (ndb2)
do_update(trans_arg, pOp);
break;
case NdbDictionary::Event::TE_CLUSTER_FAILURE:
break;
......@@ -180,6 +351,8 @@ main(int argc, const char** argv){
abort();
}
} while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
if (ndb2)
do_commit(trans_arg);
ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d);
}
}
......@@ -187,8 +360,15 @@ main(int argc, const char** argv){
for(i= 0; i<(int)event_ops.size(); i++)
MyNdb.dropEventOperation(event_ops[i]);
if (ndb2)
delete ndb2;
if (con2)
delete con2;
return NDBT_ProgramExit(NDBT_OK);
}
template class Vector<struct Table_info>;
template class Vector<NdbRecAttr*>;
template class Vector< Vector<NdbRecAttr*> >;
template class Vector<NdbDictionary::Event*>;
template class Vector<NdbEventOperation*>;
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
* Update on master wait for update on slave
*
*/
#include <NdbApi.hpp>
#include <NdbSleep.h>
#include <sys/time.h>
#include <NdbOut.hpp>
#include <NDBT.hpp>
struct Xxx
{
Ndb *ndb;
const NdbDictionary::Table *table;
Uint32 pk_col;
Uint32 col;
};
struct XxxR
{
Uint32 pk_val;
Uint32 val;
struct timeval start_time;
Uint32 latency;
};
static int
prepare_master_or_slave(Ndb &myNdb,
const char* table,
const char* pk,
Uint32 pk_val,
const char* col,
struct Xxx &xxx,
struct XxxR &xxxr);
static void
run_master_update(struct Xxx &xxx, struct XxxR &xxxr);
static void
run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr);
#define PRINT_ERROR(code,msg) \
g_err << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << ".\n"
#define APIERROR(error) { \
PRINT_ERROR((error).code, (error).message); \
exit(-1); }
int main(int argc, char** argv)
{
if (argc != 8)
{
ndbout << "Arguments are <connect_string cluster 1> <connect_string cluster 2> <database> <table name> <primary key> <value of primary key> <attribute to update>.\n";
exit(-1);
}
// ndb_init must be called first
ndb_init();
{
const char *opt_connectstring1 = argv[1];
const char *opt_connectstring2 = argv[2];
const char *opt_db = argv[3];
const char *opt_table = argv[4];
const char *opt_pk = argv[5];
const Uint32 opt_pk_val = atoi(argv[6]);
const char *opt_col = argv[7];
// Object representing the cluster 1
Ndb_cluster_connection cluster1_connection(opt_connectstring1);
// Object representing the cluster 2
Ndb_cluster_connection cluster2_connection(opt_connectstring2);
// connect cluster 1 and run application
// Connect to cluster 1 management server (ndb_mgmd)
if (cluster1_connection.connect(4 /* retries */,
5 /* delay between retries */,
1 /* verbose */))
{
g_err << "Cluster 1 management server was not ready within 30 secs.\n";
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster1_connection.wait_until_ready(30,0) < 0)
{
g_err << "Cluster 1 was not ready within 30 secs.\n";
exit(-1);
}
// connect cluster 2 and run application
// Connect to cluster management server (ndb_mgmd)
if (cluster2_connection.connect(4 /* retries */,
5 /* delay between retries */,
1 /* verbose */))
{
g_err << "Cluster 2 management server was not ready within 30 secs.\n";
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster2_connection.wait_until_ready(30,0) < 0)
{
g_err << "Cluster 2 was not ready within 30 secs.\n";
exit(-1);
}
// Object representing the database
Ndb myNdb1(&cluster1_connection, opt_db);
Ndb myNdb2(&cluster2_connection, opt_db);
//
struct Xxx xxx1;
struct Xxx xxx2;
struct XxxR xxxr;
prepare_master_or_slave(myNdb1, opt_table, opt_pk, opt_pk_val, opt_col,
xxx1, xxxr);
prepare_master_or_slave(myNdb2, opt_table, opt_pk, opt_pk_val, opt_col,
xxx2, xxxr);
while (1)
{
// run the application code
run_master_update(xxx1, xxxr);
run_slave_wait(xxx2, xxxr);
ndbout << "latency: " << xxxr.latency << endl;
}
}
// Note: all connections must have been destroyed before calling ndb_end()
ndb_end(0);
return 0;
}
static int
prepare_master_or_slave(Ndb &myNdb,
const char* table,
const char* pk,
Uint32 pk_val,
const char* col,
struct Xxx &xxx,
struct XxxR &xxxr)
{
if (myNdb.init())
APIERROR(myNdb.getNdbError());
const NdbDictionary::Dictionary* myDict = myNdb.getDictionary();
const NdbDictionary::Table *myTable = myDict->getTable(table);
if (myTable == NULL)
APIERROR(myDict->getNdbError());
const NdbDictionary::Column *myPkCol = myTable->getColumn(pk);
if (myPkCol == NULL)
APIERROR(myDict->getNdbError());
if (myPkCol->getType() != NdbDictionary::Column::Unsigned)
{
PRINT_ERROR(0, "Primary key column not of type unsigned");
exit(-1);
}
const NdbDictionary::Column *myCol = myTable->getColumn(col);
if (myCol == NULL)
APIERROR(myDict->getNdbError());
if (myCol->getType() != NdbDictionary::Column::Unsigned)
{
PRINT_ERROR(0, "Update column not of type unsigned");
exit(-1);
}
xxx.ndb = &myNdb;
xxx.table = myTable;
xxx.pk_col = myPkCol->getColumnNo();
xxx.col = myCol->getColumnNo();
xxxr.pk_val = pk_val;
return 0;
}
static void run_master_update(struct Xxx &xxx, struct XxxR &xxxr)
{
Ndb *ndb = xxx.ndb;
const NdbDictionary::Table *myTable = xxx.table;
int retry_sleep= 10; /* 10 milliseconds */
int retries= 100;
while (1)
{
Uint32 val;
NdbTransaction *trans = ndb->startTransaction();
if (trans == NULL)
goto err;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->readTupleExclusive();
op->equal(xxx.pk_col, xxxr.pk_val);
op->getValue(xxx.col, (char *)&val);
}
if (trans->execute(NdbTransaction::NoCommit))
goto err;
//fprintf(stderr, "read %u\n", val);
xxxr.val = val + 1;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->updateTuple();
op->equal(xxx.pk_col, xxxr.pk_val);
op->setValue(xxx.col, xxxr.val);
}
if (trans->execute(NdbTransaction::Commit))
goto err;
ndb->closeTransaction(trans);
//fprintf(stderr, "updated to %u\n", xxxr.val);
break;
err:
const NdbError this_error= trans ?
trans->getNdbError() : ndb->getNdbError();
if (this_error.status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
NdbSleep_MilliSleep(retry_sleep);
continue; // retry
}
}
if (trans)
ndb->closeTransaction(trans);
APIERROR(this_error);
}
/* update done start timer */
gettimeofday(&xxxr.start_time, 0);
}
static void run_slave_wait(struct Xxx &xxx, struct XxxR &xxxr)
{
struct timeval old_end_time = xxxr.start_time, end_time;
Ndb *ndb = xxx.ndb;
const NdbDictionary::Table *myTable = xxx.table;
int retry_sleep= 10; /* 10 milliseconds */
int retries= 100;
while (1)
{
Uint32 val;
NdbTransaction *trans = ndb->startTransaction();
if (trans == NULL)
goto err;
{
NdbOperation *op = trans->getNdbOperation(myTable);
if (op == NULL)
APIERROR(trans->getNdbError());
op->readTuple();
op->equal(xxx.pk_col, xxxr.pk_val);
op->getValue(xxx.col, (char *)&val);
if (trans->execute(NdbTransaction::Commit))
goto err;
}
/* read done, check time of read */
gettimeofday(&end_time, 0);
ndb->closeTransaction(trans);
//fprintf(stderr, "read %u waiting for %u\n", val, xxxr.val);
if (xxxr.val != val)
{
/* expected value not received yet */
retries = 100;
NdbSleep_MilliSleep(retry_sleep);
old_end_time = end_time;
continue;
}
break;
err:
const NdbError this_error= trans ?
trans->getNdbError() : ndb->getNdbError();
if (this_error.status == NdbError::TemporaryError)
{
if (retries--)
{
if (trans)
ndb->closeTransaction(trans);
NdbSleep_MilliSleep(retry_sleep);
continue; // retry
}
}
if (trans)
ndb->closeTransaction(trans);
APIERROR(this_error);
}
Int64 elapsed_usec1 =
((Int64)end_time.tv_sec - (Int64)xxxr.start_time.tv_sec)*1000*1000 +
((Int64)end_time.tv_usec - (Int64)xxxr.start_time.tv_usec);
Int64 elapsed_usec2 =
((Int64)end_time.tv_sec - (Int64)old_end_time.tv_sec)*1000*1000 +
((Int64)end_time.tv_usec - (Int64)old_end_time.tv_usec);
xxxr.latency =
((elapsed_usec1 - elapsed_usec2/2)+999)/1000;
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment