Commit e82eabcb authored by unknown's avatar unknown

more ndb restore cleanup

parent d294a753
......@@ -85,7 +85,7 @@ public:
struct AttributeS {
const AttributeDesc * Desc;
AttributeData * Data;
AttributeData Data;
};
class TupleS {
......@@ -97,7 +97,10 @@ private:
bool prepareRecord(const TableS &);
public:
TupleS() {};
TupleS() {
m_currentTable= 0;
allAttrData= 0;
};
~TupleS()
{
if (allAttrData)
......@@ -129,17 +132,13 @@ class TableS {
Uint32 m_nullBitmaskSize;
int pos;
char create_string[2048];
/*
char mysqlTableName[1024];
char mysqlDatabaseName[1024];
*/
void createAttr(NdbDictionary::Column *column);
public:
class NdbDictionary::Table* m_dictTable;
TableS (class NdbTableImpl* dictTable);
~TableS();
Uint32 getTableId() const {
return m_dictTable->getTableId();
......@@ -192,18 +191,26 @@ protected:
BackupFormat::FileHeader m_expectedFileHeader;
Uint32 m_nodeId;
Uint32 * m_buffer;
Uint32 m_bufferSize;
Uint32 * createBuffer(Uint32 bytes);
void * m_buffer;
void * m_buffer_ptr;
Uint32 m_buffer_sz;
Uint32 m_buffer_data_left;
void (* free_data_callback)();
bool openFile();
void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path);
void setDataFile(const BackupFile & bf, Uint32 no);
void setLogFile(const BackupFile & bf, Uint32 no);
Uint32 buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_read(void *ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb);
void setName(const char * path, const char * name);
BackupFile();
BackupFile(void (* free_data_callback)() = 0);
~BackupFile();
public:
bool readHeader();
......@@ -231,14 +238,11 @@ class RestoreMetaData : public BackupFile {
bool parseTableDescriptor(const Uint32 * data, Uint32 len);
public:
RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo);
~RestoreMetaData();
virtual ~RestoreMetaData();
int loadContent();
Uint32 getNoOfTables() const { return allTables.size();}
const TableS * operator[](int i) const { return allTables[i];}
......@@ -254,24 +258,16 @@ class RestoreDataIterator : public BackupFile {
const TableS* m_currentTable;
TupleS m_tuple;
void * m_buffer;
void * m_buffer_ptr;
Uint32 m_buffer_sz;
Uint32 m_buffer_data_left;
void (* free_data_callback)();
public:
// Constructor
RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)());
~RestoreDataIterator();
~RestoreDataIterator() {};
// Read data file fragment header
bool readFragmentHeader(int & res);
bool validateFragmentFooter();
Uint32 get_buffer_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb, FILE *stream);
Uint32 fread_buffer(void *ptr, Uint32 size, Uint32 nmemb, FILE *stream);
const TupleS *getNextTuple(int & res);
};
......@@ -285,8 +281,34 @@ public:
EntryType m_type;
const TableS * m_table;
myVector<AttributeS*> m_values;
myVector<AttributeS*> m_values_e;
AttributeS *add_attr() {
AttributeS * attr;
if (m_values_e.size() > 0) {
attr = m_values_e[m_values_e.size()-1];
m_values_e.pop_back();
}
else
{
attr = new AttributeS;
}
m_values.push_back(attr);
return attr;
}
void clear() {
for(int i= 0; i < m_values.size(); i++)
m_values_e.push_back(m_values[i]);
m_values.clear();
}
~LogEntry()
{
for(int i= 0; i< m_values.size(); i++)
delete m_values[i];
for(int i= 0; i< m_values_e.size(); i++)
delete m_values_e[i];
}
int size() const { return m_values.size(); }
const AttributeS * operator[](int i) const { return m_values[i];}
};
class RestoreLogIterator : public BackupFile {
......@@ -297,7 +319,8 @@ private:
LogEntry m_logEntry;
public:
RestoreLogIterator(const RestoreMetaData &);
virtual ~RestoreLogIterator() {};
const LogEntry * getNextLogEntry(int & res);
};
......
......@@ -21,20 +21,14 @@
class BackupConsumer {
public:
virtual ~BackupConsumer() { }
virtual bool init() { return true;}
virtual bool table(const TableS &){return true;}
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp) {return true;};
#endif
virtual void tuple(const TupleS &){}
virtual void tuple_free(){}
virtual void endOfTuples(){}
virtual void logEntry(const LogEntry &){}
virtual void endOfLogEntrys(){}
protected:
#ifdef USE_MYSQL
int create_table_string(const TableS & table, char * ,char *);
#endif
};
#endif
......@@ -27,51 +27,10 @@ BackupPrinter::table(const TableS & tab)
return true;
}
#ifdef USE_MYSQL
bool
BackupPrinter::table(const TableS & tab, MYSQL * mysql)
{
if (m_print || m_print_meta)
{
char tmpTabName[MAX_TAB_NAME_SIZE*2];
sprintf(tmpTabName, "%s", tab.getTableName());
char * database = strtok(tmpTabName, "/");
char * schema = strtok( NULL , "/");
char * tableName = strtok( NULL , "/");
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if(database == NULL)
return false;
if(schema == NULL)
return false;
if(tableName==NULL)
tableName = schema;
char stmtCreateDB[255];
sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
ndbout_c("%s", stmtCreateDB);
char buf [2048];
create_table_string(tab, tableName, buf);
ndbout_c("%s", buf);
ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName());
}
return true;
}
#endif
void
BackupPrinter::tuple(const TupleS & tup)
{
m_dataCount++;
m_dataCount++;
if (m_print || m_print_data)
m_ndbout << tup << endl;
}
......
......@@ -105,9 +105,8 @@ BackupRestore::~BackupRestore()
bool
BackupRestore::table(const TableS & table){
if (!m_restore_meta)
{
return true;
}
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
if (dict->createTable(*table.m_dictTable) == -1)
{
......@@ -320,13 +319,14 @@ BackupRestore::tuple_free()
if (!m_restore)
return;
// Send all transactions to NDB
if (m_transactions > 0)
if (m_transactions > 0) {
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
while (m_transactions > 0)
m_ndb->pollNdb(3000, m_transactions);
// Poll all transactions
while (m_transactions > 0)
m_ndb->pollNdb(3000, m_transactions);
}
}
void
......@@ -375,12 +375,12 @@ BackupRestore::logEntry(const LogEntry & tup)
exit(-1);
}
for (int i = 0; i < tup.m_values.size(); i++)
for (int i = 0; i < tup.size(); i++)
{
const AttributeS * attr = tup.m_values[i];
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data->string_value;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
......@@ -389,19 +389,27 @@ BackupRestore::logEntry(const LogEntry & tup)
op->setValue(attr->Desc->attrId, dataPtr, length);
}
#if 1
trans->execute(Commit);
#else
const int ret = trans->execute(Commit);
// Both insert update and delete can fail during log running
// and it's ok
if (ret != 0)
{
err << "execute failed: " << trans->getNdbError() << endl;
exit(-1);
// Both insert update and delete can fail during log running
// and it's ok
// TODO: check that the error is either tuple exists or tuple does not exist?
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
break;
case LogEntry::LE_UPDATE:
break;
case LogEntry::LE_DELETE:
break;
}
if (false)
{
err << "execute failed: " << trans->getNdbError() << endl;
exit(-1);
}
}
#endif
m_ndb->closeTransaction(trans);
m_logCount++;
......@@ -410,11 +418,11 @@ BackupRestore::logEntry(const LogEntry & tup)
void
BackupRestore::endOfLogEntrys()
{
if (m_restore)
{
info << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
if (!m_restore)
return;
info << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
/*
......@@ -471,7 +479,7 @@ BackupRestore::tuple(const TupleS & tup)
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data->string_value;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
......@@ -483,11 +491,11 @@ BackupRestore::tuple(const TupleS & tup)
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data->string_value;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data->null)
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
......
......@@ -45,13 +45,9 @@ public:
}
virtual ~BackupRestore();
virtual bool init();
virtual void release();
virtual bool table(const TableS &);
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp);
#endif
virtual void tuple(const TupleS &);
virtual void tuple_free();
virtual void tuple_a(restore_callback_t *cb);
......
......@@ -18,10 +18,6 @@
#include <Vector.hpp>
#include <ndb_limits.h>
#include <NdbTCP.h>
#ifdef USE_MYSQL
#include <mysql.h>
#endif
#include <NdbOut.hpp>
#include "consumer_restore.hpp"
......@@ -37,26 +33,7 @@ static int ga_backupId = 0;
static bool ga_dont_ignore_systab_0 = false;
static myVector<class BackupConsumer *> g_consumers;
#ifdef USE_MYSQL
/**
* mysql specific stuff:
*/
static const char* ga_user = "root";
static const char* ga_host = "localhost";
static const char* ga_socket = "/tmp/mysql.sock";
static const char* ga_password = "";
static const char* ga_database = "";
static int ga_port = 3306;
static bool use_mysql = false;
static MYSQL mysql;
#endif
#ifdef NDB_WIN32
static const char* ga_backupPath = "." DIR_SEPARATOR;
#else
static const char* ga_backupPath = "." DIR_SEPARATOR;
#endif
static const char* ga_connect_NDB = NULL;
......@@ -130,7 +107,6 @@ readArguments(const int argc, const char** argv)
ga_nParallelism < 1 ||
ga_nParallelism >1024)
{
arg_printusage(args, num_args, argv[0], "<path to backup files>\n");
return false;
}
......@@ -185,11 +161,11 @@ readArguments(const int argc, const char** argv)
}
{
BackupConsumer * c = printer;
BackupConsumer * c = printer;
g_consumers.push_back(c);
}
{
BackupConsumer * c = restore;
BackupConsumer * c = restore;
g_consumers.push_back(c);
}
// Set backup file path
......@@ -197,20 +173,6 @@ readArguments(const int argc, const char** argv)
{
ga_backupPath = argv[optind];
}
#ifdef USE_MYSQL
if(use_mysql) {
ga_dont_ignore_systab_0 = false;
ga_database = ""; //not used yet. pethaps later if we want to
// restore meta data in an existing mysql database,
// and not just restore it to the same database
// as when the backup was taken.
// If implementing this, then the
// tupleAsynch must also be changed so that the
// table data is restored to the correct table.
// also, mysql_select_db must be set properly (ie.,
// ignored in codw below)
}
#endif
return true;
}
......@@ -234,7 +196,6 @@ checkSysTable(const char *tableName)
strcmp(tableName, "sys/def/NDB$EVENTS_0") != 0);
}
static void
free_data_callback()
{
......@@ -305,25 +266,13 @@ main(int argc, const char** argv)
if (checkSysTable(metaData[i]->getTableName()))
{
for(int j = 0; j<g_consumers.size(); j++)
#ifdef USE_MYSQL
if(use_mysql) {
if (!g_consumers[j]->table(* metaData[i], &mysql))
{
ndbout_c("Restore: Failed to restore table: %s. "
"Exiting...",
metaData[i]->getTableName());
return -11;
}
} else
#endif
if (!g_consumers[j]->table(* metaData[i]))
{
ndbout_c("Restore: Failed to restore table: %s. "
"Exiting...",
metaData[i]->getTableName());
return -11;
}
if (!g_consumers[j]->table(* metaData[i]))
{
ndbout_c("Restore: Failed to restore table: %s. "
"Exiting...",
metaData[i]->getTableName());
return -11;
}
}
}
......@@ -341,10 +290,10 @@ main(int argc, const char** argv)
}
while (dataIter.readFragmentHeader(res))
while (dataIter.readFragmentHeader(res= 0))
{
const TupleS* tuple;
while ((tuple = dataIter.getNextTuple(res)) != NULL)
while ((tuple = dataIter.getNextTuple(res= 1)) != 0)
{
if (checkSysTable(tuple->getTable()->getTableName()))
for(int i = 0; i < g_consumers.size(); i++)
......@@ -366,40 +315,33 @@ main(int argc, const char** argv)
if (res < 0)
{
ndbout_c("Restore: An error occured while restoring data. "
"Exiting...");
err << "Restore: An error occured while restoring data. Exiting... res=" << res << endl;
return -1;
}
dataIter.validateFooter(); //not implemented
for (int i = 0; i<g_consumers.size(); i++)
g_consumers[i]->endOfTuples();
RestoreLogIterator logIter(metaData);
if (!logIter.readHeader())
{
ndbout << "Failed to read header of data file. Exiting...";
err << "Failed to read header of data file. Exiting..." << endl;
return -1;
}
/**
* I have not touched the part below : -johan 040218
* except fixing return values.
*/
const LogEntry * logEntry = 0;
while ((logEntry = logIter.getNextLogEntry(res)))
while ((logEntry = logIter.getNextLogEntry(res= 0)) != 0)
{
if (checkSysTable(logEntry->m_table->getTableName()))
{
for(int i = 0; i<g_consumers.size(); i++)
g_consumers[i]->logEntry(* logEntry);
}
}
if (res < 0)
{
ndbout_c("Restore: An restoring the data log"
"Exiting...");
err << "Restore: An restoring the data log. Exiting... res=" << res << endl;
return -1;
}
logIter.validateFooter(); //not implemented
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment