Commit 2be8bc02 authored by joreland@mysql.com's avatar joreland@mysql.com

Merge mysql.com:/home/jonas/src/mysql-4.1

into mysql.com:/home/jonas/src/mysql-4.1-ndb
parents fc7ba088 41d4cd89
...@@ -111,7 +111,7 @@ inline ...@@ -111,7 +111,7 @@ inline
void void
TcKeyConf::setNoOfOperations(Uint32 & confInfo, Uint32 noOfOps){ TcKeyConf::setNoOfOperations(Uint32 & confInfo, Uint32 noOfOps){
ASSERT_MAX(noOfOps, 65535, "TcKeyConf::setNoOfOperations"); ASSERT_MAX(noOfOps, 65535, "TcKeyConf::setNoOfOperations");
confInfo |= noOfOps; confInfo = (confInfo & 0xFFFF0000) | noOfOps;
} }
inline inline
......
...@@ -165,6 +165,7 @@ Parser<T>::Parser(const ParserRow<T> rows[], class InputStream & in, ...@@ -165,6 +165,7 @@ Parser<T>::Parser(const ParserRow<T> rows[], class InputStream & in,
template<class T> template<class T>
inline inline
Parser<T>::~Parser(){ Parser<T>::~Parser(){
delete impl;
} }
template<class T> template<class T>
......
ndbtools_PROGRAMS = ndb_restore ndbtools_PROGRAMS = ndb_restore
ndb_restore_SOURCES = main.cpp Restore.cpp ndb_restore_SOURCES = main.cpp consumer.cpp consumer_restore.cpp consumer_printer.cpp Restore.cpp
LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la LDADD_LOC = $(top_builddir)/ndb/src/libndbclient.la
......
include .defs.mk
TYPE := *
BIN_TARGET := restore
BIN_TARGET_LIBS :=
BIN_TARGET_ARCHIVES := NDB_API
CCFLAGS_LOC = -I.. -I$(NDB_TOP)/src/ndbapi -I$(NDB_TOP)/include/ndbapi -I$(NDB_TOP)/include/util -I$(NDB_TOP)/include/portlib -I$(NDB_TOP)/include/kernel
#ifneq ($(MYSQLCLUSTER_TOP),)
#CCFLAGS_LOC +=-I$(MYSQLCLUSTER_TOP)/include -D USE_MYSQL
#LDFLAGS_LOC += -L$(MYSQLCLUSTER_TOP)/libmysql_r/ -lmysqlclient_r
#endif
SOURCES = main.cpp Restore.cpp
include $(NDB_TOP)/Epilogue.mk
...@@ -33,32 +33,32 @@ Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data ...@@ -33,32 +33,32 @@ Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data
Uint64 Twiddle64(Uint64 in); // Byte shift 64-bit data Uint64 Twiddle64(Uint64 in); // Byte shift 64-bit data
bool bool
BackupFile::Twiddle(AttributeS* attr, Uint32 arraySize){ BackupFile::Twiddle(const AttributeDesc* attr_desc, AttributeData* attr_data, Uint32 arraySize){
if(m_hostByteOrder) if(m_hostByteOrder)
return true; return true;
if(arraySize == 0){ if(arraySize == 0){
arraySize = attr->Desc->arraySize; arraySize = attr_desc->arraySize;
} }
switch(attr->Desc->size){ switch(attr_desc->size){
case 8: case 8:
return true; return true;
case 16: case 16:
for(unsigned i = 0; i<arraySize; i++){ for(unsigned i = 0; i<arraySize; i++){
attr->Data.u_int16_value[i] = Twiddle16(attr->Data.u_int16_value[i]); attr_data->u_int16_value[i] = Twiddle16(attr_data->u_int16_value[i]);
} }
return true; return true;
case 32: case 32:
for(unsigned i = 0; i<arraySize; i++){ for(unsigned i = 0; i<arraySize; i++){
attr->Data.u_int32_value[i] = Twiddle32(attr->Data.u_int32_value[i]); attr_data->u_int32_value[i] = Twiddle32(attr_data->u_int32_value[i]);
} }
return true; return true;
case 64: case 64:
for(unsigned i = 0; i<arraySize; i++){ for(unsigned i = 0; i<arraySize; i++){
attr->Data.u_int64_value[i] = Twiddle64(attr->Data.u_int64_value[i]); attr_data->u_int64_value[i] = Twiddle64(attr_data->u_int64_value[i]);
} }
return true; return true;
default: default:
...@@ -82,14 +82,14 @@ RestoreMetaData::RestoreMetaData(const char* path, Uint32 nodeId, Uint32 bNo) { ...@@ -82,14 +82,14 @@ RestoreMetaData::RestoreMetaData(const char* path, Uint32 nodeId, Uint32 bNo) {
} }
RestoreMetaData::~RestoreMetaData(){ RestoreMetaData::~RestoreMetaData(){
for(int i = 0; i<allTables.size(); i++) for(Uint32 i= 0; i < allTables.size(); i++)
delete allTables[i]; delete allTables[i];
allTables.clear(); allTables.clear();
} }
const TableS * const TableS *
RestoreMetaData::getTable(Uint32 tableId) const { RestoreMetaData::getTable(Uint32 tableId) const {
for(int i = 0; i<allTables.size(); i++) for(Uint32 i= 0; i < allTables.size(); i++)
if(allTables[i]->getTableId() == tableId) if(allTables[i]->getTableId() == tableId)
return allTables[i]; return allTables[i];
return NULL; return NULL;
...@@ -122,7 +122,8 @@ RestoreMetaData::readMetaTableList() { ...@@ -122,7 +122,8 @@ RestoreMetaData::readMetaTableList() {
Uint32 sectionInfo[2]; Uint32 sectionInfo[2];
if (fread(&sectionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(&sectionInfo, sizeof(sectionInfo), 1) != 1){
err << "readMetaTableList read header error" << endl;
return 0; return 0;
} }
sectionInfo[0] = ntohl(sectionInfo[0]); sectionInfo[0] = ntohl(sectionInfo[0]);
...@@ -130,11 +131,9 @@ RestoreMetaData::readMetaTableList() { ...@@ -130,11 +131,9 @@ RestoreMetaData::readMetaTableList() {
const Uint32 tabCount = sectionInfo[1] - 2; const Uint32 tabCount = sectionInfo[1] - 2;
const Uint32 len = 4 * tabCount; void *tmp;
if(createBuffer(len) == 0) if (buffer_get_ptr(&tmp, 4, tabCount) != tabCount){
abort(); err << "readMetaTableList read tabCount error" << endl;
if (fread(m_buffer, 1, len, m_file) != len){
return 0; return 0;
} }
...@@ -147,7 +146,7 @@ RestoreMetaData::readMetaTableDesc() { ...@@ -147,7 +146,7 @@ RestoreMetaData::readMetaTableDesc() {
Uint32 sectionInfo[2]; Uint32 sectionInfo[2];
// Read section header // Read section header
if (fread(&sectionInfo, sizeof(sectionInfo), 1, m_file) != 1){ if (buffer_read(&sectionInfo, sizeof(sectionInfo), 1) != 1){
err << "readMetaTableDesc read header error" << endl; err << "readMetaTableDesc read header error" << endl;
return false; return false;
} // if } // if
...@@ -156,20 +155,15 @@ RestoreMetaData::readMetaTableDesc() { ...@@ -156,20 +155,15 @@ RestoreMetaData::readMetaTableDesc() {
assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION); assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION);
// Allocate temporary storage for dictTabInfo buffer
const Uint32 len = (sectionInfo[1] - 2);
if (createBuffer(4 * (len+1)) == NULL) {
err << "readMetaTableDesc allocation error" << endl;
return false;
} // if
// Read dictTabInfo buffer // Read dictTabInfo buffer
if (fread(m_buffer, 4, len, m_file) != len){ const Uint32 len = (sectionInfo[1] - 2);
void *ptr;
if (buffer_get_ptr(&ptr, 4, len) != len){
err << "readMetaTableDesc read error" << endl; err << "readMetaTableDesc read error" << endl;
return false; return false;
} // if } // if
return parseTableDescriptor(m_buffer, len); return parseTableDescriptor((Uint32*)ptr, len);
} }
bool bool
...@@ -177,11 +171,10 @@ RestoreMetaData::readGCPEntry() { ...@@ -177,11 +171,10 @@ RestoreMetaData::readGCPEntry() {
Uint32 data[4]; Uint32 data[4];
BackupFormat::CtlFile::GCPEntry * dst = BackupFormat::CtlFile::GCPEntry * dst =
(BackupFormat::CtlFile::GCPEntry *)&data[0]; (BackupFormat::CtlFile::GCPEntry *)&data[0];
if(fread(dst, 4, 4, m_file) != 4){ if(buffer_read(dst, 4, 4) != 4){
err << "readGCPEntry read error" << endl; err << "readGCPEntry read error" << endl;
return false; return false;
} }
...@@ -208,10 +201,16 @@ TableS::TableS(NdbTableImpl* tableImpl) ...@@ -208,10 +201,16 @@ TableS::TableS(NdbTableImpl* tableImpl)
m_dictTable = tableImpl; m_dictTable = tableImpl;
m_noOfNullable = m_nullBitmaskSize = 0; m_noOfNullable = m_nullBitmaskSize = 0;
for (Uint32 i = 0; i < tableImpl->getNoOfColumns(); i++) for (int i = 0; i < tableImpl->getNoOfColumns(); i++)
createAttr(tableImpl->getColumn(i)); createAttr(tableImpl->getColumn(i));
} }
TableS::~TableS()
{
for (Uint32 i= 0; i < allAttributesDesc.size(); i++)
delete allAttributesDesc[i];
}
// Parse dictTabInfo buffer and pushback to to vector storage // Parse dictTabInfo buffer and pushback to to vector storage
bool bool
RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
...@@ -246,56 +245,68 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) ...@@ -246,56 +245,68 @@ RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
} }
// Constructor // Constructor
RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md) RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md, void (* _free_data_callback)())
: m_metaData(md) : BackupFile(_free_data_callback), m_metaData(md)
{ {
debug << "RestoreDataIterator constructor" << endl; debug << "RestoreDataIterator constructor" << endl;
setDataFile(md, 0); setDataFile(md, 0);
} }
RestoreDataIterator::~RestoreDataIterator(){ TupleS & TupleS::operator=(const TupleS& tuple)
{
prepareRecord(*tuple.m_currentTable);
if (allAttrData)
memcpy(allAttrData, tuple.allAttrData, getNoOfAttributes()*sizeof(AttributeData));
return *this;
};
int TupleS::getNoOfAttributes() const {
if (m_currentTable == 0)
return 0;
return m_currentTable->getNoOfAttributes();
};
const TableS * TupleS::getTable() const {
return m_currentTable;
};
const AttributeDesc * TupleS::getDesc(int i) const {
return m_currentTable->allAttributesDesc[i];
} }
AttributeData * TupleS::getData(int i) const{
return &(allAttrData[i]);
};
bool bool
TupleS::prepareRecord(const TableS & tab){ TupleS::prepareRecord(const TableS & tab){
m_currentTable = &tab; if (allAttrData) {
for(int i = 0; i<allAttributes.size(); i++) { if (getNoOfAttributes() == tab.getNoOfAttributes())
if(allAttributes[i] != NULL) {
delete allAttributes[i]; m_currentTable = &tab;
} return true;
allAttributes.clear();
AttributeS * a;
for(int i = 0; i<tab.getNoOfAttributes(); i++){
a = new AttributeS;
if(a == NULL) {
ndbout_c("Restore: Failed to allocate memory");
return false;
} }
a->Desc = tab[i]; delete [] allAttrData;
allAttributes.push_back(a); m_currentTable= 0;
} }
allAttrData = new AttributeData[tab.getNoOfAttributes()];
if (allAttrData == 0)
return false;
m_currentTable = &tab;
return true; return true;
} }
const TupleS * const TupleS *
RestoreDataIterator::getNextTuple(int & res) { RestoreDataIterator::getNextTuple(int & res)
TupleS * tup = new TupleS(); {
if(tup == NULL) {
ndbout_c("Restore: Failed to allocate memory");
res = -1;
return NULL;
}
if(!tup->prepareRecord(* m_currentTable)) {
res =-1;
return NULL;
}
Uint32 dataLength = 0; Uint32 dataLength = 0;
// Read record length // Read record length
if (fread(&dataLength, sizeof(dataLength), 1, m_file) != 1){ if (buffer_read(&dataLength, sizeof(dataLength), 1) != 1){
err << "getNextTuple:Error reading length of data part" << endl; err << "getNextTuple:Error reading length of data part" << endl;
delete tup;
res = -1; res = -1;
return NULL; return NULL;
} // if } // if
...@@ -309,34 +320,34 @@ RestoreDataIterator::getNextTuple(int & res) { ...@@ -309,34 +320,34 @@ RestoreDataIterator::getNextTuple(int & res) {
// End of this data fragment // End of this data fragment
debug << "End of fragment" << endl; debug << "End of fragment" << endl;
res = 0; res = 0;
delete tup;
return NULL; return NULL;
} // if } // if
tup->createDataRecord(dataLenBytes);
// Read tuple data // Read tuple data
if (fread(tup->getDataRecord(), 1, dataLenBytes, m_file) != dataLenBytes) { void *_buf_ptr;
if (buffer_get_ptr(&_buf_ptr, 1, dataLenBytes) != dataLenBytes) {
err << "getNextTuple:Read error: " << endl; err << "getNextTuple:Read error: " << endl;
delete tup;
res = -1; res = -1;
return NULL; return NULL;
} }
Uint32 * ptr = tup->getDataRecord(); Uint32 *buf_ptr = (Uint32*)_buf_ptr, *ptr = buf_ptr;
ptr += m_currentTable->m_nullBitmaskSize; ptr += m_currentTable->m_nullBitmaskSize;
for(int i = 0; i < m_currentTable->m_fixedKeys.size(); i++){ for(Uint32 i= 0; i < m_currentTable->m_fixedKeys.size(); i++){
assert(ptr < tup->getDataRecord() + dataLength); assert(ptr < buf_ptr + dataLength);
const Uint32 attrId = m_currentTable->m_fixedKeys[i]->attrId; const Uint32 attrId = m_currentTable->m_fixedKeys[i]->attrId;
AttributeS * attr = tup->allAttributes[attrId];
const Uint32 sz = attr->Desc->getSizeInWords(); AttributeData * attr_data = m_tuple.getData(attrId);
const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
attr->Data.null = false; const Uint32 sz = attr_desc->getSizeInWords();
attr->Data.void_value = ptr;
if(!Twiddle(attr)) attr_data->null = false;
attr_data->void_value = ptr;
if(!Twiddle(attr_desc, attr_data))
{ {
res = -1; res = -1;
return NULL; return NULL;
...@@ -344,18 +355,20 @@ RestoreDataIterator::getNextTuple(int & res) { ...@@ -344,18 +355,20 @@ RestoreDataIterator::getNextTuple(int & res) {
ptr += sz; ptr += sz;
} }
for(int i = 0; i<m_currentTable->m_fixedAttribs.size(); i++){ for(Uint32 i = 0; i < m_currentTable->m_fixedAttribs.size(); i++){
assert(ptr < tup->getDataRecord() + dataLength); assert(ptr < buf_ptr + dataLength);
const Uint32 attrId = m_currentTable->m_fixedAttribs[i]->attrId; const Uint32 attrId = m_currentTable->m_fixedAttribs[i]->attrId;
AttributeS * attr = tup->allAttributes[attrId];
const Uint32 sz = attr->Desc->getSizeInWords(); AttributeData * attr_data = m_tuple.getData(attrId);
const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
const Uint32 sz = attr_desc->getSizeInWords();
attr->Data.null = false; attr_data->null = false;
attr->Data.void_value = ptr; attr_data->void_value = ptr;
if(!Twiddle(attr)) if(!Twiddle(attr_desc, attr_data))
{ {
res = -1; res = -1;
return NULL; return NULL;
...@@ -364,21 +377,23 @@ RestoreDataIterator::getNextTuple(int & res) { ...@@ -364,21 +377,23 @@ RestoreDataIterator::getNextTuple(int & res) {
ptr += sz; ptr += sz;
} }
for(int i = 0; i<m_currentTable->m_variableAttribs.size(); i++){ for(Uint32 i = 0; i < m_currentTable->m_variableAttribs.size(); i++){
const Uint32 attrId = m_currentTable->m_variableAttribs[i]->attrId; const Uint32 attrId = m_currentTable->m_variableAttribs[i]->attrId;
AttributeS * attr = tup->allAttributes[attrId];
AttributeData * attr_data = m_tuple.getData(attrId);
const AttributeDesc * attr_desc = m_tuple.getDesc(attrId);
if(attr->Desc->m_column->getNullable()){ if(attr_desc->m_column->getNullable()){
const Uint32 ind = attr->Desc->m_nullBitIndex; const Uint32 ind = attr_desc->m_nullBitIndex;
if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize,
tup->getDataRecord(),ind)){ buf_ptr,ind)){
attr->Data.null = true; attr_data->null = true;
attr->Data.void_value = NULL; attr_data->void_value = NULL;
continue; continue;
} }
} }
assert(ptr < tup->getDataRecord() + dataLength); assert(ptr < buf_ptr + dataLength);
typedef BackupFormat::DataFile::VariableData VarData; typedef BackupFormat::DataFile::VariableData VarData;
VarData * data = (VarData *)ptr; VarData * data = (VarData *)ptr;
...@@ -386,15 +401,15 @@ RestoreDataIterator::getNextTuple(int & res) { ...@@ -386,15 +401,15 @@ RestoreDataIterator::getNextTuple(int & res) {
Uint32 id = ntohl(data->Id); Uint32 id = ntohl(data->Id);
assert(id == attrId); assert(id == attrId);
attr->Data.null = false; attr_data->null = false;
attr->Data.void_value = &data->Data[0]; attr_data->void_value = &data->Data[0];
/** /**
* Compute array size * Compute array size
*/ */
const Uint32 arraySize = (4 * sz) / (attr->Desc->size / 8); const Uint32 arraySize = (4 * sz) / (attr_desc->size / 8);
assert(arraySize >= attr->Desc->arraySize); assert(arraySize >= attr_desc->arraySize);
if(!Twiddle(attr, attr->Desc->arraySize)) if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize))
{ {
res = -1; res = -1;
return NULL; return NULL;
...@@ -405,15 +420,20 @@ RestoreDataIterator::getNextTuple(int & res) { ...@@ -405,15 +420,20 @@ RestoreDataIterator::getNextTuple(int & res) {
m_count ++; m_count ++;
res = 0; res = 0;
return tup; return &m_tuple;
} // RestoreDataIterator::getNextTuple } // RestoreDataIterator::getNextTuple
BackupFile::BackupFile(){ BackupFile::BackupFile(void (* _free_data_callback)())
: free_data_callback(_free_data_callback)
{
m_file = 0; m_file = 0;
m_path[0] = 0; m_path[0] = 0;
m_fileName[0] = 0; m_fileName[0] = 0;
m_buffer = 0;
m_bufferSize = 0; m_buffer_sz = 64*1024;
m_buffer = malloc(m_buffer_sz);
m_buffer_ptr = m_buffer;
m_buffer_data_left = 0;
} }
BackupFile::~BackupFile(){ BackupFile::~BackupFile(){
...@@ -434,15 +454,54 @@ BackupFile::openFile(){ ...@@ -434,15 +454,54 @@ BackupFile::openFile(){
return m_file != 0; return m_file != 0;
} }
Uint32 * Uint32 BackupFile::buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb)
BackupFile::createBuffer(Uint32 bytes){ {
if(bytes > m_bufferSize){ Uint32 sz = size*nmemb;
if(m_buffer != 0) if (sz > m_buffer_data_left) {
free(m_buffer);
m_bufferSize = m_bufferSize + 2 * bytes; if (free_data_callback)
m_buffer = (Uint32*)malloc(m_bufferSize); (*free_data_callback)();
memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left);
size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, m_file);
m_buffer_data_left += r;
m_buffer_ptr = m_buffer;
if (sz > m_buffer_data_left)
sz = size * (m_buffer_data_left / size);
} }
return m_buffer;
*p_buf_ptr = m_buffer_ptr;
return sz/size;
}
Uint32 BackupFile::buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb)
{
Uint32 r = buffer_get_ptr_ahead(p_buf_ptr, size, nmemb);
m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size);
m_buffer_data_left -= (r*size);
return r;
}
Uint32 BackupFile::buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb)
{
void *buf_ptr;
Uint32 r = buffer_get_ptr_ahead(&buf_ptr, size, nmemb);
memcpy(ptr, buf_ptr, r*size);
return r;
}
Uint32 BackupFile::buffer_read(void *ptr, Uint32 size, Uint32 nmemb)
{
void *buf_ptr;
Uint32 r = buffer_get_ptr(&buf_ptr, size, nmemb);
memcpy(ptr, buf_ptr, r*size);
return r;
} }
void void
...@@ -503,7 +562,7 @@ BackupFile::readHeader(){ ...@@ -503,7 +562,7 @@ BackupFile::readHeader(){
return false; return false;
} }
if(fread(&m_fileHeader, sizeof(m_fileHeader), 1, m_file) != 1){ if(buffer_read(&m_fileHeader, sizeof(m_fileHeader), 1) != 1){
err << "readDataFileHeader: Error reading header" << endl; err << "readDataFileHeader: Error reading header" << endl;
return false; return false;
} }
...@@ -551,14 +610,13 @@ BackupFile::validateFooter(){ ...@@ -551,14 +610,13 @@ BackupFile::validateFooter(){
return true; return true;
} }
bool bool RestoreDataIterator::readFragmentHeader(int & ret)
RestoreDataIterator::readFragmentHeader(int & ret)
{ {
BackupFormat::DataFile::FragmentHeader Header; BackupFormat::DataFile::FragmentHeader Header;
debug << "RestoreDataIterator::getNextFragment" << endl; debug << "RestoreDataIterator::getNextFragment" << endl;
if (fread(&Header, sizeof(Header), 1, m_file) != 1){ if (buffer_read(&Header, sizeof(Header), 1) != 1){
ret = 0; ret = 0;
return false; return false;
} // if } // if
...@@ -581,6 +639,12 @@ RestoreDataIterator::readFragmentHeader(int & ret) ...@@ -581,6 +639,12 @@ RestoreDataIterator::readFragmentHeader(int & ret)
return false; return false;
} }
if(!m_tuple.prepareRecord(*m_currentTable))
{
ret =-1;
return false;
}
info << "_____________________________________________________" << endl info << "_____________________________________________________" << endl
<< "Restoring data in table: " << m_currentTable->getTableName() << "Restoring data in table: " << m_currentTable->getTableName()
<< "(" << Header.TableId << ") fragment " << "(" << Header.TableId << ") fragment "
...@@ -588,6 +652,7 @@ RestoreDataIterator::readFragmentHeader(int & ret) ...@@ -588,6 +652,7 @@ RestoreDataIterator::readFragmentHeader(int & ret)
m_count = 0; m_count = 0;
ret = 0; ret = 0;
return true; return true;
} // RestoreDataIterator::getNextFragment } // RestoreDataIterator::getNextFragment
...@@ -596,7 +661,7 @@ bool ...@@ -596,7 +661,7 @@ bool
RestoreDataIterator::validateFragmentFooter() { RestoreDataIterator::validateFragmentFooter() {
BackupFormat::DataFile::FragmentFooter footer; BackupFormat::DataFile::FragmentFooter footer;
if (fread(&footer, sizeof(footer), 1, m_file) != 1){ if (buffer_read(&footer, sizeof(footer), 1) != 1){
err << "getFragmentFooter:Error reading fragment footer" << endl; err << "getFragmentFooter:Error reading fragment footer" << endl;
return false; return false;
} }
...@@ -704,45 +769,32 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -704,45 +769,32 @@ RestoreLogIterator::getNextLogEntry(int & res) {
// Read record length // Read record length
typedef BackupFormat::LogFile::LogEntry LogE; typedef BackupFormat::LogFile::LogEntry LogE;
Uint32 gcp = 0; Uint32 gcp= 0;
LogE * logE = 0; LogE * logE= 0;
Uint32 len = ~0; Uint32 len= ~0;
const Uint32 stopGCP = m_metaData.getStopGCP(); const Uint32 stopGCP = m_metaData.getStopGCP();
do { do {
if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
if(createBuffer(4) == 0) { res= -1;
res = -1; return 0;
return NULL;
} }
len= ntohl(len);
if (fread(m_buffer, sizeof(Uint32), 1, m_file) != 1){ Uint32 data_len = sizeof(Uint32) + len*4;
res = -1; if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) {
return NULL; res= -2;
return 0;
} }
m_buffer[0] = ntohl(m_buffer[0]);
len = m_buffer[0];
if(len == 0){ if(len == 0){
res = 0; res= 0;
return 0; return 0;
} }
if(createBuffer(4 * (len + 1)) == 0){ logE->TableId= ntohl(logE->TableId);
res = -1; logE->TriggerEvent= ntohl(logE->TriggerEvent);
return NULL;
}
if (fread(&m_buffer[1], 4, len, m_file) != len) {
res = -1;
return NULL;
}
logE = (LogE *)&m_buffer[0]; const bool hasGcp= (logE->TriggerEvent & 0x10000) != 0;
logE->TableId = ntohl(logE->TableId);
logE->TriggerEvent = ntohl(logE->TriggerEvent);
const bool hasGcp = (logE->TriggerEvent & 0x10000) != 0;
logE->TriggerEvent &= 0xFFFF; logE->TriggerEvent &= 0xFFFF;
if(hasGcp){ if(hasGcp){
...@@ -751,9 +803,6 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -751,9 +803,6 @@ RestoreLogIterator::getNextLogEntry(int & res) {
} }
} while(gcp > stopGCP + 1); } while(gcp > stopGCP + 1);
for(int i=0; i<m_logEntry.m_values.size();i++)
delete m_logEntry.m_values[i];
m_logEntry.m_values.clear();
m_logEntry.m_table = m_metaData.getTable(logE->TableId); m_logEntry.m_table = m_metaData.getTable(logE->TableId);
switch(logE->TriggerEvent){ switch(logE->TriggerEvent){
case TriggerEvent::TE_INSERT: case TriggerEvent::TE_INSERT:
...@@ -771,17 +820,19 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -771,17 +820,19 @@ RestoreLogIterator::getNextLogEntry(int & res) {
} }
const TableS * tab = m_logEntry.m_table; const TableS * tab = m_logEntry.m_table;
m_logEntry.clear();
AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; AttributeHeader * ah = (AttributeHeader *)&logE->Data[0];
AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2]; AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2];
AttributeS * attr; AttributeS * attr;
while(ah < end){ while(ah < end){
attr = new AttributeS; attr= m_logEntry.add_attr();
if(attr == NULL) { if(attr == NULL) {
ndbout_c("Restore: Failed to allocate memory"); ndbout_c("Restore: Failed to allocate memory");
res = -1; res = -1;
return NULL; return 0;
} }
attr->Desc = (* tab)[ah->getAttributeId()]; attr->Desc = (* tab)[ah->getAttributeId()];
assert(attr->Desc != 0); assert(attr->Desc != 0);
...@@ -794,13 +845,94 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -794,13 +845,94 @@ RestoreLogIterator::getNextLogEntry(int & res) {
attr->Data.void_value = ah->getDataPtr(); attr->Data.void_value = ah->getDataPtr();
} }
Twiddle(attr); Twiddle(attr->Desc, &(attr->Data));
m_logEntry.m_values.push_back(attr);
ah = ah->getNext(); ah = ah->getNext();
} }
m_count ++; m_count ++;
res = 0; res = 0;
return &m_logEntry; return &m_logEntry;
} }
NdbOut &
operator<<(NdbOut& ndbout, const AttributeS& attr){
const AttributeData & data = attr.Data;
const AttributeDesc & desc = *(attr.Desc);
if (data.null)
{
ndbout << "<NULL>";
return ndbout;
}
NdbRecAttr tmprec;
tmprec.setup(desc.m_column, (char *)data.void_value);
ndbout << tmprec;
return ndbout;
}
// Print tuple data
NdbOut&
operator<<(NdbOut& ndbout, const TupleS& tuple)
{
ndbout << tuple.getTable()->getTableName() << "; ";
for (int i = 0; i < tuple.getNoOfAttributes(); i++)
{
AttributeData * attr_data = tuple.getData(i);
const AttributeDesc * attr_desc = tuple.getDesc(i);
const AttributeS attr = {attr_desc, *attr_data};
debug << i << " " << attr_desc->m_column->getName();
ndbout << attr;
if (i != (tuple.getNoOfAttributes() - 1))
ndbout << delimiter << " ";
} // for
return ndbout;
}
// Print tuple data
NdbOut&
operator<<(NdbOut& ndbout, const LogEntry& logE)
{
switch(logE.m_type)
{
case LogEntry::LE_INSERT:
ndbout << "INSERT " << logE.m_table->getTableName() << " ";
break;
case LogEntry::LE_DELETE:
ndbout << "DELETE " << logE.m_table->getTableName() << " ";
break;
case LogEntry::LE_UPDATE:
ndbout << "UPDATE " << logE.m_table->getTableName() << " ";
break;
default:
ndbout << "Unknown log entry type (not insert, delete or update)" ;
}
for (Uint32 i= 0; i < logE.size();i++)
{
const AttributeS * attr = logE[i];
ndbout << attr->Desc->m_column->getName() << "=";
ndbout << (* attr);
if (i < (logE.size() - 1))
ndbout << ", ";
}
return ndbout;
}
NdbOut &
operator<<(NdbOut& ndbout, const TableS & table){
ndbout << endl << "Table: " << table.getTableName() << endl;
for (int j = 0; j < table.getNoOfAttributes(); j++)
{
const AttributeDesc * desc = table[j];
ndbout << desc->m_column->getName() << ": " << desc->m_column->getType();
ndbout << " key: " << desc->m_column->getPrimaryKey();
ndbout << " array: " << desc->arraySize;
ndbout << " size: " << desc->size << endl;
} // for
return ndbout;
}
...@@ -18,13 +18,15 @@ ...@@ -18,13 +18,15 @@
#define RESTORE_H #define RESTORE_H
#include <ndb_global.h> #include <ndb_global.h>
#include <NdbOut.hpp>
#include <BackupFormat.hpp> #include <BackupFormat.hpp>
#include <NdbApi.hpp> #include <NdbApi.hpp>
#include "myVector.hpp"
#include <ndb_version.h> #include <ndb_version.h>
#include <version.h> #include <version.h>
static const char * delimiter = ";"; // Delimiter in file dump
const int FileNameLenC = 256; const int FileNameLenC = 256;
const int TableNameLenC = 256; const int TableNameLenC = 256;
const int AttrNameLenC = 256; const int AttrNameLenC = 256;
...@@ -89,19 +91,26 @@ class TupleS { ...@@ -89,19 +91,26 @@ class TupleS {
private: private:
friend class RestoreDataIterator; friend class RestoreDataIterator;
const TableS * m_currentTable; const TableS *m_currentTable;
myVector<AttributeS*> allAttributes; AttributeData *allAttrData;
Uint32 * dataRecord;
bool prepareRecord(const TableS &); bool prepareRecord(const TableS &);
public: public:
TupleS() {dataRecord = NULL;}; TupleS() {
~TupleS() {if(dataRecord != NULL) delete [] dataRecord;}; m_currentTable= 0;
int getNoOfAttributes() const { return allAttributes.size(); }; allAttrData= 0;
const TableS * getTable() const { return m_currentTable;}; };
const AttributeS * operator[](int i) const { return allAttributes[i];}; ~TupleS()
Uint32 * getDataRecord() { return dataRecord;}; {
void createDataRecord(Uint32 bytes) { dataRecord = new Uint32[bytes];}; if (allAttrData)
delete [] allAttrData;
};
TupleS(const TupleS& tuple); // disable copy constructor
TupleS & operator=(const TupleS& tuple);
int getNoOfAttributes() const;
const TableS * getTable() const;
const AttributeDesc * getDesc(int i) const;
AttributeData * getData(int i) const;
}; // class TupleS }; // class TupleS
class TableS { class TableS {
...@@ -112,27 +121,23 @@ class TableS { ...@@ -112,27 +121,23 @@ class TableS {
Uint32 schemaVersion; Uint32 schemaVersion;
Uint32 backupVersion; Uint32 backupVersion;
myVector<AttributeDesc *> allAttributesDesc; Vector<AttributeDesc *> allAttributesDesc;
myVector<AttributeDesc *> m_fixedKeys; Vector<AttributeDesc *> m_fixedKeys;
//myVector<AttributeDesc *> m_variableKey; //Vector<AttributeDesc *> m_variableKey;
myVector<AttributeDesc *> m_fixedAttribs; Vector<AttributeDesc *> m_fixedAttribs;
myVector<AttributeDesc *> m_variableAttribs; Vector<AttributeDesc *> m_variableAttribs;
Uint32 m_noOfNullable; Uint32 m_noOfNullable;
Uint32 m_nullBitmaskSize; Uint32 m_nullBitmaskSize;
int pos; int pos;
char create_string[2048];
/*
char mysqlTableName[1024];
char mysqlDatabaseName[1024];
*/
void createAttr(NdbDictionary::Column *column); void createAttr(NdbDictionary::Column *column);
public: public:
class NdbDictionary::Table* m_dictTable; class NdbDictionary::Table* m_dictTable;
TableS (class NdbTableImpl* dictTable); TableS (class NdbTableImpl* dictTable);
~TableS();
Uint32 getTableId() const { Uint32 getTableId() const {
return m_dictTable->getTableId(); return m_dictTable->getTableId();
...@@ -185,18 +190,26 @@ protected: ...@@ -185,18 +190,26 @@ protected:
BackupFormat::FileHeader m_expectedFileHeader; BackupFormat::FileHeader m_expectedFileHeader;
Uint32 m_nodeId; Uint32 m_nodeId;
Uint32 * m_buffer;
Uint32 m_bufferSize;
Uint32 * createBuffer(Uint32 bytes);
void * m_buffer;
void * m_buffer_ptr;
Uint32 m_buffer_sz;
Uint32 m_buffer_data_left;
void (* free_data_callback)();
bool openFile(); bool openFile();
void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path); void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path);
void setDataFile(const BackupFile & bf, Uint32 no); void setDataFile(const BackupFile & bf, Uint32 no);
void setLogFile(const BackupFile & bf, Uint32 no); void setLogFile(const BackupFile & bf, Uint32 no);
Uint32 buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_read(void *ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb);
Uint32 buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb);
void setName(const char * path, const char * name); void setName(const char * path, const char * name);
BackupFile(); BackupFile(void (* free_data_callback)() = 0);
~BackupFile(); ~BackupFile();
public: public:
bool readHeader(); bool readHeader();
...@@ -206,12 +219,12 @@ public: ...@@ -206,12 +219,12 @@ public:
const char * getFilename() const { return m_fileName;} const char * getFilename() const { return m_fileName;}
Uint32 getNodeId() const { return m_nodeId;} Uint32 getNodeId() const { return m_nodeId;}
const BackupFormat::FileHeader & getFileHeader() const { return m_fileHeader;} const BackupFormat::FileHeader & getFileHeader() const { return m_fileHeader;}
bool Twiddle(AttributeS * attr, Uint32 arraySize = 0); bool Twiddle(const AttributeDesc * attr_desc, AttributeData * attr_data, Uint32 arraySize = 0);
}; };
class RestoreMetaData : public BackupFile { class RestoreMetaData : public BackupFile {
myVector<TableS *> allTables; Vector<TableS *> allTables;
bool readMetaFileHeader(); bool readMetaFileHeader();
bool readMetaTableDesc(); bool readMetaTableDesc();
...@@ -224,14 +237,11 @@ class RestoreMetaData : public BackupFile { ...@@ -224,14 +237,11 @@ class RestoreMetaData : public BackupFile {
bool parseTableDescriptor(const Uint32 * data, Uint32 len); bool parseTableDescriptor(const Uint32 * data, Uint32 len);
public: public:
RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo); RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo);
~RestoreMetaData(); virtual ~RestoreMetaData();
int loadContent(); int loadContent();
Uint32 getNoOfTables() const { return allTables.size();} Uint32 getNoOfTables() const { return allTables.size();}
const TableS * operator[](int i) const { return allTables[i];} const TableS * operator[](int i) const { return allTables[i];}
...@@ -243,20 +253,20 @@ public: ...@@ -243,20 +253,20 @@ public:
class RestoreDataIterator : public BackupFile { class RestoreDataIterator : public BackupFile {
const RestoreMetaData & m_metaData; const RestoreMetaData & m_metaData;
Uint32 m_count; Uint32 m_count;
TupleS m_tuple;
const TableS* m_currentTable; const TableS* m_currentTable;
TupleS m_tuple;
public: public:
// Constructor // Constructor
RestoreDataIterator(const RestoreMetaData &); RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)());
~RestoreDataIterator(); ~RestoreDataIterator() {};
// Read data file fragment header // Read data file fragment header
bool readFragmentHeader(int & res); bool readFragmentHeader(int & res);
bool validateFragmentFooter(); bool validateFragmentFooter();
const TupleS *getNextTuple(int & res); const TupleS *getNextTuple(int & res);
}; };
...@@ -269,9 +279,35 @@ public: ...@@ -269,9 +279,35 @@ public:
}; };
EntryType m_type; EntryType m_type;
const TableS * m_table; const TableS * m_table;
myVector<AttributeS*> m_values; Vector<AttributeS*> m_values;
Vector<AttributeS*> m_values_e;
AttributeS *add_attr() {
AttributeS * attr;
if (m_values_e.size() > 0) {
attr = m_values_e[m_values_e.size()-1];
m_values_e.erase(m_values_e.size()-1);
}
else
{
attr = new AttributeS;
}
m_values.push_back(attr);
return attr;
}
void clear() {
for(Uint32 i= 0; i < m_values.size(); i++)
m_values_e.push_back(m_values[i]);
m_values.clear();
}
~LogEntry()
{
for(Uint32 i= 0; i< m_values.size(); i++)
delete m_values[i];
for(Uint32 i= 0; i< m_values_e.size(); i++)
delete m_values_e[i];
}
Uint32 size() const { return m_values.size(); }
const AttributeS * operator[](int i) const { return m_values[i];}
}; };
class RestoreLogIterator : public BackupFile { class RestoreLogIterator : public BackupFile {
...@@ -282,10 +318,16 @@ private: ...@@ -282,10 +318,16 @@ private:
LogEntry m_logEntry; LogEntry m_logEntry;
public: public:
RestoreLogIterator(const RestoreMetaData &); RestoreLogIterator(const RestoreMetaData &);
virtual ~RestoreLogIterator() {};
const LogEntry * getNextLogEntry(int & res); const LogEntry * getNextLogEntry(int & res);
}; };
NdbOut& operator<<(NdbOut& ndbout, const TableS&);
NdbOut& operator<<(NdbOut& ndbout, const TupleS&);
NdbOut& operator<<(NdbOut& ndbout, const LogEntry&);
NdbOut& operator<<(NdbOut& ndbout, const RestoreMetaData&);
#endif #endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer.hpp"
#ifdef USE_MYSQL
int
BackupConsumer::create_table_string(const TableS & table,
char * tableName,
char *buf){
int pos = 0;
int pos2 = 0;
char buf2[2048];
pos += sprintf(buf+pos, "%s%s", "CREATE TABLE ", tableName);
pos += sprintf(buf+pos, "%s", "(");
pos2 += sprintf(buf2+pos2, "%s", " primary key(");
for (int j = 0; j < table.getNoOfAttributes(); j++)
{
const AttributeDesc * desc = table[j];
// ndbout << desc->name << ": ";
pos += sprintf(buf+pos, "%s%s", desc->m_column->getName()," ");
switch(desc->m_column->getType()){
case NdbDictionary::Column::Int:
pos += sprintf(buf+pos, "%s", "int");
break;
case NdbDictionary::Column::Unsigned:
pos += sprintf(buf+pos, "%s", "int unsigned");
break;
case NdbDictionary::Column::Float:
pos += sprintf(buf+pos, "%s", "float");
break;
case NdbDictionary::Column::Decimal:
pos += sprintf(buf+pos, "%s", "decimal");
break;
case NdbDictionary::Column::Char:
pos += sprintf(buf+pos, "%s", "char");
break;
case NdbDictionary::Column::Varchar:
pos += sprintf(buf+pos, "%s", "varchar");
break;
case NdbDictionary::Column::Binary:
pos += sprintf(buf+pos, "%s", "binary");
break;
case NdbDictionary::Column::Varbinary:
pos += sprintf(buf+pos, "%s", "varchar binary");
break;
case NdbDictionary::Column::Bigint:
pos += sprintf(buf+pos, "%s", "bigint");
break;
case NdbDictionary::Column::Bigunsigned:
pos += sprintf(buf+pos, "%s", "bigint unsigned");
break;
case NdbDictionary::Column::Double:
pos += sprintf(buf+pos, "%s", "double");
break;
case NdbDictionary::Column::Datetime:
pos += sprintf(buf+pos, "%s", "datetime");
break;
case NdbDictionary::Column::Timespec:
pos += sprintf(buf+pos, "%s", "time");
break;
case NdbDictionary::Column::Undefined:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return -1;
break;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return -1;
}
if (desc->arraySize > 1) {
int attrSize = desc->arraySize;
pos += sprintf(buf+pos, "%s%u%s",
"(",
attrSize,
")");
}
if (desc->m_column->getPrimaryKey()) {
pos += sprintf(buf+pos, "%s", " not null");
pos2 += sprintf(buf2+pos2, "%s%s", desc->m_column->getName(), ",");
}
pos += sprintf(buf+pos, "%s", ",");
} // for
pos2--; // remove trailing comma
pos2 += sprintf(buf2+pos2, "%s", ")");
// pos--; // remove trailing comma
pos += sprintf(buf+pos, "%s", buf2);
pos += sprintf(buf+pos, "%s", ") type=ndbcluster");
return 0;
}
#endif // USE_MYSQL
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_HPP
#define CONSUMER_HPP
#include "Restore.hpp"
class BackupConsumer {
public:
virtual ~BackupConsumer() { }
virtual bool init() { return true;}
virtual bool table(const TableS &){return true;}
virtual void tuple(const TupleS &){}
virtual void tuple_free(){}
virtual void endOfTuples(){}
virtual void logEntry(const LogEntry &){}
virtual void endOfLogEntrys(){}
};
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_printer.hpp"
bool
BackupPrinter::table(const TableS & tab)
{
if (m_print || m_print_meta)
{
m_ndbout << tab;
ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName());
}
return true;
}
void
BackupPrinter::tuple(const TupleS & tup)
{
m_dataCount++;
if (m_print || m_print_data)
m_ndbout << tup << endl;
}
void
BackupPrinter::logEntry(const LogEntry & logE)
{
if (m_print || m_print_log)
m_ndbout << logE << endl;
m_logCount++;
}
void
BackupPrinter::endOfLogEntrys()
{
if (m_print || m_print_log)
{
ndbout << "Printed " << m_dataCount << " tuples and "
<< m_logCount << " log entries"
<< " to stdout." << endl;
}
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_PRINTER_HPP
#define CONSUMER_PRINTER_HPP
#include "consumer.hpp"
class BackupPrinter : public BackupConsumer
{
NdbOut & m_ndbout;
public:
BackupPrinter(NdbOut & out = ndbout) : m_ndbout(out)
{
m_print = false;
m_print_log = false;
m_print_data = false;
m_print_meta = false;
}
virtual bool table(const TableS &);
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp);
#endif
virtual void tuple(const TupleS &);
virtual void logEntry(const LogEntry &);
virtual void endOfTuples() {};
virtual void endOfLogEntrys();
bool m_print;
bool m_print_log;
bool m_print_data;
bool m_print_meta;
Uint32 m_logCount;
Uint32 m_dataCount;
};
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern FilteredNdbOut err;
extern FilteredNdbOut info;
extern FilteredNdbOut debug;
static void callback(int, NdbConnection*, void*);
bool
BackupRestore::init()
{
release();
if (!m_restore && !m_restore_meta)
return true;
m_ndb = new Ndb();
if (m_ndb == NULL)
return false;
// Turn off table name completion
m_ndb->useFullyQualifiedNames(false);
m_ndb->init(1024);
if (m_ndb->waitUntilReady(30) != 0)
{
err << "Failed to connect to ndb!!" << endl;
return false;
}
info << "Connected to ndb!!" << endl;
m_callback = new restore_callback_t[m_parallelism];
if (m_callback == 0)
{
err << "Failed to allocate callback structs" << endl;
return false;
}
m_tuples = new TupleS[m_parallelism];
if (m_tuples == 0)
{
err << "Failed to allocate tuples" << endl;
return false;
}
m_free_callback= m_callback;
for (Uint32 i= 0; i < m_parallelism; i++) {
m_callback[i].restore= this;
m_callback[i].connection= 0;
m_callback[i].tup= &m_tuples[i];
if (i > 0)
m_callback[i-1].next= &(m_callback[i]);
}
m_callback[m_parallelism-1].next = 0;
return true;
}
void BackupRestore::release()
{
if (m_ndb)
{
delete m_ndb;
m_ndb= 0;
}
if (m_callback)
{
delete [] m_callback;
m_callback= 0;
}
if (m_tuples)
{
delete [] m_tuples;
m_tuples= 0;
}
}
BackupRestore::~BackupRestore()
{
release();
}
bool
BackupRestore::table(const TableS & table){
if (!m_restore_meta)
return true;
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
if (dict->createTable(*table.m_dictTable) == -1)
{
err << "Create table " << table.getTableName() << " failed: "
<< dict->getNdbError() << endl;
return false;
}
info << "Successfully restored table " << table.getTableName()<< endl ;
return true;
}
void BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
restore_callback_t * cb = m_free_callback;
if (cb == 0)
assert(false);
m_free_callback = cb->next;
cb->retries = 0;
*(cb->tup) = tup; // must do copy!
tuple_a(cb);
if (m_free_callback == 0)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, 1);
}
}
void BackupRestore::tuple_a(restore_callback_t *cb)
{
while (cb->retries < 10)
{
/**
* start transactions
*/
cb->connection = m_ndb->startTransaction();
if (cb->connection == NULL)
{
/*
if (errorHandler(cb))
{
continue;
}
*/
exitHandler();
} // if
const TupleS &tup = *(cb->tup);
const TableS * table = tup.getTable();
NdbOperation * op = cb->connection->getNdbOperation(table->getTableName());
if (op == NULL)
{
if (errorHandler(cb))
continue;
exitHandler();
} // if
if (op->writeTuple() == -1)
{
if (errorHandler(cb))
continue;
exitHandler();
} // if
int ret = 0;
for (int j = 0; j < 2; j++)
{
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeDesc * attr_desc = tup.getDesc(i);
const AttributeData * attr_data = tup.getData(i);
int size = attr_desc->size;
int arraySize = attr_desc->arraySize;
char * dataPtr = attr_data->string_value;
Uint32 length = (size * arraySize) / 8;
if (attr_desc->m_column->getPrimaryKey())
{
if (j == 1) continue;
ret = op->equal(i, dataPtr, length);
}
else
{
if (j == 0) continue;
if (attr_data->null)
ret = op->setValue(i, NULL, 0);
else
ret = op->setValue(i, dataPtr, length);
}
if (ret < 0) {
ndbout_c("Column: %d type %d",i,
attr_desc->m_column->getType());
break;
}
}
if (ret < 0)
break;
}
if (ret < 0)
{
if (errorHandler(cb))
continue;
exitHandler();
}
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb->connection->executeAsynchPrepare(Commit, &callback, cb);
m_transactions++;
return;
}
err << "Unable to recover from errors. Exiting..." << endl;
exitHandler();
}
void BackupRestore::cback(int result, restore_callback_t *cb)
{
m_transactions--;
if (result < 0)
{
/**
* Error. temporary or permanent?
*/
if (errorHandler(cb))
tuple_a(cb); // retry
else
{
err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl;
exitHandler();
}
}
else
{
/**
* OK! close transaction
*/
m_ndb->closeTransaction(cb->connection);
cb->connection= 0;
cb->next= m_free_callback;
m_free_callback= cb;
m_dataCount++;
}
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
bool BackupRestore::errorHandler(restore_callback_t *cb)
{
NdbError error= cb->connection->getNdbError();
m_ndb->closeTransaction(cb->connection);
cb->connection= 0;
cb->retries++;
switch(error.status)
{
case NdbError::Success:
return false;
// ERROR!
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(10);
return true;
// RETRY
break;
case NdbError::UnknownResult:
err << error << endl;
return false;
// ERROR!
break;
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
NdbSleep_MilliSleep(10);
return true; //temp errors?
default:
break;
}
//ERROR
err << error << endl;
return false;
break;
}
return false;
}
void BackupRestore::exitHandler()
{
release();
exit(-1);
}
void
BackupRestore::tuple_free()
{
if (!m_restore)
return;
if (m_transactions > 0) {
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
while (m_transactions > 0)
m_ndb->pollNdb(3000, m_transactions);
}
}
void
BackupRestore::endOfTuples()
{
tuple_free();
}
void
BackupRestore::logEntry(const LogEntry & tup)
{
if (!m_restore)
return;
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
err << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.m_table;
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
err << "Cannot get operation: " << trans->getNdbError() << endl;
exit(-1);
} // if
int check = 0;
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
check = op->insertTuple();
break;
case LogEntry::LE_UPDATE:
check = op->updateTuple();
break;
case LogEntry::LE_DELETE:
check = op->deleteTuple();
break;
default:
err << "Log entry has wrong operation type."
<< " Exiting...";
exit(-1);
}
for (Uint32 i= 0; i < tup.size(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length);
else
op->setValue(attr->Desc->attrId, dataPtr, length);
}
const int ret = trans->execute(Commit);
if (ret != 0)
{
// Both insert update and delete can fail during log running
// and it's ok
// TODO: check that the error is either tuple exists or tuple does not exist?
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
break;
case LogEntry::LE_UPDATE:
break;
case LogEntry::LE_DELETE:
break;
}
if (false)
{
err << "execute failed: " << trans->getNdbError() << endl;
exit(-1);
}
}
m_ndb->closeTransaction(trans);
m_logCount++;
}
void
BackupRestore::endOfLogEntrys()
{
if (!m_restore)
return;
info << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static void
callback(int result, NdbConnection* trans, void* aObject)
{
restore_callback_t *cb = (restore_callback_t *)aObject;
(cb->restore)->cback(result, cb);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef CONSUMER_RESTORE_HPP
#define CONSUMER_RESTORE_HPP
#include "consumer.hpp"
struct restore_callback_t {
class BackupRestore *restore;
class TupleS *tup;
class NdbConnection *connection;
int retries;
restore_callback_t *next;
};
class BackupRestore : public BackupConsumer
{
public:
BackupRestore(Uint32 parallelism=1)
{
m_ndb = 0;
m_logCount = m_dataCount = 0;
m_restore = false;
m_restore_meta = false;
m_parallelism = parallelism;
m_callback = 0;
m_tuples = 0;
m_free_callback = 0;
m_transactions = 0;
}
virtual ~BackupRestore();
virtual bool init();
virtual void release();
virtual bool table(const TableS &);
virtual void tuple(const TupleS &);
virtual void tuple_free();
virtual void tuple_a(restore_callback_t *cb);
virtual void cback(int result, restore_callback_t *cb);
virtual bool errorHandler(restore_callback_t *cb);
virtual void exitHandler();
virtual void endOfTuples();
virtual void logEntry(const LogEntry &);
virtual void endOfLogEntrys();
void connectToMysql();
Ndb * m_ndb;
bool m_restore;
bool m_restore_meta;
Uint32 m_logCount;
Uint32 m_dataCount;
Uint32 m_parallelism;
Uint32 m_transactions;
TupleS *m_tuples;
restore_callback_t *m_callback;
restore_callback_t *m_free_callback;
};
#endif
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "consumer_restore.hpp"
#include <NdbSleep.h>
extern FilteredNdbOut err;
extern FilteredNdbOut info;
extern FilteredNdbOut debug;
static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb);
static void callback(int result, NdbConnection* trans, void* aObject);
bool
BackupRestore::init()
{
if (!m_restore && !m_restore_meta)
return true;
m_ndb = new Ndb();
if (m_ndb == NULL)
return false;
// Turn off table name completion
m_ndb->useFullyQualifiedNames(false);
m_ndb->init(1024);
if (m_ndb->waitUntilReady(30) != 0)
{
ndbout << "Failed to connect to ndb!!" << endl;
return false;
}
ndbout << "Connected to ndb!!" << endl;
#if USE_MYSQL
if(use_mysql)
{
if ( mysql_thread_safe() == 0 )
{
ndbout << "Not thread safe mysql library..." << endl;
exit(-1);
}
ndbout << "Connecting to MySQL..." <<endl;
/**
* nwe param:
* port
* host
* user
*/
bool returnValue = true;
mysql_init(&mysql);
{
int portNo = 3306;
if ( mysql_real_connect(&mysql,
ga_host,
ga_user,
ga_password,
ga_database,
ga_port,
:: ga_socket,
0) == NULL )
{
ndbout_c("Connect failed: %s", mysql_error(&mysql));
returnValue = false;
}
ndbout << "Connected to MySQL!!!" <<endl;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return returnValue;
}
#endif
if (m_callback) {
delete [] m_callback;
m_callback = 0;
}
m_callback = new restore_callback_t[m_parallelism];
if (m_callback == 0)
{
ndbout << "Failed to allocate callback structs" << endl;
return false;
}
m_free_callback = m_callback;
for (int i= 0; i < m_parallelism; i++) {
m_callback[i].restore = this;
m_callback[i].connection = 0;
m_callback[i].retries = 0;
if (i > 0)
m_callback[i-1].next = &(m_callback[i]);
}
m_callback[m_parallelism-1].next = 0;
return true;
}
BackupRestore::~BackupRestore()
{
if (m_ndb != 0)
delete m_ndb;
if (m_callback)
delete [] m_callback;
}
#ifdef USE_MYSQL
bool
BackupRestore::table(const TableS & table, MYSQL * mysqlp){
if (!m_restore_meta)
{
return true;
}
char tmpTabName[MAX_TAB_NAME_SIZE*2];
sprintf(tmpTabName, "%s", table.getTableName());
char * database = strtok(tmpTabName, "/");
char * schema = strtok( NULL , "/");
char * tableName = strtok( NULL , "/");
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if(database == NULL)
return false;
if(schema == NULL)
return false;
if(tableName==NULL)
tableName = schema;
char stmtCreateDB[255];
sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
/*ignore return value. mysql_select_db will trap errors anyways*/
if (mysql_query(mysqlp,stmtCreateDB) == 0)
{
//ndbout_c("%s", stmtCreateDB);
}
if (mysql_select_db(&mysql, database) != 0)
{
ndbout_c("Error: %s", mysql_error(&mysql));
return false;
}
char buf [2048];
/**
* create table ddl
*/
if (create_table_string(table, tableName, buf))
{
ndbout_c("Unable to create a table definition since the "
"backup contains undefined types");
return false;
}
//ndbout_c("%s", buf);
if (mysql_query(mysqlp,buf) != 0)
{
ndbout_c("Error: %s", mysql_error(&mysql));
return false;
} else
{
ndbout_c("Successfully restored table %s into database %s", tableName, database);
}
return true;
}
#endif
bool
BackupRestore::table(const TableS & table){
if (!m_restore_meta)
{
return true;
}
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
if (dict->createTable(*table.m_dictTable) == -1)
{
err << "Create table " << table.getTableName() << " failed: "
<< dict->getNdbError() << endl;
return false;
}
info << "Successfully restored table " << table.getTableName()<< endl ;
return true;
}
void BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
{
delete &tup;
return;
}
restore_callback_t * cb = m_free_callback;
if (cb)
{
m_free_callback = cb->next;
cb->retries = 0;
cb->tup = &tup;
tuple_a(cb);
}
if (m_free_callback == 0)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, 1);
}
}
void BackupRestore::tuple_a(restore_callback_t *cb)
{
while (cb->retries < 10)
{
/**
* start transactions
*/
cb->connection = m_ndb->startTransaction();
if (cb->connection == NULL)
{
/*
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
continue;
}
*/
asynchExitHandler();
} // if
const TupleS &tup = *(cb->tup);
const TableS * table = tup.getTable();
NdbOperation * op = cb->connection->getNdbOperation(table->getTableName());
if (op == NULL)
{
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
continue;
}
asynchExitHandler();
} // if
if (op->writeTuple() == -1)
{
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
continue;
}
asynchExitHandler();
} // if
Uint32 ret = 0;
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
char * dataPtr = attr->Data.string_value;
Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
{
ret = op->equal(i, dataPtr, length);
}
else
{
if (attr->Data.null)
ret = op->setValue(i, NULL, 0);
else
ret = op->setValue(i, dataPtr, length);
}
if (ret<0)
{
ndbout_c("Column: %d type %d",i,
tup.getTable()->m_dictTable->getColumn(i)->getType());
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
break;
}
asynchExitHandler();
}
}
if (ret < 0)
continue;
// Prepare transaction (the transaction is NOT yet sent to NDB)
cb->connection->executeAsynchPrepare(Commit, &callback, cb);
m_transactions++;
}
ndbout_c("Unable to recover from errors. Exiting...");
asynchExitHandler();
}
void BackupRestore::cback(int result, restore_callback_t *cb)
{
if (result<0)
{
/**
* Error. temporary or permanent?
*/
if (asynchErrorHandler(cb->connection, m_ndb))
{
cb->retries++;
tuple_a(cb);
}
else
{
ndbout_c("Restore: Failed to restore data "
"due to a unrecoverable error. Exiting...");
delete m_ndb;
delete cb->tup;
exit(-1);
}
}
else
{
/**
* OK! close transaction
*/
m_ndb->closeTransaction(cb->connection);
delete cb->tup;
m_transactions--;
}
}
void BackupRestore::asynchExitHandler()
{
if (m_ndb != NULL)
delete m_ndb;
exit(-1);
}
#if 0 // old tuple impl
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
#endif
void
BackupRestore::endOfTuples()
{
if (!m_restore)
return;
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
m_ndb->pollNdb(3000, m_transactions);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
}
void
BackupRestore::logEntry(const LogEntry & tup)
{
if (!m_restore)
return;
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.m_table;
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
int check = 0;
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
check = op->insertTuple();
break;
case LogEntry::LE_UPDATE:
check = op->updateTuple();
break;
case LogEntry::LE_DELETE:
check = op->deleteTuple();
break;
default:
ndbout << "Log entry has wrong operation type."
<< " Exiting...";
exit(-1);
}
for (int i = 0; i < tup.m_values.size(); i++)
{
const AttributeS * attr = tup.m_values[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length);
else
op->setValue(attr->Desc->attrId, dataPtr, length);
}
#if 1
trans->execute(Commit);
#else
const int ret = trans->execute(Commit);
// Both insert update and delete can fail during log running
// and it's ok
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
#endif
m_ndb->closeTransaction(trans);
m_logCount++;
}
void
BackupRestore::endOfLogEntrys()
{
if (m_restore)
{
ndbout << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static void
callback(int result, NdbConnection* trans, void* aObject)
{
restore_callback_t *cb = (restore_callback_t *)aObject;
(cb->restore)->cback(result, cb);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb)
{
NdbError error = trans->getNdbError();
ndb->closeTransaction(trans);
switch(error.status)
{
case NdbError::Success:
return false;
// ERROR!
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(10);
return true;
// RETRY
break;
case NdbError::UnknownResult:
ndbout << error << endl;
return false;
// ERROR!
break;
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
NdbSleep_MilliSleep(10);
return true; //temp errors?
default:
break;
}
//ERROR
ndbout << error << endl;
return false;
break;
}
return false;
}
...@@ -14,62 +14,26 @@ ...@@ -14,62 +14,26 @@
along with this program; if not, write to the Free Software along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "Restore.hpp"
#include <getarg.h> #include <getarg.h>
#include <NdbSleep.h>
#include <Vector.hpp> #include <Vector.hpp>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <NdbTCP.h> #include <NdbTCP.h>
#ifdef USE_MYSQL
#include <mysql.h>
#endif
#include <NdbOut.hpp> #include <NdbOut.hpp>
NdbOut& operator<<(NdbOut& ndbout, const TupleS& tuple); #include "consumer_restore.hpp"
NdbOut& operator<<(NdbOut& ndbout, const LogEntry& logEntry); #include "consumer_printer.hpp"
NdbOut& operator<<(NdbOut& ndbout, const RestoreMetaData &);
extern FilteredNdbOut err; extern FilteredNdbOut err;
extern FilteredNdbOut info; extern FilteredNdbOut info;
extern FilteredNdbOut debug; extern FilteredNdbOut debug;
static const char * delimiter = ";"; // Delimiter in file dump
static int ga_nodeId = 0; static int ga_nodeId = 0;
static int ga_nParallelism = 1; static int ga_nParallelism = 128;
static int ga_backupId = 0; static int ga_backupId = 0;
static bool ga_dont_ignore_systab_0 = false; static bool ga_dont_ignore_systab_0 = false;
static myVector<class BackupConsumer *> g_consumers; static Vector<class BackupConsumer *> g_consumers;
#ifdef USE_MYSQL
/**
* mysql specific stuff:
*/
static const char* ga_user = "root";
static const char* ga_host = "localhost";
static const char* ga_socket = "/tmp/mysql.sock";
static const char* ga_password = "";
static const char* ga_database = "";
static int ga_port = 3306;
static bool use_mysql = false;
static MYSQL mysql;
#endif
#ifdef NDB_WIN32
static const char* ga_backupPath = ".\\";
#else
static const char* ga_backupPath = "./";
#endif
typedef struct { static const char* ga_backupPath = "." DIR_SEPARATOR;
void * ndb;
void * restore;
TupleS * tup;
int transaction;
int retries;
} restore_callback_t;
static const char* ga_connect_NDB = NULL; static const char* ga_connect_NDB = NULL;
...@@ -78,102 +42,9 @@ static const char* ga_connect_NDB = NULL; ...@@ -78,102 +42,9 @@ static const char* ga_connect_NDB = NULL;
*/ */
static bool ga_restore = false; static bool ga_restore = false;
static bool ga_print = false; static bool ga_print = false;
class BackupConsumer {
public:
virtual bool init() { return true;}
virtual bool table(const TableS &){return true;}
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp) {return true;};
#endif
virtual void tuple(const TupleS &){}
virtual void tupleAsynch(const TupleS &, restore_callback_t * callback) {};
// virtual bool asynchErrorHandler(NdbConnection * trans){return true;};
virtual void asynchExitHandler(){};
virtual void endOfTuples(){}
virtual void logEntry(const LogEntry &){}
virtual void endOfLogEntrys(){}
protected:
#ifdef USE_MYSQL
int create_table_string(const TableS & table, char * ,char *);
#endif
};
class BackupPrinter : public BackupConsumer
{
NdbOut & m_ndbout;
public:
BackupPrinter(NdbOut & out = ndbout) : m_ndbout(out)
{
m_print = false;
m_print_log = false;
m_print_data = false;
m_print_meta = false;
}
virtual bool table(const TableS &);
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp);
#endif
virtual void tuple(const TupleS &);
virtual void logEntry(const LogEntry &);
virtual void endOfTuples() {};
virtual void endOfLogEntrys();
virtual void tupleAsynch(const TupleS &, restore_callback_t * callback);
bool m_print;
bool m_print_log;
bool m_print_data;
bool m_print_meta;
Uint32 m_logCount;
Uint32 m_dataCount;
};
class BackupRestore : public BackupConsumer
{
public:
BackupRestore()
{
m_ndb = 0;
m_logCount = m_dataCount = 0;
m_restore = false;
m_restore_meta = false;
}
virtual ~BackupRestore();
virtual bool init();
virtual bool table(const TableS &);
#ifdef USE_MYSQL
virtual bool table(const TableS &, MYSQL* mysqlp);
#endif
virtual void tuple(const TupleS &);
virtual void tupleAsynch(const TupleS &, restore_callback_t * callback);
virtual void asynchExitHandler();
virtual void endOfTuples();
virtual void logEntry(const LogEntry &);
virtual void endOfLogEntrys();
void connectToMysql();
Ndb * m_ndb;
bool m_restore;
bool m_restore_meta;
Uint32 m_logCount;
Uint32 m_dataCount;
};
bool bool
readArguments(const int argc, const char** argv) readArguments(const int argc, const char** argv)
{ {
BackupPrinter* printer = new BackupPrinter();
if (printer == NULL)
return false;
BackupRestore* restore = new BackupRestore();
if (restore == NULL)
{
delete printer;
return false;
}
int _print = 0; int _print = 0;
int _print_meta = 0; int _print_meta = 0;
...@@ -236,10 +107,18 @@ readArguments(const int argc, const char** argv) ...@@ -236,10 +107,18 @@ readArguments(const int argc, const char** argv)
ga_nParallelism < 1 || ga_nParallelism < 1 ||
ga_nParallelism >1024) ga_nParallelism >1024)
{ {
arg_printusage(args, num_args, argv[0], "<path to backup files>\n"); arg_printusage(args, num_args, argv[0], "<path to backup files>\n");
return false;
}
BackupPrinter* printer = new BackupPrinter();
if (printer == NULL)
return false;
BackupRestore* restore = new BackupRestore(ga_nParallelism);
if (restore == NULL)
{
delete printer; delete printer;
delete restore;
return false; return false;
} }
...@@ -282,11 +161,11 @@ readArguments(const int argc, const char** argv) ...@@ -282,11 +161,11 @@ readArguments(const int argc, const char** argv)
} }
{ {
BackupConsumer * c = printer; BackupConsumer * c = printer;
g_consumers.push_back(c); g_consumers.push_back(c);
} }
{ {
BackupConsumer * c = restore; BackupConsumer * c = restore;
g_consumers.push_back(c); g_consumers.push_back(c);
} }
// Set backup file path // Set backup file path
...@@ -294,20 +173,6 @@ readArguments(const int argc, const char** argv) ...@@ -294,20 +173,6 @@ readArguments(const int argc, const char** argv)
{ {
ga_backupPath = argv[optind]; ga_backupPath = argv[optind];
} }
#ifdef USE_MYSQL
if(use_mysql) {
ga_dont_ignore_systab_0 = false;
ga_database = ""; //not used yet. pethaps later if we want to
// restore meta data in an existing mysql database,
// and not just restore it to the same database
// as when the backup was taken.
// If implementing this, then the
// tupleAsynch must also be changed so that the
// table data is restored to the correct table.
// also, mysql_select_db must be set properly (ie.,
// ignored in codw below)
}
#endif
return true; return true;
} }
...@@ -316,15 +181,12 @@ readArguments(const int argc, const char** argv) ...@@ -316,15 +181,12 @@ readArguments(const int argc, const char** argv)
void void
clearConsumers() clearConsumers()
{ {
for(int i = 0; i<g_consumers.size(); i++) for(Uint32 i= 0; i<g_consumers.size(); i++)
delete g_consumers[i]; delete g_consumers[i];
g_consumers.clear(); g_consumers.clear();
} }
static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb); static bool
static NdbConnection * asynchTrans[1024];
bool
checkSysTable(const char *tableName) checkSysTable(const char *tableName)
{ {
return ga_dont_ignore_systab_0 || return ga_dont_ignore_systab_0 ||
...@@ -334,6 +196,12 @@ checkSysTable(const char *tableName) ...@@ -334,6 +196,12 @@ checkSysTable(const char *tableName)
strcmp(tableName, "sys/def/NDB$EVENTS_0") != 0); strcmp(tableName, "sys/def/NDB$EVENTS_0") != 0);
} }
static void
free_data_callback()
{
for(Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->tuple_free();
}
int int
main(int argc, const char** argv) main(int argc, const char** argv)
...@@ -343,6 +211,12 @@ main(int argc, const char** argv) ...@@ -343,6 +211,12 @@ main(int argc, const char** argv)
return -1; return -1;
} }
if (ga_connect_NDB != NULL)
{
// Use connection string
Ndb::setConnectString(ga_connect_NDB);
}
/** /**
* we must always load meta data, even if we will only print it to stdout * we must always load meta data, even if we will only print it to stdout
*/ */
...@@ -377,7 +251,7 @@ main(int argc, const char** argv) ...@@ -377,7 +251,7 @@ main(int argc, const char** argv)
} }
for(int i = 0; i<g_consumers.size(); i++) for(Uint32 i= 0; i < g_consumers.size(); i++)
{ {
if (!g_consumers[i]->init()) if (!g_consumers[i]->init())
{ {
...@@ -391,36 +265,22 @@ main(int argc, const char** argv) ...@@ -391,36 +265,22 @@ main(int argc, const char** argv)
{ {
if (checkSysTable(metaData[i]->getTableName())) if (checkSysTable(metaData[i]->getTableName()))
{ {
for(int j = 0; j<g_consumers.size(); j++) for(Uint32 j= 0; j < g_consumers.size(); j++)
#ifdef USE_MYSQL if (!g_consumers[j]->table(* metaData[i]))
if(use_mysql) { {
if (!g_consumers[j]->table(* metaData[i], &mysql)) ndbout_c("Restore: Failed to restore table: %s. "
{ "Exiting...",
ndbout_c("Restore: Failed to restore table: %s. " metaData[i]->getTableName());
"Exiting...", return -11;
metaData[i]->getTableName()); }
return -11;
}
} else
#endif
if (!g_consumers[j]->table(* metaData[i]))
{
ndbout_c("Restore: Failed to restore table: %s. "
"Exiting...",
metaData[i]->getTableName());
return -11;
}
} }
} }
if (ga_restore || ga_print) if (ga_restore || ga_print)
{ {
if (ga_restore) if (ga_restore)
{ {
RestoreDataIterator dataIter(metaData); RestoreDataIterator dataIter(metaData, &free_data_callback);
// Read data file header // Read data file header
if (!dataIter.readHeader()) if (!dataIter.readHeader())
...@@ -430,19 +290,15 @@ main(int argc, const char** argv) ...@@ -430,19 +290,15 @@ main(int argc, const char** argv)
} }
while (dataIter.readFragmentHeader(res)) while (dataIter.readFragmentHeader(res= 0))
{ {
const TupleS* tuple = 0; const TupleS* tuple;
while ((tuple = dataIter.getNextTuple(res)) != NULL) while ((tuple = dataIter.getNextTuple(res= 1)) != 0)
{ {
if (checkSysTable(tuple->getTable()->getTableName())) if (checkSysTable(tuple->getTable()->getTableName()))
{ for(Uint32 i= 0; i < g_consumers.size(); i++)
for(int i = 0; i<g_consumers.size(); i++) g_consumers[i]->tuple(* tuple);
{ } // while (tuple != NULL);
g_consumers[i]->tupleAsynch(* tuple, 0);
}
}
} while (tuple != NULL);
if (res < 0) if (res < 0)
{ {
...@@ -459,44 +315,37 @@ main(int argc, const char** argv) ...@@ -459,44 +315,37 @@ main(int argc, const char** argv)
if (res < 0) if (res < 0)
{ {
ndbout_c("Restore: An error occured while restoring data. " err << "Restore: An error occured while restoring data. Exiting... res=" << res << endl;
"Exiting...");
return -1; return -1;
} }
dataIter.validateFooter(); //not implemented dataIter.validateFooter(); //not implemented
for (int i = 0; i<g_consumers.size(); i++)
for (Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->endOfTuples(); g_consumers[i]->endOfTuples();
RestoreLogIterator logIter(metaData); RestoreLogIterator logIter(metaData);
if (!logIter.readHeader()) if (!logIter.readHeader())
{ {
ndbout << "Failed to read header of data file. Exiting..."; err << "Failed to read header of data file. Exiting..." << endl;
return -1; return -1;
} }
/**
* I have not touched the part below : -johan 040218
* except fixing return values.
*/
const LogEntry * logEntry = 0; const LogEntry * logEntry = 0;
while ((logEntry = logIter.getNextLogEntry(res))) while ((logEntry = logIter.getNextLogEntry(res= 0)) != 0)
{ {
if (checkSysTable(logEntry->m_table->getTableName())) if (checkSysTable(logEntry->m_table->getTableName()))
{ for(Uint32 i= 0; i < g_consumers.size(); i++)
for(int i = 0; i<g_consumers.size(); i++)
g_consumers[i]->logEntry(* logEntry); g_consumers[i]->logEntry(* logEntry);
}
} }
if (res < 0) if (res < 0)
{ {
ndbout_c("Restore: An restoring the data log" err << "Restore: An restoring the data log. Exiting... res=" << res << endl;
"Exiting...");
return -1; return -1;
} }
logIter.validateFooter(); //not implemented logIter.validateFooter(); //not implemented
for (int i = 0; i<g_consumers.size(); i++) for (Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->endOfLogEntrys(); g_consumers[i]->endOfLogEntrys();
} }
} }
...@@ -504,893 +353,3 @@ main(int argc, const char** argv) ...@@ -504,893 +353,3 @@ main(int argc, const char** argv)
return 1; return 1;
} // main } // main
NdbOut &
operator<<(NdbOut& ndbout, const AttributeS& attr){
const AttributeData & data = attr.Data;
const AttributeDesc & desc = *attr.Desc;
if (data.null)
{
ndbout << "<NULL>";
return ndbout;
}
NdbRecAttr tmprec;
tmprec.setup(desc.m_column, (char *)data.void_value);
ndbout << tmprec;
return ndbout;
}
// Print tuple data
NdbOut&
operator<<(NdbOut& ndbout, const TupleS& tuple)
{
ndbout << tuple.getTable()->getTableName() << "; ";
for (int i = 0; i < tuple.getNoOfAttributes(); i++)
{
const AttributeS * attr = tuple[i];
debug << i << " " << attr->Desc->m_column->getName();
ndbout << (* attr);
if (i != (tuple.getNoOfAttributes() - 1))
ndbout << delimiter << " ";
} // for
return ndbout;
}
// Print tuple data
NdbOut&
operator<<(NdbOut& ndbout, const LogEntry& logE)
{
switch(logE.m_type)
{
case LogEntry::LE_INSERT:
ndbout << "INSERT " << logE.m_table->getTableName() << " ";
break;
case LogEntry::LE_DELETE:
ndbout << "DELETE " << logE.m_table->getTableName() << " ";
break;
case LogEntry::LE_UPDATE:
ndbout << "UPDATE " << logE.m_table->getTableName() << " ";
break;
default:
ndbout << "Unknown log entry type (not insert, delete or update)" ;
}
for (int i = 0; i < logE.m_values.size();i++)
{
const AttributeS * attr = logE.m_values[i];
ndbout << attr->Desc->m_column->getName() << "=";
ndbout << (* attr);
if (i < (logE.m_values.size() - 1))
ndbout << ", ";
}
return ndbout;
}
NdbOut &
operator<<(NdbOut& ndbout, const TableS & table){
ndbout << endl << "Table: " << table.getTableName() << endl;
for (int j = 0; j < table.getNoOfAttributes(); j++)
{
const AttributeDesc * desc = table[j];
ndbout << desc->m_column->getName() << ": " << desc->m_column->getType();
ndbout << " key: " << desc->m_column->getPrimaryKey();
ndbout << " array: " << desc->arraySize;
ndbout << " size: " << desc->size << endl;
} // for
return ndbout;
}
#if 0
/*****************************************
*
* Callback function for asynchronous transactions
*
* Idea for error handling: Transaction objects have to be stored globally when
* they are prepared.
* In the callback function if the transaction:
* succeeded: delete the object from global storage
* failed but can be retried: execute the object that is in global storage
* failed but fatal: delete the object from global storage
*
******************************************/
static void restoreCallback(int result, // Result for transaction
NdbConnection *object, // Transaction object
void *anything) // Not used
{
static Uint32 counter = 0;
debug << "restoreCallback function called " << counter << " time(s)" << endl;
++counter;
if (result == -1)
{
ndbout << " restoreCallback (" << counter;
if ((counter % 10) == 1)
{
ndbout << "st";
} // if
else if ((counter % 10) == 2)
{
ndbout << "nd";
} // else if
else if ((counter % 10 ) ==3)
{
ndbout << "rd";
} // else if
else
{
ndbout << "th";
} // else
err << " time: error detected " << object->getNdbError() << endl;
} // if
} // restoreCallback
#endif
bool
BackupPrinter::table(const TableS & tab)
{
if (m_print || m_print_meta)
{
m_ndbout << tab;
ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName());
}
return true;
}
#ifdef USE_MYSQL
bool
BackupPrinter::table(const TableS & tab, MYSQL * mysql)
{
if (m_print || m_print_meta)
{
char tmpTabName[MAX_TAB_NAME_SIZE*2];
sprintf(tmpTabName, "%s", tab.getTableName());
char * database = strtok(tmpTabName, "/");
char * schema = strtok( NULL , "/");
char * tableName = strtok( NULL , "/");
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if(database == NULL)
return false;
if(schema == NULL)
return false;
if(tableName==NULL)
tableName = schema;
char stmtCreateDB[255];
sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
ndbout_c("%s", stmtCreateDB);
char buf [2048];
create_table_string(tab, tableName, buf);
ndbout_c("%s", buf);
ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName());
}
return true;
}
#endif
void
BackupPrinter::tuple(const TupleS & tup)
{
if (m_print || m_print_data)
m_ndbout << tup << endl;
}
void
BackupPrinter::logEntry(const LogEntry & logE)
{
if (m_print || m_print_log)
m_ndbout << logE << endl;
m_logCount++;
}
bool
BackupRestore::init()
{
if (!m_restore && !m_restore_meta)
return true;
if (ga_connect_NDB != NULL)
{
// Use connection string
Ndb::setConnectString(ga_connect_NDB);
}
m_ndb = new Ndb();
if (m_ndb == NULL)
return false;
// Turn off table name completion
m_ndb->useFullyQualifiedNames(false);
m_ndb->init(1024);
if (m_ndb->waitUntilReady(30) != 0)
{
ndbout << "Failed to connect to ndb!!" << endl;
delete m_ndb;
return false;
}
ndbout << "Connected to ndb!!" << endl;
#if USE_MYSQL
if(use_mysql)
{
if ( mysql_thread_safe() == 0 )
{
ndbout << "Not thread safe mysql library..." << endl;
exit(-1);
}
ndbout << "Connecting to MySQL..." <<endl;
/**
* nwe param:
* port
* host
* user
*/
bool returnValue = true;
mysql_init(&mysql);
{
int portNo = 3306;
if ( mysql_real_connect(&mysql,
ga_host,
ga_user,
ga_password,
ga_database,
ga_port,
ga_socket,
0) == NULL )
{
ndbout_c("Connect failed: %s", mysql_error(&mysql));
returnValue = false;
}
ndbout << "Connected to MySQL!!!" <<endl;
}
/* if(returnValue){
mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON);
}
*/
return returnValue;
}
#endif
return true;
}
BackupRestore::~BackupRestore()
{
if (m_ndb != 0)
delete m_ndb;
}
#ifdef USE_MYSQL
bool
BackupRestore::table(const TableS & table, MYSQL * mysqlp){
if (!m_restore_meta)
{
return true;
}
char tmpTabName[MAX_TAB_NAME_SIZE*2];
sprintf(tmpTabName, "%s", table.getTableName());
char * database = strtok(tmpTabName, "/");
char * schema = strtok( NULL , "/");
char * tableName = strtok( NULL , "/");
/**
* this means that the user did not specify schema
* and it is a v2x backup
*/
if(database == NULL)
return false;
if(schema == NULL)
return false;
if(tableName==NULL)
tableName = schema;
char stmtCreateDB[255];
sprintf(stmtCreateDB,"CREATE DATABASE %s", database);
/*ignore return value. mysql_select_db will trap errors anyways*/
if (mysql_query(mysqlp,stmtCreateDB) == 0)
{
//ndbout_c("%s", stmtCreateDB);
}
if (mysql_select_db(&mysql, database) != 0)
{
ndbout_c("Error: %s", mysql_error(&mysql));
return false;
}
char buf [2048];
/**
* create table ddl
*/
if (create_table_string(table, tableName, buf))
{
ndbout_c("Unable to create a table definition since the "
"backup contains undefined types");
return false;
}
//ndbout_c("%s", buf);
if (mysql_query(mysqlp,buf) != 0)
{
ndbout_c("Error: %s", mysql_error(&mysql));
return false;
} else
{
ndbout_c("Successfully restored table %s into database %s", tableName, database);
}
return true;
}
int
BackupConsumer::create_table_string(const TableS & table,
char * tableName,
char *buf){
int pos = 0;
int pos2 = 0;
char buf2[2048];
pos += sprintf(buf+pos, "%s%s", "CREATE TABLE ", tableName);
pos += sprintf(buf+pos, "%s", "(");
pos2 += sprintf(buf2+pos2, "%s", " primary key(");
for (int j = 0; j < table.getNoOfAttributes(); j++)
{
const AttributeDesc * desc = table[j];
// ndbout << desc->name << ": ";
pos += sprintf(buf+pos, "%s%s", desc->m_column->getName()," ");
switch(desc->m_column->getType()){
case NdbDictionary::Column::Int:
pos += sprintf(buf+pos, "%s", "int");
break;
case NdbDictionary::Column::Unsigned:
pos += sprintf(buf+pos, "%s", "int unsigned");
break;
case NdbDictionary::Column::Float:
pos += sprintf(buf+pos, "%s", "float");
break;
case NdbDictionary::Column::Decimal:
pos += sprintf(buf+pos, "%s", "decimal");
break;
case NdbDictionary::Column::Char:
pos += sprintf(buf+pos, "%s", "char");
break;
case NdbDictionary::Column::Varchar:
pos += sprintf(buf+pos, "%s", "varchar");
break;
case NdbDictionary::Column::Binary:
pos += sprintf(buf+pos, "%s", "binary");
break;
case NdbDictionary::Column::Varbinary:
pos += sprintf(buf+pos, "%s", "varchar binary");
break;
case NdbDictionary::Column::Bigint:
pos += sprintf(buf+pos, "%s", "bigint");
break;
case NdbDictionary::Column::Bigunsigned:
pos += sprintf(buf+pos, "%s", "bigint unsigned");
break;
case NdbDictionary::Column::Double:
pos += sprintf(buf+pos, "%s", "double");
break;
case NdbDictionary::Column::Datetime:
pos += sprintf(buf+pos, "%s", "datetime");
break;
case NdbDictionary::Column::Timespec:
pos += sprintf(buf+pos, "%s", "time");
break;
case NdbDictionary::Column::Undefined:
// pos += sprintf(buf+pos, "%s", "varchar binary");
return -1;
break;
default:
//pos += sprintf(buf+pos, "%s", "varchar binary");
return -1;
}
if (desc->arraySize > 1) {
int attrSize = desc->arraySize;
pos += sprintf(buf+pos, "%s%u%s",
"(",
attrSize,
")");
}
if (desc->m_column->getPrimaryKey()) {
pos += sprintf(buf+pos, "%s", " not null");
pos2 += sprintf(buf2+pos2, "%s%s", desc->m_column->getName(), ",");
}
pos += sprintf(buf+pos, "%s", ",");
} // for
pos2--; // remove trailing comma
pos2 += sprintf(buf2+pos2, "%s", ")");
// pos--; // remove trailing comma
pos += sprintf(buf+pos, "%s", buf2);
pos += sprintf(buf+pos, "%s", ") type=ndbcluster");
return 0;
}
#endif // USE_MYSQL
bool
BackupRestore::table(const TableS & table){
if (!m_restore_meta)
{
return true;
}
NdbDictionary::Dictionary* dict = m_ndb->getDictionary();
if (dict->createTable(*table.m_dictTable) == -1)
{
err << "Create table " << table.getTableName() << " failed: "
<< dict->getNdbError() << endl;
return false;
}
info << "Successfully restored table " << table.getTableName()<< endl ;
return true;
}
/*
* callback : This is called when the transaction is polled
*
* (This function must have three arguments:
* - The result of the transaction,
* - The NdbConnection object, and
* - A pointer to an arbitrary object.)
*/
static void
callback(int result, NdbConnection* trans, void* aObject)
{
restore_callback_t * cbData = (restore_callback_t *)aObject;
if (result<0)
{
/**
* Error. temporary or permanent?
*/
if (asynchErrorHandler(trans, (Ndb*)cbData->ndb))
{
((Ndb*)cbData->ndb)->closeTransaction(asynchTrans[cbData->transaction]);
cbData->retries++;
((BackupRestore*)cbData)->tupleAsynch( * (TupleS*)(cbData->tup), cbData);
}
else
{
ndbout_c("Restore: Failed to restore data "
"due to a unrecoverable error. Exiting...");
delete (Ndb*)cbData->ndb;
delete cbData->tup;
delete cbData;
exit(-1);
}
}
else
{
/**
* OK! close transaction
*/
((Ndb*)cbData->ndb)->closeTransaction(asynchTrans[cbData->transaction]);
delete cbData->tup;
delete cbData;
}
}
static int nPreparedTransactions = 0;
void
BackupPrinter::tupleAsynch(const TupleS & tup, restore_callback_t * callback)
{
m_dataCount++;
if (m_print || m_print_data)
m_ndbout << tup << endl;
}
void BackupRestore::tupleAsynch(const TupleS & tup, restore_callback_t * cbData)
{
if (!m_restore)
{
delete &tup;
return;
}
Uint32 retries;
if (cbData!=0)
retries = cbData->retries;
else
retries = 0;
while (retries < 10)
{
/**
* start transactions
*/
asynchTrans[nPreparedTransactions] = m_ndb->startTransaction();
if (asynchTrans[nPreparedTransactions] == NULL)
{
if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb))
{
retries++;
continue;
}
asynchExitHandler();
} // if
const TableS * table = tup.getTable();
NdbOperation * op =
asynchTrans[nPreparedTransactions]->getNdbOperation(table->getTableName());
if (op == NULL)
{
if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb))
{
retries++;
continue;
}
asynchExitHandler();
} // if
if (op->writeTuple() == -1)
{
if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb))
{
retries++;
continue;
}
asynchExitHandler();
} // if
Uint32 ret = 0;
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
char * dataPtr = attr->Data.string_value;
Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
{
ret = op->equal(i, dataPtr, length);
if (ret<0)
{
ndbout_c("Column: %d type %d",i,
tup.getTable()->m_dictTable->getColumn(i)->getType());
if (asynchErrorHandler(asynchTrans[nPreparedTransactions],m_ndb))
{
retries++;
continue;
}
asynchExitHandler();
}
}
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
char * dataPtr = attr->Data.string_value;
Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
ret = op->setValue(i, NULL, 0);
else
ret = op->setValue(i, dataPtr, length);
if (ret<0)
{
ndbout_c("Column: %d type %d",i,
tup.getTable()->m_dictTable->getColumn(i)->getType());
if (asynchErrorHandler(asynchTrans[nPreparedTransactions], m_ndb))
{
retries++;
continue;
}
asynchExitHandler();
}
}
restore_callback_t * cb;
if (cbData ==0)
{
cb = new restore_callback_t;
cb->retries = 0;
}
else
cb =cbData;
cb->ndb = m_ndb;
cb->restore = this;
cb->tup = (TupleS*)&tup;
cb->transaction = nPreparedTransactions;
// Prepare transaction (the transaction is NOT yet sent to NDB)
asynchTrans[nPreparedTransactions]->executeAsynchPrepare(Commit,
&callback,
cb);
if (nPreparedTransactions == ga_nParallelism-1)
{
// send-poll all transactions
// close transaction is done in callback
m_ndb->sendPollNdb(3000, ga_nParallelism);
nPreparedTransactions=0;
}
else
nPreparedTransactions++;
m_dataCount++;
return;
}
ndbout_c("Unable to recover from errors. Exiting...");
asynchExitHandler();
}
void BackupRestore::asynchExitHandler()
{
if (m_ndb != NULL)
delete m_ndb;
exit(-1);
}
/**
* returns true if is recoverable,
* Error handling based on hugo
* false if it is an error that generates an abort.
*/
static
bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb)
{
NdbError error = trans->getNdbError();
ndb->closeTransaction(trans);
switch(error.status)
{
case NdbError::Success:
return false;
// ERROR!
break;
case NdbError::TemporaryError:
NdbSleep_MilliSleep(10);
return true;
// RETRY
break;
case NdbError::UnknownResult:
ndbout << error << endl;
return false;
// ERROR!
break;
default:
case NdbError::PermanentError:
switch (error.code)
{
case 499:
case 250:
NdbSleep_MilliSleep(10);
return true; //temp errors?
default:
break;
}
//ERROR
ndbout << error << endl;
return false;
break;
}
return false;
}
void
BackupRestore::tuple(const TupleS & tup)
{
if (!m_restore)
return;
while (1)
{
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.getTable();
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
// TODO: check return value and handle error
if (op->writeTuple() == -1)
{
ndbout << "writeTuple call failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(i, dataPtr, length);
}
for (int i = 0; i < tup.getNoOfAttributes(); i++)
{
const AttributeS * attr = tup[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size * arraySize) / 8;
if (!attr->Desc->m_column->getPrimaryKey())
if (attr->Data.null)
op->setValue(i, NULL, 0);
else
op->setValue(i, dataPtr, length);
}
int ret = trans->execute(Commit);
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
m_ndb->closeTransaction(trans);
if (ret == 0)
break;
}
m_dataCount++;
}
void
BackupRestore::endOfTuples()
{
if (!m_restore)
return;
// Send all transactions to NDB
m_ndb->sendPreparedTransactions(0);
// Poll all transactions
m_ndb->pollNdb(3000, nPreparedTransactions);
// Close all transactions
// for (int i = 0; i < nPreparedTransactions; i++)
// m_ndb->closeTransaction(asynchTrans[i]);
nPreparedTransactions=0;
}
void
BackupRestore::logEntry(const LogEntry & tup)
{
if (!m_restore)
return;
NdbConnection * trans = m_ndb->startTransaction();
if (trans == NULL)
{
// Deep shit, TODO: handle the error
ndbout << "Cannot start transaction" << endl;
exit(-1);
} // if
const TableS * table = tup.m_table;
NdbOperation * op = trans->getNdbOperation(table->getTableName());
if (op == NULL)
{
ndbout << "Cannot get operation: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
} // if
int check = 0;
switch(tup.m_type)
{
case LogEntry::LE_INSERT:
check = op->insertTuple();
break;
case LogEntry::LE_UPDATE:
check = op->updateTuple();
break;
case LogEntry::LE_DELETE:
check = op->deleteTuple();
break;
default:
ndbout << "Log entry has wrong operation type."
<< " Exiting...";
exit(-1);
}
for (int i = 0; i < tup.m_values.size(); i++)
{
const AttributeS * attr = tup.m_values[i];
int size = attr->Desc->size;
int arraySize = attr->Desc->arraySize;
const char * dataPtr = attr->Data.string_value;
const Uint32 length = (size / 8) * arraySize;
if (attr->Desc->m_column->getPrimaryKey())
op->equal(attr->Desc->attrId, dataPtr, length);
else
op->setValue(attr->Desc->attrId, dataPtr, length);
}
#if 1
trans->execute(Commit);
#else
const int ret = trans->execute(Commit);
// Both insert update and delete can fail during log running
// and it's ok
if (ret != 0)
{
ndbout << "execute failed: ";
ndbout << trans->getNdbError() << endl;
exit(-1);
}
#endif
m_ndb->closeTransaction(trans);
m_logCount++;
}
void
BackupRestore::endOfLogEntrys()
{
if (ga_restore)
{
ndbout << "Restored " << m_dataCount << " tuples and "
<< m_logCount << " log entries" << endl;
}
}
void
BackupPrinter::endOfLogEntrys()
{
if (m_print || m_print_log)
{
ndbout << "Printed " << m_dataCount << " tuples and "
<< m_logCount << " log entries"
<< " to stdout." << endl;
}
}
/* Copyright (C) 2003 MySQL AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#ifndef MY_VECTOR_HPP
#define MY_VECTOR_HPP
// Template class for std::vector-like class (hopefully works in OSE)
template <class T>
class myVector
{
// Note that last element in array is used for end() and is always empty
int sizeIncrement;
int thisSize;
int used;
T *storage;
public:
// Assignment of whole vector
myVector<T> & operator=(myVector<T> & org) {
// Don't copy if they point to the same address
if (!(this == &org)) {
// Check memory space
if (thisSize < org.thisSize) {
// We have to increase memory for destination
T* tmpStorage = new T[org.thisSize];
delete[] storage;
storage = tmpStorage;
} // if
thisSize = org.thisSize;
sizeIncrement = org.sizeIncrement;
used = org.used;
for (int i = 0; i < thisSize; i++) {
storage[i] = org.storage[i];
} // for
} // if
return *this;
} // operator=
// Construct with size s+1
myVector(int s = 1) : sizeIncrement(5), // sizeIncrement(s),
thisSize(s + 1),
used(0),
storage(new T[s + 1]) { }
~myVector() { delete[] storage; } // Destructor: deallocate memory
T& operator[](int i) { // Return by index
if ((i < 0) || (i >= used)) {
// Index error
ndbout << "vector index out of range" << endl;
abort();
return storage[used - 1];
} // if
else {
return storage[i];
} // else
} // operator[]
const T& operator[](int i) const { // Return by index
if ((i < 0) || (i >= used)) {
// Index error
ndbout << "vector index out of range" << endl;
abort();
return storage[used - 1];
} // if
else {
return storage[i];
} // else
} // operator[]
int getSize() const { return used; }
void push_back (T& item) {
if (used >= thisSize - 1) {
// We have to allocate new storage
int newSize = thisSize + sizeIncrement;
T* tmpStorage = new T[newSize];
if (tmpStorage == NULL) {
// Memory allocation error! break
ndbout << "PANIC: Memory allocation error in vector" << endl;
return;
} // if
thisSize = newSize;
for (int i = 0; i < used; i++) {
tmpStorage[i] = storage[i];
} // for
delete[] storage;
storage = tmpStorage;
} // if
// Now push
storage[used] = item;
used++;
}; // myVector<> push_back()
// Remove item at back
void pop_back() {
if (used > 0) {
used--;
} // if
} // pop_back()
int size() const { return used; };
bool empty() const { return(used == 0); }
void clear() {
used = 0;
}
};
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment