Commit c1bce74b authored by unknown's avatar unknown

merge error

parent 81127711
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <mysql.h>
#include <ndbapi/NdbApi.hpp>
#include <mgmapi.h>
#include <stdio.h>
/*
* export LD_LIBRARY_PATH=../../../libmysql_r/.libs:../../../ndb/src/.libs
*/
#define MGMERROR(h) \
{ \
fprintf(stderr, "code: %d msg: %s\n", \
ndb_mgm_get_latest_error(h), \
ndb_mgm_get_latest_error_msg(h)); \
exit(-1); \
}
#define LOGEVENTERROR(h) \
{ \
fprintf(stderr, "code: %d msg: %s\n", \
ndb_logevent_get_latest_error(h), \
ndb_logevent_get_latest_error_msg(h)); \
exit(-1); \
}
int main()
{
NdbMgmHandle h;
NdbLogEventHandle le;
int filter[] = { 15, NDB_MGM_EVENT_CATEGORY_BACKUP,
15, NDB_MGM_EVENT_CATEGORY_CONNECTION,
15, NDB_MGM_EVENT_CATEGORY_NODE_RESTART,
15, NDB_MGM_EVENT_CATEGORY_STARTUP,
15, NDB_MGM_EVENT_CATEGORY_ERROR,
0 };
struct ndb_logevent event;
ndb_init();
h= ndb_mgm_create_handle();
if ( h == 0)
{
printf("Unable to create handle\n");
exit(-1);
}
if (ndb_mgm_connect(h,0,0,0)) MGMERROR(h);
le= ndb_mgm_create_logevent_handle(h, filter);
if ( le == 0 ) MGMERROR(h);
while (1)
{
int timeout= 5000;
int r= ndb_logevent_get_next(le,&event,timeout);
if (r == 0)
printf("No event within %d milliseconds\n", timeout);
else if (r < 0)
LOGEVENTERROR(le)
else
{
switch (event.type) {
case NDB_LE_BackupStarted:
printf("Node %d: BackupStarted\n", event.source_nodeid);
printf(" Starting node ID: %d\n", event.BackupStarted.starting_node);
printf(" Backup ID: %d\n", event.BackupStarted.backup_id);
break;
case NDB_LE_BackupCompleted:
printf("Node %d: BackupCompleted\n", event.source_nodeid);
printf(" Backup ID: %d\n", event.BackupStarted.backup_id);
break;
case NDB_LE_BackupAborted:
printf("Node %d: BackupAborted\n", event.source_nodeid);
break;
case NDB_LE_BackupFailedToStart:
printf("Node %d: BackupFailedToStart\n", event.source_nodeid);
break;
case NDB_LE_NodeFailCompleted:
printf("Node %d: NodeFailCompleted\n", event.source_nodeid);
break;
case NDB_LE_ArbitResult:
printf("Node %d: ArbitResult\n", event.source_nodeid);
printf(" code %d, arbit_node %d\n",
event.ArbitResult.code & 0xffff,
event.ArbitResult.arbit_node);
break;
case NDB_LE_DeadDueToHeartbeat:
printf("Node %d: DeadDueToHeartbeat\n", event.source_nodeid);
printf(" node %d\n", event.DeadDueToHeartbeat.node);
break;
case NDB_LE_Connected:
printf("Node %d: Connected\n", event.source_nodeid);
printf(" node %d\n", event.Connected.node);
break;
case NDB_LE_Disconnected:
printf("Node %d: Disconnected\n", event.source_nodeid);
printf(" node %d\n", event.Disconnected.node);
break;
case NDB_LE_NDBStartCompleted:
printf("Node %d: StartCompleted\n", event.source_nodeid);
printf(" version %d.%d.%d\n",
event.NDBStartCompleted.version >> 16 & 0xff,
event.NDBStartCompleted.version >> 8 & 0xff,
event.NDBStartCompleted.version >> 0 & 0xff);
break;
case NDB_LE_ArbitState:
printf("Node %d: ArbitState\n", event.source_nodeid);
printf(" code %d, arbit_node %d\n",
event.ArbitState.code & 0xffff,
event.ArbitResult.arbit_node);
break;
default:
break;
}
}
}
ndb_mgm_destroy_logevent_handle(&le);
ndb_mgm_destroy_handle(&h);
ndb_end(0);
return 0;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
* ndbapi_async.cpp:
* Illustrates how to use callbacks and error handling using the asynchronous
* part of the NDBAPI.
*
* Classes and methods in NDBAPI used in this example:
*
* Ndb_cluster_connection
* connect()
* wait_until_ready()
*
* Ndb
* init()
* startTransaction()
* closeTransaction()
* sendPollNdb()
* getNdbError()
*
* NdbConnection
* getNdbOperation()
* executeAsynchPrepare()
* getNdbError()
*
* NdbOperation
* insertTuple()
* equal()
* setValue()
*
*/
#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
#include <iostream> // Used for cout
/**
* Helper sleep function
*/
static void
milliSleep(int milliseconds){
struct timeval sleeptime;
sleeptime.tv_sec = milliseconds / 1000;
sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
select(0, 0, 0, 0, &sleeptime);
}
/**
* error printout macro
*/
#define PRINT_ERROR(code,msg) \
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
exit(-1); }
#define APIERROR(error) { \
PRINT_ERROR(error.code,error.message); \
exit(-1); }
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/**
* callback struct.
* transaction : index of the transaction in transaction[] array below
* data : the data that the transaction was modifying.
* retries : counter for how many times the trans. has been retried
*/
typedef struct {
Ndb * ndb;
int transaction;
int data;
int retries;
} async_callback_t;
/**
* Structure used in "free list" to a NdbTransaction
*/
typedef struct {
NdbTransaction* conn;
int used;
} transaction_t;
/**
* Free list holding transactions
*/
transaction_t transaction[1024]; //1024 - max number of outstanding
//transaction in one Ndb object
#endif
/**
* prototypes
*/
/**
* Prepare and send transaction
*/
int populate(Ndb * myNdb, int data, async_callback_t * cbData);
/**
* Error handler.
*/
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb);
/**
* Exit function
*/
void asynchExitHandler(Ndb * m_ndb) ;
/**
* Helper function used in callback(...)
*/
void closeTransaction(Ndb * ndb , async_callback_t * cb);
/**
* Function to create table
*/
int create_table(Ndb * myNdb);
/**
* stat. variables
*/
int tempErrors = 0;
int permErrors = 0;
void
closeTransaction(Ndb * ndb , async_callback_t * cb)
{
ndb->closeTransaction(transaction[cb->transaction].conn);
transaction[cb->transaction].conn = 0;
transaction[cb->transaction].used = 0;
cb->retries++;
}
/**
* Callback executed when transaction has return from NDB
*/
static void
callback(int result, NdbTransaction* trans, void* aObject)
{
async_callback_t * cbData = (async_callback_t *)aObject;
if (result<0)
{
/**
* Error: Temporary or permanent?
*/
if (asynchErrorHandler(trans, (Ndb*)cbData->ndb))
{
closeTransaction((Ndb*)cbData->ndb, cbData);
while(populate((Ndb*)cbData->ndb, cbData->data, cbData) < 0)
milliSleep(10);
}
else
{
std::cout << "Restore: Failed to restore data "
<< "due to a unrecoverable error. Exiting..." << std::endl;
delete cbData;
asynchExitHandler((Ndb*)cbData->ndb);
}
}
else
{
/**
* OK! close transaction
*/
closeTransaction((Ndb*)cbData->ndb, cbData);
delete cbData;
}
}
/**
* Create table "GARAGE"
*/
int create_table(MYSQL &mysql)
{
while (mysql_query(&mysql,
"CREATE TABLE"
" GARAGE"
" (REG_NO INT UNSIGNED NOT NULL,"
" BRAND CHAR(20) NOT NULL,"
" COLOR CHAR(20) NOT NULL,"
" PRIMARY KEY USING HASH (REG_NO))"
" ENGINE=NDB"))
{
if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
MYSQLERROR(mysql);
std::cout << "MySQL Cluster already has example table: GARAGE. "
<< "Dropping it..." << std::endl;
/**************
* Drop table *
**************/
if (mysql_query(&mysql, "DROP TABLE GARAGE"))
MYSQLERROR(mysql);
}
return 1;
}
void asynchExitHandler(Ndb * m_ndb)
{
if (m_ndb != NULL)
delete m_ndb;
exit(-1);
}
/* returns true if is recoverable (temporary),
* false if it is an error that is permanent.
*/
bool asynchErrorHandler(NdbTransaction * trans, Ndb* ndb)
{
NdbError error = trans->getNdbError();
switch(error.status)
{
case NdbError::Success:
return false;
break;
case NdbError::TemporaryError:
/**
* The error code indicates a temporary error.
* The application should typically retry.
* (Includes classifications: NdbError::InsufficientSpace,
* NdbError::TemporaryResourceError, NdbError::NodeRecoveryError,
* NdbError::OverloadError, NdbError::NodeShutdown
* and NdbError::TimeoutExpired.)
*
* We should sleep for a while and retry, except for insufficient space
*/
if(error.classification == NdbError::InsufficientSpace)
return false;
milliSleep(10);
tempErrors++;
return true;
break;
case NdbError::UnknownResult:
std::cout << error.message << std::endl;
return false;
break;
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
milliSleep(10);
return true; // SCAN errors that can be retried. Requires restart of scan.
default:
break;
}
//ERROR
std::cout << error.message << std::endl;
return false;
break;
}
return false;
}
static int nPreparedTransactions = 0;
static int MAX_RETRIES = 10;
static int parallelism = 100;
/************************************************************************
* populate()
* 1. Prepare 'parallelism' number of insert transactions.
* 2. Send transactions to NDB and wait for callbacks to execute
*/
int populate(Ndb * myNdb, int data, async_callback_t * cbData)
{
NdbOperation* myNdbOperation; // For operations
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
async_callback_t * cb;
int retries = 0;
int current = 0;
for(int i=0; i<1024; i++)
{
if(transaction[i].used == 0)
{
current = i;
if (cbData == 0)
{
/**
* We already have a callback
* This is an absolutely new transaction
*/
cb = new async_callback_t;
cb->retries = 0;
}
else
{
/**
* We already have a callback
*/
cb =cbData;
retries = cbData->retries;
}
/**
* Set data used by the callback
*/
cb->ndb = myNdb; //handle to Ndb object so that we can close transaction
// in the callback (alt. make myNdb global).
cb->data = data; //this is the data we want to insert
cb->transaction = current; //This is the number (id) of this transaction
transaction[current].used = 1 ; //Mark the transaction as used
break;
}
}
if(!current)
return -1;
while(retries < MAX_RETRIES)
{
transaction[current].conn = myNdb->startTransaction();
if (transaction[current].conn == NULL) {
if (asynchErrorHandler(transaction[current].conn, myNdb))
{
/**
* no transaction to close since conn == null
*/
milliSleep(10);
retries++;
continue;
}
asynchExitHandler(myNdb);
}
myNdbOperation = transaction[current].conn->getNdbOperation(myTable);
if (myNdbOperation == NULL)
{
if (asynchErrorHandler(transaction[current].conn, myNdb))
{
myNdb->closeTransaction(transaction[current].conn);
transaction[current].conn = 0;
milliSleep(10);
retries++;
continue;
}
asynchExitHandler(myNdb);
} // if
if(myNdbOperation->insertTuple() < 0 ||
myNdbOperation->equal("REG_NO", data) < 0 ||
myNdbOperation->setValue("BRAND", "Mercedes") <0 ||
myNdbOperation->setValue("COLOR", "Blue") < 0)
{
if (asynchErrorHandler(transaction[current].conn, myNdb))
{
myNdb->closeTransaction(transaction[current].conn);
transaction[current].conn = 0;
retries++;
milliSleep(10);
continue;
}
asynchExitHandler(myNdb);
}
/*Prepare transaction (the transaction is NOT yet sent to NDB)*/
transaction[current].conn->executeAsynchPrepare(NdbTransaction::Commit,
&callback,
cb);
/**
* When we have prepared parallelism number of transactions ->
* send the transaction to ndb.
* Next time we will deal with the transactions are in the
* callback. There we will see which ones that were successful
* and which ones to retry.
*/
if (nPreparedTransactions == parallelism-1)
{
// send-poll all transactions
// close transaction is done in callback
myNdb->sendPollNdb(3000, parallelism );
nPreparedTransactions=0;
}
else
nPreparedTransactions++;
return 1;
}
std::cout << "Unable to recover from errors. Exiting..." << std::endl;
asynchExitHandler(myNdb);
return -1;
}
int main()
{
ndb_init();
MYSQL mysql;
/**************************************************************
* Connect to mysql server and create table *
**************************************************************/
{
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(-1);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
3306, "/tmp/mysql.sock", 0) )
MYSQLERROR(mysql);
mysql_query(&mysql, "CREATE DATABASE TEST_DB");
if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);
create_table(mysql);
}
/**************************************************************
* Connect to ndb cluster *
**************************************************************/
Ndb_cluster_connection cluster_connection;
if (cluster_connection.connect(4, 5, 1))
{
std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster_connection.wait_until_ready(30,0) < 0)
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(-1);
}
Ndb* myNdb = new Ndb( &cluster_connection,
"TEST_DB" ); // Object representing the database
if (myNdb->init(1024) == -1) { // Set max 1024 parallel transactions
APIERROR(myNdb->getNdbError());
}
/**
* Initialise transaction array
*/
for(int i = 0 ; i < 1024 ; i++)
{
transaction[i].used = 0;
transaction[i].conn = 0;
}
int i=0;
/**
* Do 20000 insert transactions.
*/
while(i < 20000)
{
while(populate(myNdb,i,0)<0) // <0, no space on free list. Sleep and try again.
milliSleep(10);
i++;
}
std::cout << "Number of temporary errors: " << tempErrors << std::endl;
delete myNdb;
}
1. Set NDB_OS in Makefile
2. Add path to libNDB_API.so in LD_LIBRARY_PATH
3. Set NDB_CONNECTSTRING
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//
// ndbapi_async1.cpp: Using asynchronous transactions in NDB API
//
// Execute ndbapi_example1 to create the table "MYTABLENAME"
// before executing this program.
//
// Correct output from this program is:
//
// Successful insert.
// Successful insert.
#include <NdbApi.hpp>
// Used for cout
#include <iostream>
#define APIERROR(error) \
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
<< error.code << ", msg: " << error.message << "." << std::endl; \
exit(-1); }
static void callback(int result, NdbTransaction* NdbObject, void* aObject);
int main()
{
ndb_init();
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs." << std::endl;
exit(-1);
}
int r= cluster_connection->connect(5 /* retries */,
3 /* delay between retries */,
1 /* verbose */);
if (r > 0)
{
std::cout
<< "Cluster connect failed, possibly resolved with more retries.\n";
exit(-1);
}
else if (r < 0)
{
std::cout
<< "Cluster connect failed.\n";
exit(-1);
}
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs." << std::endl;
exit(-1);
}
Ndb* myNdb = new Ndb( cluster_connection,
"TEST_DB_2" ); // Object representing the database
NdbTransaction* myNdbTransaction[2]; // For transactions
NdbOperation* myNdbOperation; // For operations
if (myNdb->init(2) == -1) { // Want two parallel insert transactions
APIERROR(myNdb->getNdbError());
exit(-1);
}
/******************************************************
* Insert (we do two insert transactions in parallel) *
******************************************************/
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
for (int i = 0; i < 2; i++) {
myNdbTransaction[i] = myNdb->startTransaction();
if (myNdbTransaction[i] == NULL) APIERROR(myNdb->getNdbError());
myNdbOperation = myNdbTransaction[i]->getNdbOperation(myTable);
if (myNdbOperation == NULL) APIERROR(myNdbTransaction[i]->getNdbError());
myNdbOperation->insertTuple();
myNdbOperation->equal("ATTR1", 20 + i);
myNdbOperation->setValue("ATTR2", 20 + i);
// Prepare transaction (the transaction is NOT yet sent to NDB)
myNdbTransaction[i]->executeAsynchPrepare(NdbTransaction::Commit,
&callback, NULL);
}
// Send all transactions to NDB
myNdb->sendPreparedTransactions(0);
// Poll all transactions
myNdb->pollNdb(3000, 2);
// Close all transactions
for (int i = 0; i < 2; i++)
myNdb->closeTransaction(myNdbTransaction[i]);
delete myNdb;
delete cluster_connection;
ndb_end(0);
return 0;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbTransaction object, and
* - A pointer to an arbitrary object.)
*/
static void
callback(int result, NdbTransaction* myTrans, void* aObject)
{
if (result == -1) {
std::cout << "Poll error: " << std::endl;
APIERROR(myTrans->getNdbError());
} else {
std::cout << "Successful insert." << std::endl;
}
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/**
* ndbapi_event.cpp: Using API level events in NDB API
*
* Classes and methods used in this example:
*
* Ndb_cluster_connection
* connect()
* wait_until_ready()
*
* Ndb
* init()
* getDictionary()
* createEventOperation()
* dropEventOperation()
* pollEvents()
* nextEvent()
*
* NdbDictionary
* createEvent()
* dropEvent()
*
* NdbDictionary::Event
* setTable()
* addTableEvent()
* addEventColumn()
*
* NdbEventOperation
* getValue()
* getPreValue()
* execute()
* getEventType()
*
*/
#include <NdbApi.hpp>
// Used for cout
#include <stdio.h>
#include <iostream>
#include <unistd.h>
/**
*
* Assume that there is a table TAB0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns COL0, COL2, COL11
*
* or together with the mysql client;
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
*
* In another window start ndbapi_event, wait until properly started
*
insert into TAB0 values (1,2,3);
insert into TAB0 values (2,2,3);
insert into TAB0 values (3,2,9);
update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
*
* you should see the data popping up in the example window
*
*/
#define APIERROR(error) \
{ std::cout << "Error in " << __FILE__ << ", line:" << __LINE__ << ", code:" \
<< error.code << ", msg: " << error.message << "." << std::endl; \
exit(-1); }
int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnName,
const int noEventColumnName);
int main()
{
ndb_init();
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster
int r= cluster_connection->connect(5 /* retries */,
3 /* delay between retries */,
1 /* verbose */);
if (r > 0)
{
std::cout
<< "Cluster connect failed, possibly resolved with more retries.\n";
exit(-1);
}
else if (r < 0)
{
std::cout
<< "Cluster connect failed.\n";
exit(-1);
}
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs." << std::endl;
exit(-1);
}
Ndb* myNdb= new Ndb(cluster_connection,
"TEST_DB"); // Object representing the database
if (myNdb->init() == -1) APIERROR(myNdb->getNdbError());
const char *eventName= "CHNG_IN_TAB0";
const char *eventTableName= "TAB0";
const int noEventColumnName= 3;
const char *eventColumnName[noEventColumnName]=
{"COL0",
"COL1",
"COL11"};
// Create events
myCreateEvent(myNdb,
eventName,
eventTableName,
eventColumnName,
noEventColumnName);
int j= 0;
while (j < 5) {
// Start "transaction" for handling events
NdbEventOperation* op;
printf("create EventOperation\n");
if ((op = myNdb->createEventOperation(eventName)) == NULL)
APIERROR(myNdb->getNdbError());
printf("get values\n");
NdbRecAttr* recAttr[noEventColumnName];
NdbRecAttr* recAttrPre[noEventColumnName];
// primary keys should always be a part of the result
for (int i = 0; i < noEventColumnName; i++) {
recAttr[i] = op->getValue(eventColumnName[i]);
recAttrPre[i] = op->getPreValue(eventColumnName[i]);
}
// set up the callbacks
printf("execute\n");
// This starts changes to "start flowing"
if (op->execute())
APIERROR(op->getNdbError());
int i= 0;
while(i < 40) {
// printf("now waiting for event...\n");
int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
if (r > 0) {
// printf("got data! %d\n", r);
while ((op= myNdb->nextEvent())) {
i++;
switch (op->getEventType()) {
case NdbDictionary::Event::TE_INSERT:
printf("%u INSERT: ", i);
break;
case NdbDictionary::Event::TE_DELETE:
printf("%u DELETE: ", i);
break;
case NdbDictionary::Event::TE_UPDATE:
printf("%u UPDATE: ", i);
break;
default:
abort(); // should not happen
}
for (int i = 1; i < noEventColumnName; i++) {
if (recAttr[i]->isNULL() >= 0) { // we have a value
printf(" post[%u]=", i);
if (recAttr[i]->isNULL() == 0) // we have a non-null value
printf("%u", recAttr[i]->u_32_value());
else // we have a null value
printf("NULL");
}
if (recAttrPre[i]->isNULL() >= 0) { // we have a value
printf(" pre[%u]=", i);
if (recAttrPre[i]->isNULL() == 0) // we have a non-null value
printf("%u", recAttrPre[i]->u_32_value());
else // we have a null value
printf("NULL");
}
}
printf("\n");
}
} else
;//printf("timed out\n");
}
// don't want to listen to events anymore
if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());
j++;
}
{
NdbDictionary::Dictionary *myDict = myNdb->getDictionary();
if (!myDict) APIERROR(myNdb->getNdbError());
// remove event from database
if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
}
delete myNdb;
delete cluster_connection;
ndb_end(0);
return 0;
}
int myCreateEvent(Ndb* myNdb,
const char *eventName,
const char *eventTableName,
const char **eventColumnNames,
const int noEventColumnNames)
{
NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
if (!myDict) APIERROR(myNdb->getNdbError());
const NdbDictionary::Table *table= myDict->getTable(eventTableName);
if (!table) APIERROR(myDict->getNdbError());
NdbDictionary::Event myEvent(eventName, *table);
myEvent.addTableEvent(NdbDictionary::Event::TE_ALL);
// myEvent.addTableEvent(NdbDictionary::Event::TE_INSERT);
// myEvent.addTableEvent(NdbDictionary::Event::TE_UPDATE);
// myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);
myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
// Add event to database
if (myDict->createEvent(myEvent) == 0)
myEvent.print();
else if (myDict->getNdbError().classification ==
NdbError::SchemaObjectExists) {
printf("Event creation failed, event exists\n");
printf("dropping Event...\n");
if (myDict->dropEvent(eventName)) APIERROR(myDict->getNdbError());
// try again
// Add event to database
if ( myDict->createEvent(myEvent)) APIERROR(myDict->getNdbError());
} else
APIERROR(myDict->getNdbError());
return 0;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//
// ndbapi_retries.cpp: Error handling and transaction retries
//
// Execute ndbapi_simple to create the table "MYTABLENAME"
// before executing this program.
//
// There are many ways to program using the NDB API. In this example
// we execute two inserts in the same transaction using
// NdbConnection::execute(NoCommit).
//
// Transaction failing is handled by re-executing the transaction
// in case of non-permanent transaction errors.
// Application errors (i.e. errors at points marked with APIERROR)
// should be handled by the application programmer.
#include <NdbApi.hpp>
// Used for cout
#include <iostream>
// Used for sleep (use your own version of sleep)
#include <unistd.h>
#define TIME_TO_SLEEP_BETWEEN_TRANSACTION_RETRIES 1
//
// APIERROR prints an NdbError object
//
#define APIERROR(error) \
{ std::cout << "API ERROR: " << error.code << " " << error.message \
<< std::endl \
<< " " << "Status: " << error.status \
<< ", Classification: " << error.classification << std::endl\
<< " " << "File: " << __FILE__ \
<< " (Line: " << __LINE__ << ")" << std::endl \
; \
}
//
// TRANSERROR prints all error info regarding an NdbTransaction
//
#define TRANSERROR(ndbTransaction) \
{ NdbError error = ndbTransaction->getNdbError(); \
std::cout << "TRANS ERROR: " << error.code << " " << error.message \
<< std::endl \
<< " " << "Status: " << error.status \
<< ", Classification: " << error.classification << std::endl \
<< " " << "File: " << __FILE__ \
<< " (Line: " << __LINE__ << ")" << std::endl \
; \
printTransactionError(ndbTransaction); \
}
void printTransactionError(NdbTransaction *ndbTransaction) {
const NdbOperation *ndbOp = NULL;
int i=0;
/****************************************************************
* Print NdbError object of every operations in the transaction *
****************************************************************/
while ((ndbOp = ndbTransaction->getNextCompletedOperation(ndbOp)) != NULL) {
NdbError error = ndbOp->getNdbError();
std::cout << " OPERATION " << i+1 << ": "
<< error.code << " " << error.message << std::endl
<< " Status: " << error.status
<< ", Classification: " << error.classification << std::endl;
i++;
}
}
//
// Example insert
// @param myNdb Ndb object representing NDB Cluster
// @param myTransaction NdbTransaction used for transaction
// @param myTable Table to insert into
// @param error NdbError object returned in case of errors
// @return -1 in case of failures, 0 otherwise
//
int insert(int transactionId, NdbTransaction* myTransaction,
const NdbDictionary::Table *myTable) {
NdbOperation *myOperation; // For other operations
myOperation = myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) return -1;
if (myOperation->insertTuple() ||
myOperation->equal("ATTR1", transactionId) ||
myOperation->setValue("ATTR2", transactionId)) {
APIERROR(myOperation->getNdbError());
exit(-1);
}
return myTransaction->execute(NdbTransaction::NoCommit);
}
//
// Execute function which re-executes (tries 10 times) the transaction
// if there are temporary errors (e.g. the NDB Cluster is overloaded).
// @return -1 failure, 1 success
//
int executeInsertTransaction(int transactionId, Ndb* myNdb,
const NdbDictionary::Table *myTable) {
int result = 0; // No result yet
int noOfRetriesLeft = 10;
NdbTransaction *myTransaction; // For other transactions
NdbError ndberror;
while (noOfRetriesLeft > 0 && !result) {
/*********************************
* Start and execute transaction *
*********************************/
myTransaction = myNdb->startTransaction();
if (myTransaction == NULL) {
APIERROR(myNdb->getNdbError());
ndberror = myNdb->getNdbError();
result = -1; // Failure
} else if (insert(transactionId, myTransaction, myTable) ||
insert(10000+transactionId, myTransaction, myTable) ||
myTransaction->execute(NdbTransaction::Commit)) {
TRANSERROR(myTransaction);
ndberror = myTransaction->getNdbError();
result = -1; // Failure
} else {
result = 1; // Success
}
/**********************************
* If failure, then analyze error *
**********************************/
if (result == -1) {
switch (ndberror.status) {
case NdbError::Success:
break;
case NdbError::TemporaryError:
std::cout << "Retrying transaction..." << std::endl;
sleep(TIME_TO_SLEEP_BETWEEN_TRANSACTION_RETRIES);
--noOfRetriesLeft;
result = 0; // No completed transaction yet
break;
case NdbError::UnknownResult:
case NdbError::PermanentError:
std::cout << "No retry of transaction..." << std::endl;
result = -1; // Permanent failure
break;
}
}
/*********************
* Close transaction *
*********************/
if (myTransaction != NULL) {
myNdb->closeTransaction(myTransaction);
}
}
if (result != 1) exit(-1);
return result;
}
int main()
{
ndb_init();
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster
int r= cluster_connection->connect(5 /* retries */,
3 /* delay between retries */,
1 /* verbose */);
if (r > 0)
{
std::cout
<< "Cluster connect failed, possibly resolved with more retries.\n";
exit(-1);
}
else if (r < 0)
{
std::cout
<< "Cluster connect failed.\n";
exit(-1);
}
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs." << std::endl;
exit(-1);
}
Ndb* myNdb= new Ndb( cluster_connection,
"TEST_DB_1" ); // Object representing the database
if (myNdb->init() == -1) {
APIERROR(myNdb->getNdbError());
exit(-1);
}
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
{
APIERROR(myDict->getNdbError());
return -1;
}
/************************************
* Execute some insert transactions *
************************************/
for (int i = 10000; i < 20000; i++) {
executeInsertTransaction(i, myNdb, myTable);
}
delete myNdb;
delete cluster_connection;
ndb_end(0);
return 0;
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
* ndbapi_scan.cpp:
* Illustrates how to use the scan api in the NDBAPI.
* The example shows how to do scan, scan for update and scan for delete
* using NdbScanFilter and NdbScanOperation
*
* Classes and methods used in this example:
*
* Ndb_cluster_connection
* connect()
* wait_until_ready()
*
* Ndb
* init()
* getDictionary()
* startTransaction()
* closeTransaction()
*
* NdbTransaction
* getNdbScanOperation()
* execute()
*
* NdbScanOperation
* getValue()
* readTuples()
* nextResult()
* deleteCurrentTuple()
* updateCurrentTuple()
*
* const NdbDictionary::Dictionary
* getTable()
*
* const NdbDictionary::Table
* getColumn()
*
* const NdbDictionary::Column
* getLength()
*
* NdbOperation
* insertTuple()
* equal()
* setValue()
*
* NdbScanFilter
* begin()
* eq()
* end()
*
*/
#include <mysql.h>
#include <mysqld_error.h>
#include <NdbApi.hpp>
// Used for cout
#include <iostream>
#include <stdio.h>
/**
* Helper sleep function
*/
static void
milliSleep(int milliseconds){
struct timeval sleeptime;
sleeptime.tv_sec = milliseconds / 1000;
sleeptime.tv_usec = (milliseconds - (sleeptime.tv_sec * 1000)) * 1000000;
select(0, 0, 0, 0, &sleeptime);
}
/**
* Helper sleep function
*/
#define PRINT_ERROR(code,msg) \
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
exit(-1); }
#define APIERROR(error) { \
PRINT_ERROR(error.code,error.message); \
exit(-1); }
struct Car
{
/**
* Note memset, so that entire char-fields are cleared
* as all 20 bytes are significant (as type is char)
*/
Car() { memset(this, 0, sizeof(* this)); }
unsigned int reg_no;
char brand[20];
char color[20];
};
/**
* Function to create table
*/
int create_table(MYSQL &mysql)
{
while (mysql_query(&mysql,
"CREATE TABLE"
" GARAGE"
" (REG_NO INT UNSIGNED NOT NULL,"
" BRAND CHAR(20) NOT NULL,"
" COLOR CHAR(20) NOT NULL,"
" PRIMARY KEY USING HASH (REG_NO))"
" ENGINE=NDB"))
{
if (mysql_errno(&mysql) != ER_TABLE_EXISTS_ERROR)
MYSQLERROR(mysql);
std::cout << "MySQL Cluster already has example table: GARAGE. "
<< "Dropping it..." << std::endl;
/**************
* Drop table *
**************/
if (mysql_query(&mysql, "DROP TABLE GARAGE"))
MYSQLERROR(mysql);
}
return 1;
}
int populate(Ndb * myNdb)
{
int i;
Car cars[15];
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
/**
* Five blue mercedes
*/
for (i = 0; i < 5; i++)
{
cars[i].reg_no = i;
sprintf(cars[i].brand, "Mercedes");
sprintf(cars[i].color, "Blue");
}
/**
* Five black bmw
*/
for (i = 5; i < 10; i++)
{
cars[i].reg_no = i;
sprintf(cars[i].brand, "BMW");
sprintf(cars[i].color, "Black");
}
/**
* Five pink toyotas
*/
for (i = 10; i < 15; i++)
{
cars[i].reg_no = i;
sprintf(cars[i].brand, "Toyota");
sprintf(cars[i].color, "Pink");
}
NdbTransaction* myTrans = myNdb->startTransaction();
if (myTrans == NULL)
APIERROR(myNdb->getNdbError());
for (i = 0; i < 15; i++)
{
NdbOperation* myNdbOperation = myTrans->getNdbOperation(myTable);
if (myNdbOperation == NULL)
APIERROR(myTrans->getNdbError());
myNdbOperation->insertTuple();
myNdbOperation->equal("REG_NO", cars[i].reg_no);
myNdbOperation->setValue("BRAND", cars[i].brand);
myNdbOperation->setValue("COLOR", cars[i].color);
}
int check = myTrans->execute(NdbTransaction::Commit);
myTrans->close();
return check != -1;
}
int scan_delete(Ndb* myNdb,
int column,
const char * color)
{
// Scan all records exclusive and delete
// them one by one
int retryAttempt = 0;
const int retryMax = 10;
int deletedRows = 0;
int check;
NdbError err;
NdbTransaction *myTrans;
NdbScanOperation *myScanOp;
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
/**
* Loop as long as :
* retryMax not reached
* failed operations due to TEMPORARY erros
*
* Exit loop;
* retyrMax reached
* Permanent error (return -1)
*/
while (true)
{
if (retryAttempt >= retryMax)
{
std::cout << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << std::endl;
return -1;
}
myTrans = myNdb->startTransaction();
if (myTrans == NULL)
{
const NdbError err = myNdb->getNdbError();
if (err.status == NdbError::TemporaryError)
{
milliSleep(50);
retryAttempt++;
continue;
}
std::cout << err.message << std::endl;
return -1;
}
/**
* Get a scan operation.
*/
myScanOp = myTrans->getNdbScanOperation(myTable);
if (myScanOp == NULL)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Define a result set for the scan.
*/
if(myScanOp->readTuples(NdbOperation::LM_Exclusive) != 0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Use NdbScanFilter to define a search critera
*/
NdbScanFilter filter(myScanOp) ;
if(filter.begin(NdbScanFilter::AND) < 0 ||
filter.cmp(NdbScanFilter::COND_EQ, column, color) < 0 ||
filter.end() < 0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Start scan (NoCommit since we are only reading at this stage);
*/
if(myTrans->execute(NdbTransaction::NoCommit) != 0){
err = myTrans->getNdbError();
if(err.status == NdbError::TemporaryError){
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
std::cout << err.code << std::endl;
std::cout << myTrans->getNdbError().code << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* start of loop: nextResult(true) means that "parallelism" number of
* rows are fetched from NDB and cached in NDBAPI
*/
while((check = myScanOp->nextResult(true)) == 0){
do
{
if (myScanOp->deleteCurrentTuple() != 0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
deletedRows++;
/**
* nextResult(false) means that the records
* cached in the NDBAPI are modified before
* fetching more rows from NDB.
*/
} while((check = myScanOp->nextResult(false)) == 0);
/**
* Commit when all cached tuple have been marked for deletion
*/
if(check != -1)
{
check = myTrans->execute(NdbTransaction::Commit);
}
if(check == -1)
{
/**
* Create a new transaction, while keeping scan open
*/
check = myTrans->restart();
}
/**
* Check for errors
*/
err = myTrans->getNdbError();
if(check == -1)
{
if(err.status == NdbError::TemporaryError)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
}
/**
* End of loop
*/
}
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return 0;
}
if(myTrans!=0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
}
return -1;
}
int scan_update(Ndb* myNdb,
int update_column,
const char * before_color,
const char * after_color)
{
// Scan all records exclusive and update
// them one by one
int retryAttempt = 0;
const int retryMax = 10;
int updatedRows = 0;
int check;
NdbError err;
NdbTransaction *myTrans;
NdbScanOperation *myScanOp;
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
/**
* Loop as long as :
* retryMax not reached
* failed operations due to TEMPORARY erros
*
* Exit loop;
* retyrMax reached
* Permanent error (return -1)
*/
while (true)
{
if (retryAttempt >= retryMax)
{
std::cout << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << std::endl;
return -1;
}
myTrans = myNdb->startTransaction();
if (myTrans == NULL)
{
const NdbError err = myNdb->getNdbError();
if (err.status == NdbError::TemporaryError)
{
milliSleep(50);
retryAttempt++;
continue;
}
std::cout << err.message << std::endl;
return -1;
}
/**
* Get a scan operation.
*/
myScanOp = myTrans->getNdbScanOperation(myTable);
if (myScanOp == NULL)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Define a result set for the scan.
*/
if( myScanOp->readTuples(NdbOperation::LM_Exclusive) )
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Use NdbScanFilter to define a search critera
*/
NdbScanFilter filter(myScanOp) ;
if(filter.begin(NdbScanFilter::AND) < 0 ||
filter.cmp(NdbScanFilter::COND_EQ, update_column, before_color) <0||
filter.end() <0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Start scan (NoCommit since we are only reading at this stage);
*/
if(myTrans->execute(NdbTransaction::NoCommit) != 0)
{
err = myTrans->getNdbError();
if(err.status == NdbError::TemporaryError){
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
std::cout << myTrans->getNdbError().code << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* start of loop: nextResult(true) means that "parallelism" number of
* rows are fetched from NDB and cached in NDBAPI
*/
while((check = myScanOp->nextResult(true)) == 0){
do {
/**
* Get update operation
*/
NdbOperation * myUpdateOp = myScanOp->updateCurrentTuple();
if (myUpdateOp == 0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
updatedRows++;
/**
* do the update
*/
myUpdateOp->setValue(update_column, after_color);
/**
* nextResult(false) means that the records
* cached in the NDBAPI are modified before
* fetching more rows from NDB.
*/
} while((check = myScanOp->nextResult(false)) == 0);
/**
* NoCommit when all cached tuple have been updated
*/
if(check != -1)
{
check = myTrans->execute(NdbTransaction::NoCommit);
}
/**
* Check for errors
*/
err = myTrans->getNdbError();
if(check == -1)
{
if(err.status == NdbError::TemporaryError){
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
}
/**
* End of loop
*/
}
/**
* Commit all prepared operations
*/
if(myTrans->execute(NdbTransaction::Commit) == -1)
{
if(err.status == NdbError::TemporaryError){
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
}
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return 0;
}
if(myTrans!=0)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
}
return -1;
}
int scan_print(Ndb * myNdb)
{
// Scan all records exclusive and update
// them one by one
int retryAttempt = 0;
const int retryMax = 10;
int fetchedRows = 0;
int check;
NdbError err;
NdbTransaction *myTrans;
NdbScanOperation *myScanOp;
/* Result of reading attribute value, three columns:
REG_NO, BRAND, and COLOR
*/
NdbRecAttr * myRecAttr[3];
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("GARAGE");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
/**
* Loop as long as :
* retryMax not reached
* failed operations due to TEMPORARY erros
*
* Exit loop;
* retyrMax reached
* Permanent error (return -1)
*/
while (true)
{
if (retryAttempt >= retryMax)
{
std::cout << "ERROR: has retried this operation " << retryAttempt
<< " times, failing!" << std::endl;
return -1;
}
myTrans = myNdb->startTransaction();
if (myTrans == NULL)
{
const NdbError err = myNdb->getNdbError();
if (err.status == NdbError::TemporaryError)
{
milliSleep(50);
retryAttempt++;
continue;
}
std::cout << err.message << std::endl;
return -1;
}
/*
* Define a scan operation.
* NDBAPI.
*/
myScanOp = myTrans->getNdbScanOperation(myTable);
if (myScanOp == NULL)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Read without locks, without being placed in lock queue
*/
if( myScanOp->readTuples(NdbOperation::LM_CommittedRead) == -1)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Define storage for fetched attributes.
* E.g., the resulting attributes of executing
* myOp->getValue("REG_NO") is placed in myRecAttr[0].
* No data exists in myRecAttr until transaction has commited!
*/
myRecAttr[0] = myScanOp->getValue("REG_NO");
myRecAttr[1] = myScanOp->getValue("BRAND");
myRecAttr[2] = myScanOp->getValue("COLOR");
if(myRecAttr[0] ==NULL || myRecAttr[1] == NULL || myRecAttr[2]==NULL)
{
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* Start scan (NoCommit since we are only reading at this stage);
*/
if(myTrans->execute(NdbTransaction::NoCommit) != 0){
err = myTrans->getNdbError();
if(err.status == NdbError::TemporaryError){
std::cout << myTrans->getNdbError().message << std::endl;
myNdb->closeTransaction(myTrans);
milliSleep(50);
continue;
}
std::cout << err.code << std::endl;
std::cout << myTrans->getNdbError().code << std::endl;
myNdb->closeTransaction(myTrans);
return -1;
}
/**
* start of loop: nextResult(true) means that "parallelism" number of
* rows are fetched from NDB and cached in NDBAPI
*/
while((check = myScanOp->nextResult(true)) == 0){
do {
fetchedRows++;
/**
* print REG_NO unsigned int
*/
std::cout << myRecAttr[0]->u_32_value() << "\t";
/**
* print BRAND character string
*/
std::cout << myRecAttr[1]->aRef() << "\t";
/**
* print COLOR character string
*/
std::cout << myRecAttr[2]->aRef() << std::endl;
/**
* nextResult(false) means that the records
* cached in the NDBAPI are modified before
* fetching more rows from NDB.
*/
} while((check = myScanOp->nextResult(false)) == 0);
}
myNdb->closeTransaction(myTrans);
return 1;
}
return -1;
}
int main()
{
ndb_init();
MYSQL mysql;
/**************************************************************
* Connect to mysql server and create table *
**************************************************************/
{
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(-1);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
3306, "/tmp/mysql.sock", 0) )
MYSQLERROR(mysql);
mysql_query(&mysql, "CREATE DATABASE TEST_DB");
if (mysql_query(&mysql, "USE TEST_DB") != 0) MYSQLERROR(mysql);
create_table(mysql);
}
/**************************************************************
* Connect to ndb cluster *
**************************************************************/
Ndb_cluster_connection cluster_connection;
if (cluster_connection.connect(4, 5, 1))
{
std::cout << "Unable to connect to cluster within 30 secs." << std::endl;
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster_connection.wait_until_ready(30,0) < 0)
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(-1);
}
Ndb myNdb(&cluster_connection,"TEST_DB");
if (myNdb.init(1024) == -1) { // Set max 1024 parallel transactions
APIERROR(myNdb.getNdbError());
exit(-1);
}
/*******************************************
* Check table definition *
*******************************************/
int column_color;
{
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *t= myDict->getTable("GARAGE");
Car car;
if (t->getColumn("COLOR")->getLength() != sizeof(car.color) ||
t->getColumn("BRAND")->getLength() != sizeof(car.brand))
{
std::cout << "Wrong table definition" << std::endl;
exit(-1);
}
column_color= t->getColumn("COLOR")->getColumnNo();
}
if(populate(&myNdb) > 0)
std::cout << "populate: Success!" << std::endl;
if(scan_print(&myNdb) > 0)
std::cout << "scan_print: Success!" << std::endl << std::endl;
std::cout << "Going to delete all pink cars!" << std::endl;
{
/**
* Note! color needs to be of exact the same size as column defined
*/
Car tmp;
sprintf(tmp.color, "Pink");
if(scan_delete(&myNdb, column_color, tmp.color) > 0)
std::cout << "scan_delete: Success!" << std::endl << std::endl;
}
if(scan_print(&myNdb) > 0)
std::cout << "scan_print: Success!" << std::endl << std::endl;
{
/**
* Note! color1 & 2 need to be of exact the same size as column defined
*/
Car tmp1, tmp2;
sprintf(tmp1.color, "Blue");
sprintf(tmp2.color, "Black");
std::cout << "Going to update all " << tmp1.color
<< " cars to " << tmp2.color << " cars!" << std::endl;
if(scan_update(&myNdb, column_color, tmp1.color, tmp2.color) > 0)
std::cout << "scan_update: Success!" << std::endl << std::endl;
}
if(scan_print(&myNdb) > 0)
std::cout << "scan_print: Success!" << std::endl << std::endl;
return 0;
}
1. Set NDB_OS in Makefile
2. Add path to libNDB_API.so in LD_LIBRARY_PATH
3. Set NDB_CONNECTSTRING
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
* ndbapi_simple.cpp: Using synchronous transactions in NDB API
*
* Correct output from this program is:
*
* ATTR1 ATTR2
* 0 10
* 1 1
* 2 12
* Detected that deleted tuple doesn't exist!
* 4 14
* 5 5
* 6 16
* 7 7
* 8 18
* 9 9
*
*/
#include <mysql.h>
#include <NdbApi.hpp>
// Used for cout
#include <stdio.h>
#include <iostream>
static void run_application(MYSQL &, Ndb_cluster_connection &);
#define PRINT_ERROR(code,msg) \
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
exit(-1); }
#define APIERROR(error) { \
PRINT_ERROR(error.code,error.message); \
exit(-1); }
int main()
{
// ndb_init must be called first
ndb_init();
// connect to mysql server and cluster and run application
{
// Object representing the cluster
Ndb_cluster_connection cluster_connection;
// Connect to cluster management server (ndb_mgmd)
if (cluster_connection.connect(4 /* retries */,
5 /* delay between retries */,
1 /* verbose */))
{
std::cout << "Cluster management server was not ready within 30 secs.\n";
exit(-1);
}
// Optionally connect and wait for the storage nodes (ndbd's)
if (cluster_connection.wait_until_ready(30,0) < 0)
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(-1);
}
// connect to mysql server
MYSQL mysql;
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(-1);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
3306, "/tmp/mysql.sock", 0) )
MYSQLERROR(mysql);
// run the application code
run_application(mysql, cluster_connection);
}
ndb_end(0);
std::cout << "\nTo drop created table use:\n"
<< "echo \"drop table MYTABLENAME\" | mysql TEST_DB_1 -u root\n";
return 0;
}
static void create_table(MYSQL &);
static void do_insert(Ndb &);
static void do_update(Ndb &);
static void do_delete(Ndb &);
static void do_read(Ndb &);
static void run_application(MYSQL &mysql,
Ndb_cluster_connection &cluster_connection)
{
/********************************************
* Connect to database via mysql-c *
********************************************/
mysql_query(&mysql, "CREATE DATABASE TEST_DB_1");
if (mysql_query(&mysql, "USE TEST_DB_1") != 0) MYSQLERROR(mysql);
create_table(mysql);
/********************************************
* Connect to database via NdbApi *
********************************************/
// Object representing the database
Ndb myNdb( &cluster_connection, "TEST_DB_1" );
if (myNdb.init()) APIERROR(myNdb.getNdbError());
/*
* Do different operations on database
*/
do_insert(myNdb);
do_update(myNdb);
do_delete(myNdb);
do_read(myNdb);
}
/*********************************************************
* Create a table named MYTABLENAME if it does not exist *
*********************************************************/
static void create_table(MYSQL &mysql)
{
if (mysql_query(&mysql,
"CREATE TABLE"
" MYTABLENAME"
" (ATTR1 INT UNSIGNED NOT NULL PRIMARY KEY,"
" ATTR2 INT UNSIGNED NOT NULL)"
" ENGINE=NDB"))
MYSQLERROR(mysql);
}
/**************************************************************************
* Using 5 transactions, insert 10 tuples in table: (0,0),(1,1),...,(9,9) *
**************************************************************************/
static void do_insert(Ndb &myNdb)
{
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
for (int i = 0; i < 5; i++) {
NdbTransaction *myTransaction= myNdb.startTransaction();
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->insertTuple();
myOperation->equal("ATTR1", i);
myOperation->setValue("ATTR2", i);
myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->insertTuple();
myOperation->equal("ATTR1", i+5);
myOperation->setValue("ATTR2", i+5);
if (myTransaction->execute( NdbTransaction::Commit ) == -1)
APIERROR(myTransaction->getNdbError());
myNdb.closeTransaction(myTransaction);
}
}
/*****************************************************************
* Update the second attribute in half of the tuples (adding 10) *
*****************************************************************/
static void do_update(Ndb &myNdb)
{
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
for (int i = 0; i < 10; i+=2) {
NdbTransaction *myTransaction= myNdb.startTransaction();
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->updateTuple();
myOperation->equal( "ATTR1", i );
myOperation->setValue( "ATTR2", i+10);
if( myTransaction->execute( NdbTransaction::Commit ) == -1 )
APIERROR(myTransaction->getNdbError());
myNdb.closeTransaction(myTransaction);
}
}
/*************************************************
* Delete one tuple (the one with primary key 3) *
*************************************************/
static void do_delete(Ndb &myNdb)
{
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
NdbTransaction *myTransaction= myNdb.startTransaction();
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->deleteTuple();
myOperation->equal( "ATTR1", 3 );
if (myTransaction->execute(NdbTransaction::Commit) == -1)
APIERROR(myTransaction->getNdbError());
myNdb.closeTransaction(myTransaction);
}
/*****************************
* Read and print all tuples *
*****************************/
static void do_read(Ndb &myNdb)
{
const NdbDictionary::Dictionary* myDict= myNdb.getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
std::cout << "ATTR1 ATTR2" << std::endl;
for (int i = 0; i < 10; i++) {
NdbTransaction *myTransaction= myNdb.startTransaction();
if (myTransaction == NULL) APIERROR(myNdb.getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->readTuple(NdbOperation::LM_Read);
myOperation->equal("ATTR1", i);
NdbRecAttr *myRecAttr= myOperation->getValue("ATTR2", NULL);
if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());
if(myTransaction->execute( NdbTransaction::Commit ) == -1)
if (i == 3) {
std::cout << "Detected that deleted tuple doesn't exist!" << std::endl;
} else {
APIERROR(myTransaction->getNdbError());
}
if (i != 3) {
printf(" %2d %2d\n", i, myRecAttr->u_32_value());
}
myNdb.closeTransaction(myTransaction);
}
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
//
// ndbapi_simple_index.cpp: Using secondary indexes in NDB API
//
// Correct output from this program is:
//
// ATTR1 ATTR2
// 0 10
// 1 1
// 2 12
// Detected that deleted tuple doesn't exist!
// 4 14
// 5 5
// 6 16
// 7 7
// 8 18
// 9 9
#include <mysql.h>
#include <NdbApi.hpp>
// Used for cout
#include <stdio.h>
#include <iostream>
#define PRINT_ERROR(code,msg) \
std::cout << "Error in " << __FILE__ << ", line: " << __LINE__ \
<< ", code: " << code \
<< ", msg: " << msg << "." << std::endl
#define MYSQLERROR(mysql) { \
PRINT_ERROR(mysql_errno(&mysql),mysql_error(&mysql)); \
exit(-1); }
#define APIERROR(error) { \
PRINT_ERROR(error.code,error.message); \
exit(-1); }
int main()
{
ndb_init();
MYSQL mysql;
/**************************************************************
* Connect to mysql server and create table *
**************************************************************/
{
if ( !mysql_init(&mysql) ) {
std::cout << "mysql_init failed\n";
exit(-1);
}
if ( !mysql_real_connect(&mysql, "localhost", "root", "", "",
3306, "/tmp/mysql.sock", 0) )
MYSQLERROR(mysql);
mysql_query(&mysql, "CREATE DATABASE TEST_DB_1");
if (mysql_query(&mysql, "USE TEST_DB_1") != 0) MYSQLERROR(mysql);
if (mysql_query(&mysql,
"CREATE TABLE"
" MYTABLENAME"
" (ATTR1 INT UNSIGNED,"
" ATTR2 INT UNSIGNED NOT NULL,"
" PRIMARY KEY USING HASH (ATTR1),"
" UNIQUE MYINDEXNAME USING HASH (ATTR2))"
" ENGINE=NDB"))
MYSQLERROR(mysql);
}
/**************************************************************
* Connect to ndb cluster *
**************************************************************/
Ndb_cluster_connection *cluster_connection=
new Ndb_cluster_connection(); // Object representing the cluster
if (cluster_connection->connect(5,3,1))
{
std::cout << "Connect to cluster management server failed.\n";
exit(-1);
}
if (cluster_connection->wait_until_ready(30,30))
{
std::cout << "Cluster was not ready within 30 secs.\n";
exit(-1);
}
Ndb* myNdb = new Ndb( cluster_connection,
"TEST_DB_1" ); // Object representing the database
if (myNdb->init() == -1) {
APIERROR(myNdb->getNdbError());
exit(-1);
}
const NdbDictionary::Dictionary* myDict= myNdb->getDictionary();
const NdbDictionary::Table *myTable= myDict->getTable("MYTABLENAME");
if (myTable == NULL)
APIERROR(myDict->getNdbError());
const NdbDictionary::Index *myIndex= myDict->getIndex("MYINDEXNAME","MYTABLENAME");
if (myIndex == NULL)
APIERROR(myDict->getNdbError());
/**************************************************************************
* Using 5 transactions, insert 10 tuples in table: (0,0),(1,1),...,(9,9) *
**************************************************************************/
for (int i = 0; i < 5; i++) {
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->insertTuple();
myOperation->equal("ATTR1", i);
myOperation->setValue("ATTR2", i);
myOperation = myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->insertTuple();
myOperation->equal("ATTR1", i+5);
myOperation->setValue("ATTR2", i+5);
if (myTransaction->execute( NdbTransaction::Commit ) == -1)
APIERROR(myTransaction->getNdbError());
myNdb->closeTransaction(myTransaction);
}
/*****************************************
* Read and print all tuples using index *
*****************************************/
std::cout << "ATTR1 ATTR2" << std::endl;
for (int i = 0; i < 10; i++) {
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
NdbIndexOperation *myIndexOperation=
myTransaction->getNdbIndexOperation(myIndex);
if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());
myIndexOperation->readTuple(NdbOperation::LM_Read);
myIndexOperation->equal("ATTR2", i);
NdbRecAttr *myRecAttr= myIndexOperation->getValue("ATTR1", NULL);
if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());
if(myTransaction->execute( NdbTransaction::Commit ) != -1)
printf(" %2d %2d\n", myRecAttr->u_32_value(), i);
myNdb->closeTransaction(myTransaction);
}
/*****************************************************************
* Update the second attribute in half of the tuples (adding 10) *
*****************************************************************/
for (int i = 0; i < 10; i+=2) {
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
NdbIndexOperation *myIndexOperation=
myTransaction->getNdbIndexOperation(myIndex);
if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());
myIndexOperation->updateTuple();
myIndexOperation->equal( "ATTR2", i );
myIndexOperation->setValue( "ATTR2", i+10);
if( myTransaction->execute( NdbTransaction::Commit ) == -1 )
APIERROR(myTransaction->getNdbError());
myNdb->closeTransaction(myTransaction);
}
/*************************************************
* Delete one tuple (the one with primary key 3) *
*************************************************/
{
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
NdbIndexOperation *myIndexOperation=
myTransaction->getNdbIndexOperation(myIndex);
if (myIndexOperation == NULL) APIERROR(myTransaction->getNdbError());
myIndexOperation->deleteTuple();
myIndexOperation->equal( "ATTR2", 3 );
if (myTransaction->execute(NdbTransaction::Commit) == -1)
APIERROR(myTransaction->getNdbError());
myNdb->closeTransaction(myTransaction);
}
/*****************************
* Read and print all tuples *
*****************************/
{
std::cout << "ATTR1 ATTR2" << std::endl;
for (int i = 0; i < 10; i++) {
NdbTransaction *myTransaction= myNdb->startTransaction();
if (myTransaction == NULL) APIERROR(myNdb->getNdbError());
NdbOperation *myOperation= myTransaction->getNdbOperation(myTable);
if (myOperation == NULL) APIERROR(myTransaction->getNdbError());
myOperation->readTuple(NdbOperation::LM_Read);
myOperation->equal("ATTR1", i);
NdbRecAttr *myRecAttr= myOperation->getValue("ATTR2", NULL);
if (myRecAttr == NULL) APIERROR(myTransaction->getNdbError());
if(myTransaction->execute( NdbTransaction::Commit ) == -1)
if (i == 3) {
std::cout << "Detected that deleted tuple doesn't exist!\n";
} else {
APIERROR(myTransaction->getNdbError());
}
if (i != 3) {
printf(" %2d %2d\n", i, myRecAttr->u_32_value());
}
myNdb->closeTransaction(myTransaction);
}
}
/**************
* Drop table *
**************/
if (mysql_query(&mysql, "DROP TABLE MYTABLENAME"))
MYSQLERROR(mysql);
delete myNdb;
delete cluster_connection;
ndb_end(0);
return 0;
}
baseport: 14000
basedir: /space/autotest
mgm: CHOOSE_host1
ndb: CHOOSE_host2 CHOOSE_host3 CHOOSE_host2 CHOOSE_host3
api: CHOOSE_host1 CHOOSE_host1 CHOOSE_host1
-- cluster config
[DB DEFAULT]
NoOfReplicas: 2
IndexMemory: 100M
DataMemory: 300M
BackupMemory: 64M
MaxNoOfConcurrentScans: 100
DataDir: .
FileSystemPath: /space/autotest/run
[MGM DEFAULT]
PortNumber: 14000
ArbitrationRank: 1
DataDir: .
baseport: 16000
basedir: /space/autotest
mgm: CHOOSE_host1
ndb: CHOOSE_host2 CHOOSE_host3 CHOOSE_host2 CHOOSE_host3
api: CHOOSE_host1 CHOOSE_host1 CHOOSE_host1
-- cluster config
[DB DEFAULT]
NoOfReplicas: 2
IndexMemory: 100M
DataMemory: 300M
BackupMemory: 64M
MaxNoOfConcurrentScans: 100
DataDir: .
FileSystemPath: /space/autotest/run
[MGM DEFAULT]
PortNumber: 16000
ArbitrationRank: 1
DataDir: .
baseport: 16000
basedir: /space/autotest
mgm: CHOOSE_host1
ndb: CHOOSE_host2 CHOOSE_host3
mysqld: CHOOSE_host1 CHOOSE_host4
mysql: CHOOSE_host1 CHOOSE_host1 CHOOSE_host1 CHOOSE_host1 CHOOSE_host1 CHOOSE_host1 CHOOSE_host4 CHOOSE_host4 CHOOSE_host4 CHOOSE_host4 CHOOSE_host4 CHOOSE_host4
-- cluster config
[DB DEFAULT]
NoOfReplicas: 2
IndexMemory: 100M
DataMemory: 300M
BackupMemory: 64M
MaxNoOfConcurrentScans: 100
DataDir: .
FileSystemPath: /space/autotest/run
[MGM DEFAULT]
PortNumber: 16000
ArbitrationRank: 1
DataDir: .
CREATE DATABASE IF NOT EXISTS BANK default charset=latin1 default collate=latin1_bin;
USE BANK;
DROP TABLE IF EXISTS GL;
CREATE TABLE GL ( TIME BIGINT UNSIGNED NOT NULL,
ACCOUNT_TYPE INT UNSIGNED NOT NULL,
BALANCE INT UNSIGNED NOT NULL,
DEPOSIT_COUNT INT UNSIGNED NOT NULL,
DEPOSIT_SUM INT UNSIGNED NOT NULL,
WITHDRAWAL_COUNT INT UNSIGNED NOT NULL,
WITHDRAWAL_SUM INT UNSIGNED NOT NULL,
PURGED INT UNSIGNED NOT NULL,
PRIMARY KEY USING HASH (TIME,ACCOUNT_TYPE))
ENGINE = NDB;
DROP TABLE IF EXISTS ACCOUNT;
CREATE TABLE ACCOUNT ( ACCOUNT_ID INT UNSIGNED NOT NULL,
OWNER INT UNSIGNED NOT NULL,
BALANCE INT UNSIGNED NOT NULL,
ACCOUNT_TYPE INT UNSIGNED NOT NULL,
PRIMARY KEY USING HASH (ACCOUNT_ID))
ENGINE = NDB;
DROP TABLE IF EXISTS TRANSACTION;
CREATE TABLE TRANSACTION ( TRANSACTION_ID BIGINT UNSIGNED NOT NULL,
ACCOUNT INT UNSIGNED NOT NULL,
ACCOUNT_TYPE INT UNSIGNED NOT NULL,
OTHER_ACCOUNT INT UNSIGNED NOT NULL,
TRANSACTION_TYPE INT UNSIGNED NOT NULL,
TIME BIGINT UNSIGNED NOT NULL,
AMOUNT INT UNSIGNED NOT NULL,
PRIMARY KEY USING HASH (TRANSACTION_ID,ACCOUNT))
ENGINE = NDB;
DROP TABLE IF EXISTS SYSTEM_VALUES;
CREATE TABLE SYSTEM_VALUES ( SYSTEM_VALUES_ID INT UNSIGNED NOT NULL,
VALUE BIGINT UNSIGNED NOT NULL,
PRIMARY KEY USING HASH (SYSTEM_VALUES_ID))
ENGINE = NDB;
DROP TABLE IF EXISTS ACCOUNT_TYPE;
CREATE TABLE ACCOUNT_TYPE ( ACCOUNT_TYPE_ID INT UNSIGNED NOT NULL,
DESCRIPTION CHAR(64) NOT NULL,
PRIMARY KEY USING HASH (ACCOUNT_TYPE_ID))
ENGINE = NDB;
create database if not exists TEST_DB;
use TEST_DB;
drop table if exists T1;
create table T1 (KOL1 int unsigned not null,
KOL2 int unsigned not null,
KOL3 int unsigned not null,
KOL4 int unsigned not null,
KOL5 int unsigned not null,
primary key using hash(KOL1)) engine=ndb;
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include <NdbOut.hpp>
#include <NdbApi.hpp>
#include <NdbSleep.h>
#include <NDBT.hpp>
#include <HugoTransactions.hpp>
#include <getarg.h>
int
main(int argc, const char** argv){
ndb_init();
int _help = 0;
const char* db = 0;
struct getargs args[] = {
{ "database", 'd', arg_string, &db, "Database", "" },
{ "usage", '?', arg_flag, &_help, "Print help", "" }
};
int num_args = sizeof(args) / sizeof(args[0]);
int optind = 0, i;
char desc[] =
"<tabname>+ \nThis program listen to events on specified tables\n";
if(getarg(args, num_args, argc, argv, &optind) ||
argv[optind] == NULL || _help) {
arg_printusage(args, num_args, argv[0], desc);
return NDBT_ProgramExit(NDBT_WRONGARGS);
}
// Connect to Ndb
Ndb_cluster_connection con;
if(con.connect(12, 5, 1) != 0)
{
return NDBT_ProgramExit(NDBT_FAILED);
}
Ndb MyNdb( &con, db ? db : "TEST_DB" );
if(MyNdb.init() != 0){
ERR(MyNdb.getNdbError());
return NDBT_ProgramExit(NDBT_FAILED);
}
// Connect to Ndb and wait for it to become ready
while(MyNdb.waitUntilReady() != 0)
ndbout << "Waiting for ndb to become ready..." << endl;
int result = 0;
Uint64 last_gci= 0, cnt= 0;
NdbDictionary::Dictionary *myDict = MyNdb.getDictionary();
Vector<NdbDictionary::Event*> events;
Vector<NdbEventOperation*> event_ops;
for(i= optind; i<argc; i++)
{
const NdbDictionary::Table* table= myDict->getTable(argv[i]);
if(!table)
{
ndbout_c("Could not find table: %s, skipping", argv[i]);
continue;
}
BaseString name;
name.appfmt("EV-%s", argv[i]);
NdbDictionary::Event *myEvent= new NdbDictionary::Event(name.c_str());
myEvent->setTable(table->getName());
myEvent->addTableEvent(NdbDictionary::Event::TE_ALL);
for(int a = 0; a < table->getNoOfColumns(); a++){
myEvent->addEventColumn(a);
}
if (myDict->createEvent(* myEvent))
{
if(myDict->getNdbError().classification == NdbError::SchemaObjectExists)
{
g_info << "Event creation failed event exists\n";
if (myDict->dropEvent(name.c_str()))
{
g_err << "Failed to drop event: " << myDict->getNdbError() << endl;
result = 1;
goto end;
}
// try again
if (myDict->createEvent(* myEvent))
{
g_err << "Failed to create event: " << myDict->getNdbError() << endl;
result = 1;
goto end;
}
}
else
{
g_err << "Failed to create event: " << myDict->getNdbError() << endl;
result = 1;
goto end;
}
}
events.push_back(myEvent);
NdbEventOperation* pOp = MyNdb.createEventOperation(name.c_str());
if ( pOp == NULL ) {
g_err << "Event operation creation failed" << endl;
result = 1;
goto end;
}
for (int a = 0; a < table->getNoOfColumns(); a++)
{
pOp->getValue(table->getColumn(a)->getName());
pOp->getPreValue(table->getColumn(a)->getName());
}
event_ops.push_back(pOp);
}
for(i= 0; i<(int)event_ops.size(); i++)
{
if (event_ops[i]->execute())
{
g_err << "operation execution failed: " << event_ops[i]->getNdbError()
<< endl;
result = 1;
goto end;
}
}
while(true)
{
while(MyNdb.pollEvents(100) == 0);
NdbEventOperation* pOp;
while((pOp= MyNdb.nextEvent()) != 0)
{
if(pOp->getGCI() != last_gci)
{
if(cnt) ndbout_c("GCI: %lld events: %lld", last_gci, cnt);
cnt= 1;
last_gci= pOp->getGCI();
}
else
{
cnt++;
}
}
}
end:
return NDBT_ProgramExit(NDBT_OK);
}
template class Vector<NdbDictionary::Event*>;
template class Vector<NdbEventOperation*>;
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment