Commit b0222c91 authored by unknown's avatar unknown

simple extend of listen_event to do apply on remote cluster

parent cc675f5e
...@@ -22,6 +22,128 @@ ...@@ -22,6 +22,128 @@
#include <getarg.h> #include <getarg.h>
#define BATCH_SIZE 128
struct Table_info
{
Uint32 id;
};
struct Trans_arg
{
Ndb *ndb;
NdbTransaction *trans;
Uint32 bytes_batched;
};
Vector< Vector<NdbRecAttr*> > event_values;
Vector< Vector<NdbRecAttr*> > event_pre_values;
Vector<struct Table_info> table_infos;
static void do_begin(Ndb *ndb, struct Trans_arg &trans_arg)
{
trans_arg.ndb = ndb;
trans_arg.trans = ndb->startTransaction();
trans_arg.bytes_batched = 0;
}
static void do_equal(NdbOperation *op,
NdbEventOperation *pOp)
{
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
Vector<NdbRecAttr*> &ev = event_values[ti->id];
const NdbDictionary::Table *tab= pOp->getTable();
unsigned i, n_columns = tab->getNoOfColumns();
for (i= 0; i < n_columns; i++)
{
if (tab->getColumn(i)->getPrimaryKey() &&
op->equal(i, ev[i]->aRef()))
{
abort();
}
}
}
static void do_set_value(NdbOperation *op,
NdbEventOperation *pOp)
{
struct Table_info *ti = (struct Table_info *)pOp->getCustomData();
Vector<NdbRecAttr*> &ev = event_values[ti->id];
const NdbDictionary::Table *tab= pOp->getTable();
unsigned i, n_columns = tab->getNoOfColumns();
for (i= 0; i < n_columns; i++)
{
if (!tab->getColumn(i)->getPrimaryKey() &&
op->setValue(i, ev[i]->aRef()))
{
abort();
}
}
}
static void do_insert(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->writeTuple();
do_equal(op, pOp);
do_set_value(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_update(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->writeTuple();
do_equal(op, pOp);
do_set_value(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_delete(struct Trans_arg &trans_arg, NdbEventOperation *pOp)
{
if (!trans_arg.trans)
return;
NdbOperation *op =
trans_arg.trans->getNdbOperation(pOp->getEvent()->getTableName());
op->deleteTuple();
do_equal(op, pOp);
trans_arg.bytes_batched++;
if (trans_arg.bytes_batched > BATCH_SIZE)
{
trans_arg.trans->execute(NdbTransaction::NoCommit);
trans_arg.bytes_batched = 0;
}
}
static void do_commit(struct Trans_arg &trans_arg)
{
if (!trans_arg.trans)
return;
trans_arg.trans->execute(NdbTransaction::Commit);
trans_arg.ndb->closeTransaction(trans_arg.trans);
}
int int
main(int argc, const char** argv){ main(int argc, const char** argv){
ndb_init(); ndb_init();
...@@ -29,8 +151,14 @@ main(int argc, const char** argv){ ...@@ -29,8 +151,14 @@ main(int argc, const char** argv){
int _help = 0; int _help = 0;
const char* db = 0; const char* db = 0;
const char* connectstring1 = 0;
const char* connectstring2 = 0;
struct getargs args[] = { struct getargs args[] = {
{ "connectstring1", 'c',
arg_string, &connectstring1, "connectstring1", "" },
{ "connectstring2", 'C',
arg_string, &connectstring2, "connectstring2", "" },
{ "database", 'd', arg_string, &db, "Database", "" }, { "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" } { "usage", '?', arg_flag, &_help, "Print help", "" }
}; };
...@@ -46,7 +174,7 @@ main(int argc, const char** argv){ ...@@ -46,7 +174,7 @@ main(int argc, const char** argv){
} }
// Connect to Ndb // Connect to Ndb
Ndb_cluster_connection con; Ndb_cluster_connection con(connectstring1);
if(con.connect(12, 5, 1) != 0) if(con.connect(12, 5, 1) != 0)
{ {
return NDBT_ProgramExit(NDBT_FAILED); return NDBT_ProgramExit(NDBT_FAILED);
...@@ -61,12 +189,35 @@ main(int argc, const char** argv){ ...@@ -61,12 +189,35 @@ main(int argc, const char** argv){
// Connect to Ndb and wait for it to become ready // Connect to Ndb and wait for it to become ready
while(MyNdb.waitUntilReady() != 0) while(MyNdb.waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl; ndbout << "Waiting for ndb to become ready..." << endl;
Ndb_cluster_connection *con2 = NULL;
Ndb *ndb2 = NULL;
if (connectstring2)
{
con2 = new Ndb_cluster_connection(connectstring2);
if(con2->connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
ndb2 = new Ndb( con2, db ? db : "TEST_DB" );
if(ndb2->init() != 0){
ERR(ndb2->getNdbError());
return NDBT_ProgramExit(NDBT_FAILED);
}
// Connect to Ndb and wait for it to become ready
while(ndb2->waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl;
}
int result = 0; int result = 0;
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary(); NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
Vector<NdbDictionary::Event*> events; Vector<NdbDictionary::Event*> events;
Vector<NdbEventOperation*> event_ops; Vector<NdbEventOperation*> event_ops;
int sz = 0;
for(i= optind; i<argc; i++) for(i= optind; i<argc; i++)
{ {
const NdbDictionary::Table* table= myDict->getTable(argv[i]); const NdbDictionary::Table* table= myDict->getTable(argv[i]);
...@@ -121,12 +272,23 @@ main(int argc, const char** argv){ ...@@ -121,12 +272,23 @@ main(int argc, const char** argv){
goto end; goto end;
} }
event_values.push_back(Vector<NdbRecAttr *>());
event_pre_values.push_back(Vector<NdbRecAttr *>());
for (int a = 0; a < table->getNoOfColumns(); a++) for (int a = 0; a < table->getNoOfColumns(); a++)
{ {
pOp->getValue(table->getColumn(a)->getName()); event_values[sz].
pOp->getPreValue(table->getColumn(a)->getName()); push_back(pOp->getValue(table->getColumn(a)->getName()));
event_pre_values[sz].
push_back(pOp->getPreValue(table->getColumn(a)->getName()));
} }
event_ops.push_back(pOp); event_ops.push_back(pOp);
{
struct Table_info ti;
ti.id = sz;
table_infos.push_back(ti);
}
pOp->setCustomData((void *)&table_infos[sz]);
sz++;
} }
for(i= 0; i<(int)event_ops.size(); i++) for(i= 0; i<(int)event_ops.size(); i++)
...@@ -140,6 +302,7 @@ main(int argc, const char** argv){ ...@@ -140,6 +302,7 @@ main(int argc, const char** argv){
} }
} }
struct Trans_arg trans_arg;
while(true) while(true)
{ {
while(MyNdb.pollEvents(100) == 0); while(MyNdb.pollEvents(100) == 0);
...@@ -149,18 +312,26 @@ main(int argc, const char** argv){ ...@@ -149,18 +312,26 @@ main(int argc, const char** argv){
{ {
Uint64 gci= pOp->getGCI(); Uint64 gci= pOp->getGCI();
Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0; Uint64 cnt_i= 0, cnt_u= 0, cnt_d= 0;
if (ndb2)
do_begin(ndb2, trans_arg);
do do
{ {
switch(pOp->getEventType()) switch(pOp->getEventType())
{ {
case NdbDictionary::Event::TE_INSERT: case NdbDictionary::Event::TE_INSERT:
cnt_i++; cnt_i++;
if (ndb2)
do_insert(trans_arg, pOp);
break; break;
case NdbDictionary::Event::TE_DELETE: case NdbDictionary::Event::TE_DELETE:
cnt_d++; cnt_d++;
if (ndb2)
do_delete(trans_arg, pOp);
break; break;
case NdbDictionary::Event::TE_UPDATE: case NdbDictionary::Event::TE_UPDATE:
cnt_u++; cnt_u++;
if (ndb2)
do_update(trans_arg, pOp);
break; break;
case NdbDictionary::Event::TE_CLUSTER_FAILURE: case NdbDictionary::Event::TE_CLUSTER_FAILURE:
break; break;
...@@ -180,12 +351,21 @@ main(int argc, const char** argv){ ...@@ -180,12 +351,21 @@ main(int argc, const char** argv){
abort(); abort();
} }
} while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI()); } while ((pOp= MyNdb.nextEvent()) && gci == pOp->getGCI());
if (ndb2)
do_commit(trans_arg);
ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d); ndbout_c("GCI: %lld events: %lld(I) %lld(U) %lld(D)", gci, cnt_i, cnt_u, cnt_d);
} }
} }
end: end:
if (ndb2)
delete ndb2;
if (con2)
delete con2;
return NDBT_ProgramExit(NDBT_OK); return NDBT_ProgramExit(NDBT_OK);
} }
template class Vector<struct Table_info>;
template class Vector<NdbRecAttr*>;
template class Vector< Vector<NdbRecAttr*> >;
template class Vector<NdbDictionary::Event*>; template class Vector<NdbDictionary::Event*>;
template class Vector<NdbEventOperation*>; template class Vector<NdbEventOperation*>;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment