/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; version 2 of the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

/* ***************************************************
       FLEXHAMMER
       Hammer ndb with read, insert, update and delete transactions. 

       Arguments:
        -t Number of threads to start, default 1
        -o Number of operations per hammering-round, default 500
	-l Number of loops to run, default 1, 0=infinite
        -a Number of attributes, default 25
        -c Number of tables, default 1
	-s Size of each attribute, default 1
	-simple Use simple read to read from database
        -dirty Use dirty read to read from database
	-write Use writeTuple to write to db
	-r Number of records to Hammer
        -no_table_create Don't create tables in db
        -regulate To be able to regulate the load flexHammer produces.
        -stdtables Use standard table names
	-sleep Sleep a number of seconds before running the test, this 
	       can be used so that another flexProgram have tome to create tables

       Returns:
        0 - Test passed
       -1 - Test failed
        1 - Invalid arguments

Revision history:
  1.7  020208 epesson: Adapted to use NDBT
  1.10 020222 epesson: Finalised handling of thread results
  1.11 020222 epesson: Bug in checking results during delete fixed

 * *************************************************** */

#include <ndb_global.h>
#include <NdbApi.hpp>

#include <NdbMain.h>
#include <NdbThread.h>
#include <NdbSleep.h>
#include <NdbTick.h>
#include <NdbOut.hpp>
#include <NdbTimer.hpp>
#include <NdbTick.h>
#include <NdbTest.hpp>
#include <NDBT_Error.hpp>
#include <NdbSchemaCon.hpp>

ErrorData * flexHammerErrorData;


#define MAXSTRLEN 16 
#define MAXATTR 64
#define MAXTABLES 64
#define NDB_MAXTHREADS 256
/*
  NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
  #define from <sys/thread.h> on AIX (IBM compiler).  We explicitly
  #undef it here lest someone use it by habit and get really funny
  results.  K&R says we may #undef non-existent symbols, so let's go.
*/
#undef MAXTHREADS
#define MAXATTRSIZE 100
// Max number of retries if something fails
#define MaxNoOfAttemptsC 10 

enum StartType {
  stIdle,
  stHammer,
  stStop,
  stLast};

enum MyOpType {
  otInsert,
  otRead,
  otDelete,
  otUpdate,
  otLast};

struct ThreadNdb {
  int threadNo;
  NdbThread* threadLife;
  int threadReady;
  StartType threadStart;
  int threadResult;};

extern "C" void* flexHammerThread(void*);
static int setAttrNames(void);
static int setTableNames(void);
static int readArguments(int, const char**);
static int createTables(Ndb*);
static void sleepBeforeStartingTest(int seconds);
static int checkThreadResults(ThreadNdb *threadArrayP, char* phase);

//enum OperationType {
//  otInsert,
//  otRead,
//  otUpdate,
//  otDelete,
//  otVerifyDelete,
//  otLast };

enum ReadyType {
	stReady, 
	stRunning
} ;
static int			tNoOfThreads;
static int			tNoOfAttributes;
static int			tNoOfTables;
static int			tNoOfBackups;
static int			tAttributeSize;
static int			tNoOfOperations;
static int			tNoOfRecords;
static int			tNoOfLoops;
static				ReadyType ThreadReady[NDB_MAXTHREADS];
static				StartType ThreadStart[NDB_MAXTHREADS];
static char			tableName[MAXTABLES][MAXSTRLEN];
static char			attrName[MAXATTR][MAXSTRLEN];
static int			theSimpleFlag = 0;
static int			theWriteFlag = 0;
static int			theDirtyFlag = 0;
static int			theTableCreateFlag = 0;
static int			theStandardTableNameFlag = 0;
static unsigned int tSleepTime = 0;

#define START_TIMER { NdbTimer timer; timer.doStart();
#define STOP_TIMER timer.doStop();
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; 


// Initialise thread data
void 
resetThreads(ThreadNdb *threadArrayP) {

  for (int i = 0; i < tNoOfThreads ; i++)
    {
      threadArrayP[i].threadReady = 0;
      threadArrayP[i].threadResult = 0;
      threadArrayP[i].threadStart = stIdle;
    }
} // resetThreads

void 
waitForThreads(ThreadNdb *threadArrayP)
{
  int cont = 1;

  while (cont) {
    NdbSleep_MilliSleep(100);
    cont = 0;
    for (int i = 0; i < tNoOfThreads ; i++) {
      if (threadArrayP[i].threadReady == 0) {
	cont = 1;
      } // if
    } // for
  } // while
} // waitForThreads

void 
tellThreads(ThreadNdb* threadArrayP, const StartType what)
{
  for (int i = 0; i < tNoOfThreads ; i++) 
    {
    threadArrayP[i].threadStart = what;
    } // for
} // tellThreads

static Ndb_cluster_connection *g_cluster_connection= 0;
 
NDB_COMMAND(flexHammer, "flexHammer", "flexHammer", "flexHammer", 65535)
//main(int argc, const char** argv)
{
  ndb_init();
  ThreadNdb* pThreads = NULL; // Pointer to thread data array
  Ndb* pMyNdb = NULL;	      // Pointer to Ndb object
  int tLoops = 0;
  int returnValue = 0;
  int check = 0;
  
  flexHammerErrorData = new ErrorData;

  flexHammerErrorData->resetErrorCounters();

  if (readArguments(argc, argv) != 0) {
    ndbout << "Wrong arguments to flexHammer" << endl;
    return NDBT_ProgramExit(NDBT_WRONGARGS);
  } // if

  /* print Setting */
  flexHammerErrorData->printSettings(ndbout);

  check = setAttrNames();
  if (check == -1) {
    ndbout << "Couldn't set attribute names" << endl;
    return NDBT_ProgramExit(NDBT_FAILED);
  } // if
  check = setTableNames();
  if (check == -1) {
    ndbout << "Couldn't set table names" << endl;
    return NDBT_ProgramExit(NDBT_FAILED);
  } // if

  // Create thread data array
  pThreads = new ThreadNdb[tNoOfThreads];
  // NdbThread_SetConcurrencyLevel(tNoOfThreads + 2);

  // Create and init Ndb object
  Ndb_cluster_connection con;
  if(con.connect(12, 5, 1) != 0)
  {
    return NDBT_ProgramExit(NDBT_FAILED);
  }
  g_cluster_connection= &con;
  pMyNdb = new Ndb(g_cluster_connection, "TEST_DB");
  pMyNdb->init();

  // Wait for Ndb to become ready
  if (pMyNdb->waitUntilReady(10000) != 0) {
    ndbout << "NDB is not ready" << endl << "Benchmark failed" << endl;
    returnValue = NDBT_FAILED;
  }

  else {
    check = createTables(pMyNdb);
    if (check != 0) {
      returnValue = NDBT_FAILED;
    } // if
    else {
      sleepBeforeStartingTest(tSleepTime);
      
      // Create threads.                                           *
      resetThreads(pThreads);
      for (int i = 0; i < tNoOfThreads ; i++) {  
	pThreads[i].threadNo = i;
	pThreads[i].threadLife = NdbThread_Create(flexHammerThread,
						  (void**)&pThreads[i],
						  65535,
						  "flexHammerThread",
                                                  NDB_THREAD_PRIO_LOW);
      } // for
      
      // And wait until they are ready
      waitForThreads(pThreads);
      if (checkThreadResults(pThreads, "init") != 0) {
        returnValue = NDBT_FAILED;
      } // if
      
      
      if (returnValue == NDBT_OK) {
	ndbout << endl <<  "All threads started" << endl << endl;
	
	for(;;) {
	  
	  // Check if it's time to exit program
	  if((tNoOfLoops != 0) && (tNoOfLoops <= tLoops))
	    break;
	  
	  // Tell all threads to start hammer
	  ndbout << "Hammering..." << endl;
	  
	  resetThreads(pThreads);
	  
	  START_TIMER;
	  tellThreads(pThreads, stHammer);
	  
	  waitForThreads(pThreads);
	  ndbout << "Threads ready to continue..." << endl;
	  STOP_TIMER;
	  
	  // Check here if anything went wrong
	  if (checkThreadResults(pThreads, "hammer") != 0) {
	    ndbout << "Thread(s) failed." << endl;
	    returnValue = NDBT_FAILED;
	  } // if
	  
	  PRINT_TIMER("hammer", tNoOfOperations*tNoOfThreads, tNoOfTables*6);
	  
	  ndbout << endl;
	  
	  tLoops++;
	  
	} // for
      } // if

      // Signaling threads to stop 
      resetThreads(pThreads);
      tellThreads(pThreads, stStop);
      
      // Wait for threads to stop
      waitForThreads(pThreads);
      
      ndbout << "----------------------------------------------" << endl << endl;
      ndbout << "Benchmark completed" << endl;
    } // else
  } // else
  // Clean up 

  flexHammerErrorData->printErrorCounters(ndbout);

  // Kill them all! 
  void* tmp;
  for(int i = 0; i < tNoOfThreads; i++){
    NdbThread_WaitFor(pThreads[i].threadLife, &tmp);
    NdbThread_Destroy(&pThreads[i].threadLife);
  }
  delete flexHammerErrorData;
  delete [] pThreads;
  delete pMyNdb;

  // Exit via NDBT
  return NDBT_ProgramExit(returnValue);
  
} //main

extern "C"
void*
flexHammerThread(void* pArg)
{
  ThreadNdb* pThreadData = (ThreadNdb*)pArg;
  unsigned int threadNo = pThreadData->threadNo;
  Ndb* pMyNdb = NULL ;
  NdbConnection *pMyTransaction = NULL ;
  //  NdbOperation*	pMyOperation[MAXTABLES] = {NULL};
  NdbOperation*	pMyOperation[MAXTABLES];
  int check = 0;
  int loop_count_ops = 0;
  int loop_count_tables = 0;
  int loop_count_attributes = 0;
  int count_round = 0;
  int count = 0;
  int count_tables = 0;
  int count_attributes = 0;
  int i = 0;
  int j = 0;
  int tThreadResult = 0;
  MyOpType tMyOpType = otLast;
  int pkValue = 0;
  int readValue[MAXATTR][MAXATTRSIZE] = {0};
  int attrValue[MAXATTRSIZE];
  NdbRecAttr* tTmp = NULL;
  int tNoOfAttempts = 0;
 
  for (i = 0; i < MAXATTRSIZE; i++)
    attrValue[i] = 0; 
  // Ndb object for each thread
  pMyNdb = new Ndb(g_cluster_connection, "TEST_DB" );
  pMyNdb->init();
  if (pMyNdb->waitUntilReady(10000) != 0) {
    // Error, NDB is not ready
    tThreadResult = 99;
    // Go to idle directly
    pThreadData->threadStart = stIdle;
  } // if
  
  for(;;) {
    pThreadData->threadResult = tThreadResult;
    pThreadData->threadReady = 1; // Signalling ready to main
    
    // If Idle just wait to be stopped from main
    while (pThreadData->threadStart == stIdle) {
      NdbSleep_MilliSleep(100);   
    } // while
    
    // Check if signal to exit is received
    if (pThreadData->threadStart == stStop) {
      pThreadData->threadReady = 1;
      // break out of eternal loop
      break;
    } // if
    
    // Set to Idle to prepare for possible error break
    pThreadData->threadStart = stIdle;
    
    // Prepare transaction
    loop_count_ops = tNoOfOperations;
    loop_count_tables = tNoOfTables;
    loop_count_attributes = tNoOfAttributes;
    
    for (count=0 ; count < loop_count_ops ; count++) {
      
      //pkValue = (int)(count + thread_base);
      // This limits the number of records used in this test
      pkValue = count % tNoOfRecords; 
      
      for (count_round = 0; count_round < 5; ) {
	switch (count_round) {
	case 0:       // Insert
	  tMyOpType = otInsert;
	  // Increase attrValues
	  for (i=0; i < MAXATTRSIZE; i ++) {
	    attrValue[i]++;
	  }
	  break;
	case 1: 
	case 3:       // Read and verify
	  tMyOpType = otRead;
	  break;
	case 2:       // Update
	  // Increase attrValues
	  for(i=0; i < MAXATTRSIZE; i ++) {
	    attrValue[i]++;
	  }
	  tMyOpType = otUpdate;
	  break;
	case 4:       // Delete
	  tMyOpType = otDelete;
	  break;
	default:
	  assert(false);
	  break;
	} // switch
	    
	// Get transaction object
	pMyTransaction = pMyNdb->startTransaction();
	if (pMyTransaction == NULL) {
	  // Fatal error
	  tThreadResult = 1;
	  // break out of for count_round loop waiting to be stopped by main
	  break;
	} // if

	for (count_tables = 0; count_tables < loop_count_tables; 
	     count_tables++) {
	  pMyOperation[count_tables] = 
	    pMyTransaction->getNdbOperation(tableName[count_tables]);	
	  if (pMyOperation[count_tables] == NULL) {
	    //Fatal error
	    tThreadResult = 2;
	    // break out of inner for count_tables loop
	    break;
	  } // if
		
	  switch (tMyOpType) {
	  case otInsert:			// Insert case
	    if (theWriteFlag == 1 && theDirtyFlag == 1) {
	      check = pMyOperation[count_tables]->dirtyWrite();
	    } else if (theWriteFlag == 1) {
	      check = pMyOperation[count_tables]->writeTuple();
	    } else {
	      check = pMyOperation[count_tables]->insertTuple();
	    } // if else
	    break;
	  case otRead:			// Read Case
	    if (theSimpleFlag == 1) {
	      check = pMyOperation[count_tables]->simpleRead();
	    } else if (theDirtyFlag == 1) {
	      check = pMyOperation[count_tables]->dirtyRead();
	    } else {
	      check = pMyOperation[count_tables]->readTuple();
	    } // if else
	    break;
	  case otUpdate:			// Update Case
	    if (theWriteFlag == 1 && theDirtyFlag == 1) {
	      check = pMyOperation[count_tables]->dirtyWrite();
	    } else if (theWriteFlag == 1) {
	      check = pMyOperation[count_tables]->writeTuple();
	    } else if (theDirtyFlag == 1) {
	      check = pMyOperation[count_tables]->dirtyUpdate();
	    } else {
	      check = pMyOperation[count_tables]->updateTuple();
	    } // if else
	    break;
	  case otDelete:			// Delete Case
	    check = pMyOperation[count_tables]->deleteTuple();
	    break;
	  default:
	    assert(false);
	    break;
	  } // switch
	  if (check == -1) {
	    // Fatal error
	    tThreadResult = 3;
	    // break out of inner for count_tables loop
	    break;
	  } // if

	  check = pMyOperation[count_tables]->equal( (char*)attrName[0], 
						    (char*)&pkValue );

	  if (check == -1) {
	    // Fatal error
	    tThreadResult = 4;
	    ndbout << "pMyOperation equal failed" << endl;
	    // break out of inner for count_tables loop
	    break;
	  } // if
	  
	  check = -1;
	  tTmp = NULL;
	  switch (tMyOpType) {
	  case otInsert:			// Insert case
	  case otUpdate:			// Update Case
	    for (count_attributes = 1; count_attributes < loop_count_attributes;
		 count_attributes++) {
	      check = 
		pMyOperation[count_tables]->setValue((char*)attrName[count_attributes], (char*)&attrValue[0]);
	    } // for
	    break;
	  case otRead:			// Read Case
	    for (count_attributes = 1; count_attributes < loop_count_attributes; 
		 count_attributes++) {
	      tTmp = pMyOperation[count_tables]->
		getValue( (char*)attrName[count_attributes], 
			  (char*)&readValue[count_attributes][0] );
	    } // for
	    break;
	  case otDelete:			// Delete Case
	    break;
	  default:
	    assert(false);
	    break;
	  } // switch
	  if (check == -1 && tTmp == NULL && tMyOpType != otDelete) {
	    // Fatal error
	    tThreadResult = 5;
	    break;
	  } // if
	} // for count_tables
	    
	// Only execute if everything is OK
	if (tThreadResult != 0) {
	  // Close transaction (below)
	  // and continue with next count_round
	  count_round++;
	  tNoOfAttempts = 0;
	} // if
	else {
	  check = pMyTransaction->execute(Commit);
	  if (check == -1 ) {
	    const NdbError & err = pMyTransaction->getNdbError();

	// Add complete error handling here

              int retCode = flexHammerErrorData->handleErrorCommon(pMyTransaction->getNdbError());
              if (retCode == 1) {
		//if (strcmp(pMyTransaction->getNdbError().message, "Tuple did not exist") != 0 && strcmp(pMyTransaction->getNdbError().message,"Tuple already existed when attempting to insert") != 0) ndbout_c("execute: %s", pMyTransaction->getNdbError().message);

		if (pMyTransaction->getNdbError().code != 626 && pMyTransaction->getNdbError().code != 630){
     ndbout_c("Error code = %d", pMyTransaction->getNdbError().code);
     ndbout_c("execute: %s", pMyTransaction->getNdbError().message);}

              } else if (retCode == 2) {
                ndbout << "4115 should not happen in flexHammer" << endl;
              } else if (retCode == 3) {
// --------------------------------------------------------------------
// We are not certain if the transaction was successful or not.
// We must reexecute but might very well find that the transaction
// actually was updated. Updates and Reads are no problem here. Inserts
// will not cause a problem if error code 630 arrives. Deletes will
// not cause a problem if 626 arrives.
// --------------------------------------------------------------------
		/* What can we do here? */
		ndbout_c("execute: %s", pMyTransaction->getNdbError().message);
                 }//if(retCode == 3)
	// End of adding complete error handling

	    switch( err.classification) {
	    case NdbError::ConstraintViolation:	// Tuple already existed
	      count_round++;
	      tNoOfAttempts = 0;
	      break;
	    case NdbError::TimeoutExpired:
	    case NdbError::NodeRecoveryError:
	    case NdbError::TemporaryResourceError:
	    case NdbError::OverloadError:
	      if (tNoOfAttempts <= MaxNoOfAttemptsC) {
		// Retry
		tNoOfAttempts++;
	      } else {
		// Too many retries, continue with next
		count_round++;
		tNoOfAttempts = 0;
	      } // else if
	      break;
	      // Fatal, just continue
	    default:
	      count_round++;
	      tNoOfAttempts = 0;
	      break;
	    } // switch
	  } // if
	  else {
	    // Execute commit was OK
	    // This is verifying read values
	    //switch (tMyOpType) {
	    //case otRead:  // Read case
	    //for (j = 0; j < tNoOfAttributes; j++) {
	    //for(i = 1; i < tAttributeSize; i++) {
	    //if ( readValue[j][i] != attrValue[i]) {
	    //ndbout << "pkValue = " << pkValue << endl;
	    //ndbout << "readValue != attrValue" << endl;
	    //ndbout << readValue[j][i] << " != " << attrValue[i] << endl;
	    //} // if
	    //  } // for
	    //} // for
	    //break;
	    //} // switch
	    count_round++;
	    tNoOfAttempts = 0;
	  } // else if
	} // else if
	pMyNdb->closeTransaction(pMyTransaction);
      } // for count_round
    } // for count
  } // for (;;)

  // Clean up
  delete pMyNdb; 
  pMyNdb = NULL;

  flexHammerErrorData->resetErrorCounters();

  return  NULL; // thread exits
  
} // flexHammerThread


int
readArguments (int argc, const char** argv)
{
  int i = 1;
  
  tNoOfThreads = 5;		// Default Value
  tNoOfOperations = 500;	// Default Value
  tNoOfRecords = 1;             // Default Value
  tNoOfLoops = 1;	        // Default Value
  tNoOfAttributes = 25;		// Default Value
  tNoOfTables = 1;		// Default Value
  tNoOfBackups = 0;		// Default Value
  tAttributeSize = 1;		// Default Value
  theTableCreateFlag = 0;
  
  while (argc > 1) {
    if (strcmp(argv[i], "-t") == 0) {
      tNoOfThreads = atoi(argv[i+1]);
      if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS))
	return(1);
    }
    else if (strcmp(argv[i], "-o") == 0) {
      tNoOfOperations = atoi(argv[i+1]);
      if (tNoOfOperations < 1)
	return(1);
    }
    else if (strcmp(argv[i], "-r") == 0) {
      tNoOfRecords = atoi(argv[i+1]);
      if (tNoOfRecords < 1)
	return(1);
    }
    else if (strcmp(argv[i], "-a") == 0) {
      tNoOfAttributes = atoi(argv[i+1]);
      if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR))
	return(1);
    }
    else if (strcmp(argv[i], "-c") == 0) {
      tNoOfTables = atoi(argv[i+1]);
      if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES))
	return(1);
    }
    else if (strcmp(argv[i], "-l") == 0) {
      tNoOfLoops = atoi(argv[i+1]);
      if ((tNoOfLoops < 0) || (tNoOfLoops > 100000))
	return(1);
    }
    else if (strcmp(argv[i], "-s") == 0) {
      tAttributeSize = atoi(argv[i+1]);
      if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE))
	return(1);
    }
    else if (strcmp(argv[i], "-sleep") == 0) {
      tSleepTime = atoi(argv[i+1]);
      if ((tSleepTime < 1) || (tSleepTime > 3600))
	exit(-1);
    }
    else if (strcmp(argv[i], "-simple") == 0) {
      theSimpleFlag = 1;
      argc++;
      i--;
    }
    else if (strcmp(argv[i], "-write") == 0) {
      theWriteFlag = 1;
      argc++;
      i--;
    }
    else if (strcmp(argv[i], "-dirty") == 0) {
      theDirtyFlag = 1;
      argc++;
      i--;
    }
    else if (strcmp(argv[i], "-no_table_create") == 0) {
      theTableCreateFlag = 1;
      argc++;
      i--;
    }
    else if (strcmp(argv[i], "-stdtables") == 0) {
      theStandardTableNameFlag = 1;
      argc++;
      i--;
    } // if
    else {
      return(1);
    }
    
    argc -= 2;
    i = i + 2;
  } // while
  
  ndbout << endl << "FLEXHAMMER - Starting normal mode" << endl;
  ndbout << "Hammer ndb with read, insert, update and delete transactions"<< endl << endl;
  
  ndbout << "  " << tNoOfThreads << " thread(s) " << endl;
  ndbout << "  " << tNoOfLoops << " iterations " << endl;
  ndbout << "  " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " << endl;
  ndbout << "  " << tNoOfRecords << " records to hammer(limit this with the -r option)" << endl;
  ndbout << "  " << tNoOfAttributes << " attributes per table " << endl;
  ndbout << "  " << tNoOfOperations << " transaction(s) per thread and round " << endl;
  ndbout << "  " << tAttributeSize << " is the number of 32 bit words per attribute " << endl << endl;
  return 0;
} // readArguments


void sleepBeforeStartingTest(int seconds)
{
  if (seconds > 0) {
    ndbout << "Sleeping(" << seconds << ")...";
    NdbSleep_SecSleep(seconds);
    ndbout << " done!" << endl;
  } // if
} // sleepBeforeStartingTest

static int
createTables(Ndb* pMyNdb)
{
  int i = 0;
  int j = 0;
  int check = 0;
  NdbSchemaCon *MySchemaTransaction = NULL;
  NdbSchemaOp *MySchemaOp = NULL;

  //	Create Table and Attributes. 	                          
  if (theTableCreateFlag == 0) {

    for (i = 0; i < tNoOfTables; i++) {

      ndbout << "Creating " << tableName[i] << "...";
      // Check if table exists already
      const void * p = pMyNdb->getDictionary()->getTable(tableName[i]);
      if (p != 0) {
	ndbout << " already exists." << endl;
	// Continue with next table at once
	continue;
      } // if
      ndbout << endl;
      
      MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb);
      if (MySchemaTransaction == NULL) {
	return(-1);
      } // if
      
      MySchemaOp = MySchemaTransaction->getNdbSchemaOp();	
      if (MySchemaOp == NULL) {
	// Clean up opened schema transaction
	NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
	return(-1);
      } // if
      
      // Create tables, rest of parameters are default right now
      check = MySchemaOp->createTable(tableName[i],
				      8,		// Table Size
				      TupleKey,	// Key Type
				      40);		// Nr of Pages

      if (check == -1) { 
	// Clean up opened schema transaction
	NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
	return(-1);
      } // if
      
      // Primary key
      //ndbout << "  pk " << (char*)&attrName[0] << "..." << endl;
      check = MySchemaOp->createAttribute( (char*)attrName[0], TupleKey, 32,
					   1, UnSigned, MMBased,
					   NotNullAttribute );
      if (check == -1) { 
	// Clean up opened schema transaction
	NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
	return(-1);
      } // if

      // Rest of attributes
      for (j = 1; j < tNoOfAttributes ; j++) {
	//ndbout << "    " << (char*)attrName[j] << "..." << endl;
	check = MySchemaOp->createAttribute( (char*)attrName[j], NoKey, 32, 
					     tAttributeSize, UnSigned, MMBased, 
					     NotNullAttribute );
	if (check == -1) {
	  // Clean up opened schema transaction
	  NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
	  return(-1);
	} // if
      } // for
  
      // Execute creation
      check = MySchemaTransaction->execute();
      if (check == -1) {
	// Clean up opened schema transaction
	NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
	return(-1);
      } // if
      
      NdbSchemaCon::closeSchemaTrans(MySchemaTransaction);
    } // for
  } // if

  return(0);

} // createTables 


static int setAttrNames()
{
  int i = 0;
  int retVal = 0;

  for (i = 0; i < MAXATTR ; i++) {
    retVal = BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
    if (retVal < 0) {
      // Error in conversion
      return(-1);
    } // if
  } // for

  return (0);
} // setAttrNames

static int setTableNames()
{
  // Note! Uses only uppercase letters in table name's
  // so that we can look at the tables wits SQL
  int i = 0;
  int retVal = 0;

  for (i = 0; i < MAXTABLES ; i++) {
    if (theStandardTableNameFlag == 0) {
      retVal = BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, 
		 NdbTick_CurrentMillisecond()/1000);
    } // if 
    else {
      retVal = BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i);
    } // else
    if (retVal < 0) {
      // Error in conversion
      return(-1);
    } // if
  } // for

  return(0);
} // setTableNames

static int checkThreadResults(ThreadNdb *threadArrayP, char* phase)
{
  int i = 0;

  for (i = 0; i < tNoOfThreads; i++) {
    if (threadArrayP[i].threadResult != 0) {
      ndbout << "Thread " << i << " reported fatal error " 
	     << threadArrayP[i].threadResult << " during " << phase << endl;
      return(-1);
    } // if
  } // for

  return(0);
}