Commit e9347de8 authored by unknown's avatar unknown

Bug#18594 ndb_restore log boken in 5.1

- added compatability with 5.0
- added test case for compatability with 5.0 and 5.1


mysql-test/std_data/ndb_backup50/BACKUP-1-0.1.Data:
  New BitKeeper file ``mysql-test/std_data/ndb_backup50/BACKUP-1-0.1.Data''
mysql-test/std_data/ndb_backup50/BACKUP-1-0.2.Data:
  New BitKeeper file ``mysql-test/std_data/ndb_backup50/BACKUP-1-0.2.Data''
mysql-test/std_data/ndb_backup50/BACKUP-1.1.log:
  New BitKeeper file ``mysql-test/std_data/ndb_backup50/BACKUP-1.1.log''
mysql-test/std_data/ndb_backup50/BACKUP-1.2.log:
  New BitKeeper file ``mysql-test/std_data/ndb_backup50/BACKUP-1.2.log''
mysql-test/std_data/ndb_backup51/BACKUP-1-0.1.Data:
  New BitKeeper file ``mysql-test/std_data/ndb_backup51/BACKUP-1-0.1.Data''
mysql-test/std_data/ndb_backup51/BACKUP-1-0.2.Data:
  New BitKeeper file ``mysql-test/std_data/ndb_backup51/BACKUP-1-0.2.Data''
parent 946b135f
...@@ -467,3 +467,116 @@ Create table test/def/t2_c failed: Translate frm error ...@@ -467,3 +467,116 @@ Create table test/def/t2_c failed: Translate frm error
drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9; drop table if exists t1,t2,t3,t4,t5,t6,t7,t8,t9;
drop table if exists t2_c; drop table if exists t2_c;
520093696,<the_backup_id> 520093696,<the_backup_id>
DROP DATABASE IF EXISTS BANK;
CREATE DATABASE BANK default charset=latin1 default collate=latin1_bin;
USE BANK;
SHOW TABLES;
Tables_in_BANK
ACCOUNT
GL
ACCOUNT_TYPE
TRANSACTION
SYSTEM_VALUES
SELECT * FROM GL ORDER BY TIME,ACCOUNT_TYPE;
TIME ACCOUNT_TYPE BALANCE DEPOSIT_COUNT DEPOSIT_SUM WITHDRAWAL_COUNT WITHDRAWAL_SUM PURGED
0 0 10000000 0 0 0 0 1
0 1 30000 0 0 0 0 1
0 2 20000 0 0 0 0 1
0 3 20000 0 0 0 0 1
0 4 20000 0 0 0 0 1
1 0 10000000 0 0 0 0 1
1 1 30000 0 0 0 0 1
1 2 20000 0 0 0 0 1
1 3 20000 0 0 0 0 1
1 4 20000 0 0 0 0 1
2 0 9857062 54 225197 76 368135 1
2 1 60601 174 822920 181 792319 1
2 2 68832 117 531214 98 482382 1
2 3 83550 106 521953 104 458403 1
2 4 19955 118 532084 110 532129 1
3 0 9732896 62 289563 88 413729 1
3 1 51056 202 895888 193 905433 0
3 2 67183 122 596787 127 598436 1
3 3 97669 159 761743 141 747624 1
3 4 141196 140 727808 136 606567 1
4 0 9616621 138 603930 142 720205 0
4 1 178927 348 1741521 344 1613650 0
4 2 52141 236 1169929 232 1184971 0
4 3 48938 228 1147957 244 1196688 0
4 4 193373 246 1257982 234 1205805 0
5 0 9515281 156 726253 166 827593 0
5 1 253798 597 2840640 545 2765769 0
5 2 102776 362 1821680 364 1771045 0
5 3 87349 359 1778652 375 1740241 0
5 4 130796 351 1727448 375 1790025 0
SELECT * FROM ACCOUNT ORDER BY ACCOUNT_ID;
ACCOUNT_ID OWNER BALANCE ACCOUNT_TYPE
0 0 9531306 0
1 3001 123844 1
2 3002 30800 2
3 3003 3133 3
4 3004 6524 4
5 3005 80152 1
6 3006 107390 1
7 3007 69448 2
8 3008 663 3
9 3009 136740 4
SELECT COUNT(*) FROM TRANSACTION;
COUNT(*)
6649
SELECT * FROM SYSTEM_VALUES ORDER BY SYSTEM_VALUES_ID;
SYSTEM_VALUES_ID VALUE
0 4767
1 6
TRUNCATE GL;
TRUNCATE ACCOUNT;
TRUNCATE TRANSACTION;
TRUNCATE SYSTEM_VALUES;
TRUNCATE ACCOUNT_TYPE;
SELECT * FROM GL ORDER BY TIME,ACCOUNT_TYPE;
TIME ACCOUNT_TYPE BALANCE DEPOSIT_COUNT DEPOSIT_SUM WITHDRAWAL_COUNT WITHDRAWAL_SUM PURGED
0 0 10000000 0 0 0 0 1
0 1 30000 0 0 0 0 1
0 2 20000 0 0 0 0 1
0 3 20000 0 0 0 0 1
0 4 20000 0 0 0 0 1
1 0 10000000 0 0 0 0 1
1 1 30000 0 0 0 0 1
1 2 20000 0 0 0 0 1
1 3 20000 0 0 0 0 1
1 4 20000 0 0 0 0 1
2 0 10000000 0 0 0 0 1
2 1 30000 0 0 0 0 1
2 2 20000 0 0 0 0 1
2 3 20000 0 0 0 0 1
2 4 20000 0 0 0 0 1
3 0 9963591 14 59111 19 95520 0
3 1 44264 49 255559 53 241295 0
3 2 25515 39 177806 36 172291 0
3 3 16779 26 129200 29 132421 0
3 4 39851 43 182771 34 162920 0
4 0 9733661 141 632616 162 862546 0
4 1 63853 426 2005337 415 1985748 0
4 2 140473 314 1548632 297 1433674 0
4 3 13481 310 1528043 324 1531341 0
4 4 138532 316 1540206 309 1441525 0
SELECT * FROM ACCOUNT ORDER BY ACCOUNT_ID;
ACCOUNT_ID OWNER BALANCE ACCOUNT_TYPE
0 0 9679579 0
1 3001 18130 1
2 3002 12318 2
3 3003 3049 3
4 3004 39517 4
5 3005 37051 1
6 3006 144497 1
7 3007 130670 2
8 3008 13747 3
9 3009 11442 4
SELECT COUNT(*) FROM TRANSACTION;
COUNT(*)
4056
SELECT * FROM SYSTEM_VALUES ORDER BY SYSTEM_VALUES_ID;
SYSTEM_VALUES_ID VALUE
0 2297
1 5
DROP DATABASE BANK;
...@@ -373,3 +373,38 @@ drop table if exists t2_c; ...@@ -373,3 +373,38 @@ drop table if exists t2_c;
--exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults -d sys -D , SYSTAB_0 | grep 520093696, | sed "s/,$the_backup_id/,<the_backup_id>/" --exec $NDB_TOOLS_DIR/ndb_select_all --no-defaults -d sys -D , SYSTAB_0 | grep 520093696, | sed "s/,$the_backup_id/,<the_backup_id>/"
# End of 4.1 tests # End of 4.1 tests
#
# Bug #18594 ndb_restore log boken in 5.1
#
--disable_warnings
DROP DATABASE IF EXISTS BANK;
--enable_warnings
CREATE DATABASE BANK default charset=latin1 default collate=latin1_bin;
USE BANK;
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -p 1 -m -r $MYSQL_TEST_DIR/std_data/ndb_backup51 >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -p 1 -r $MYSQL_TEST_DIR/std_data/ndb_backup51 >> $NDB_TOOLS_OUTPUT
SHOW TABLES;
SELECT * FROM GL ORDER BY TIME,ACCOUNT_TYPE;
SELECT * FROM ACCOUNT ORDER BY ACCOUNT_ID;
SELECT COUNT(*) FROM TRANSACTION;
SELECT * FROM SYSTEM_VALUES ORDER BY SYSTEM_VALUES_ID;
#
# verify restore of 5.0 backup
# here we must use the already created tables as restoring the old
# table definitions will not work
#
TRUNCATE GL;
TRUNCATE ACCOUNT;
TRUNCATE TRANSACTION;
TRUNCATE SYSTEM_VALUES;
TRUNCATE ACCOUNT_TYPE;
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 1 -p 1 -r $MYSQL_TEST_DIR/std_data/ndb_backup50 >> $NDB_TOOLS_OUTPUT
--exec $NDB_TOOLS_DIR/ndb_restore --no-defaults -b 1 -n 2 -p 1 -r $MYSQL_TEST_DIR/std_data/ndb_backup50 >> $NDB_TOOLS_OUTPUT
SELECT * FROM GL ORDER BY TIME,ACCOUNT_TYPE;
SELECT * FROM ACCOUNT ORDER BY ACCOUNT_ID;
SELECT COUNT(*) FROM TRANSACTION;
SELECT * FROM SYSTEM_VALUES ORDER BY SYSTEM_VALUES_ID;
DROP DATABASE BANK;
...@@ -146,6 +146,17 @@ struct BackupFormat { ...@@ -146,6 +146,17 @@ struct BackupFormat {
Uint32 FragId; Uint32 FragId;
Uint32 Data[1]; // Len = Length - 3 Uint32 Data[1]; // Len = Length - 3
}; };
/**
* Log Entry pre NDBD_FRAGID_VERSION
*/
struct LogEntry_no_fragid {
Uint32 Length;
Uint32 TableId;
// If TriggerEvent & 0x10000 == true then GCI is right after data
Uint32 TriggerEvent;
Uint32 Data[1]; // Len = Length - 2
};
}; };
/** /**
......
...@@ -43,6 +43,7 @@ ...@@ -43,6 +43,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <NdbEnv.h> #include <NdbEnv.h>
#include <NdbMem.h> #include <NdbMem.h>
#include <ndb_version.h>
#define DEBUG_PRINT 0 #define DEBUG_PRINT 0
#define INCOMPATIBLE_VERSION -2 #define INCOMPATIBLE_VERSION -2
...@@ -1963,7 +1964,8 @@ indexTypeMapping[] = { ...@@ -1963,7 +1964,8 @@ indexTypeMapping[] = {
int int
NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
const Uint32 * data, Uint32 len, const Uint32 * data, Uint32 len,
bool fullyQualifiedNames) bool fullyQualifiedNames,
Uint32 version)
{ {
SimplePropertiesLinearReader it(data, len); SimplePropertiesLinearReader it(data, len);
DictTabInfo::Table *tableDesc; DictTabInfo::Table *tableDesc;
...@@ -2142,7 +2144,14 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret, ...@@ -2142,7 +2144,14 @@ NdbDictInterface::parseTableInfo(NdbTableImpl ** ret,
* ret = impl; * ret = impl;
NdbMem_Free((void*)tableDesc); NdbMem_Free((void*)tableDesc);
DBUG_ASSERT(impl->m_fragmentCount > 0); if (version < MAKE_VERSION(5,1,3))
{
;
}
else
{
DBUG_ASSERT(impl->m_fragmentCount > 0);
}
DBUG_RETURN(0); DBUG_RETURN(0);
} }
......
...@@ -454,7 +454,8 @@ public: ...@@ -454,7 +454,8 @@ public:
static int parseTableInfo(NdbTableImpl ** dst, static int parseTableInfo(NdbTableImpl ** dst,
const Uint32 * data, Uint32 len, const Uint32 * data, Uint32 len,
bool fullyQualifiedNames); bool fullyQualifiedNames,
Uint32 version= 0xFFFFFFFF);
static int parseFileInfo(NdbFileImpl &dst, static int parseFileInfo(NdbFileImpl &dst,
const Uint32 * data, Uint32 len); const Uint32 * data, Uint32 len);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#include <SimpleProperties.hpp> #include <SimpleProperties.hpp>
#include <signaldata/DictTabInfo.hpp> #include <signaldata/DictTabInfo.hpp>
#include <ndb_limits.h> #include <ndb_limits.h>
#include <NdbAutoPtr.hpp>
#include "../../../../sql/ha_ndbcluster_tables.h" #include "../../../../sql/ha_ndbcluster_tables.h"
...@@ -291,6 +292,7 @@ RestoreMetaData::markSysTables() ...@@ -291,6 +292,7 @@ RestoreMetaData::markSysTables()
strcmp(tableName, "NDB$EVENTS_0") == 0 || strcmp(tableName, "NDB$EVENTS_0") == 0 ||
strcmp(tableName, "sys/def/SYSTAB_0") == 0 || strcmp(tableName, "sys/def/SYSTAB_0") == 0 ||
strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0 || strcmp(tableName, "sys/def/NDB$EVENTS_0") == 0 ||
strcmp(tableName, "cluster_replication/def/" NDB_APPLY_TABLE) == 0 ||
strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) == 0 || strcmp(tableName, NDB_REP_DB "/def/" NDB_APPLY_TABLE) == 0 ||
strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE)== 0 ) strcmp(tableName, NDB_REP_DB "/def/" NDB_SCHEMA_TABLE)== 0 )
table->isSysTable = true; table->isSysTable = true;
...@@ -377,7 +379,8 @@ bool ...@@ -377,7 +379,8 @@ bool
RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len)
{ {
NdbTableImpl* tableImpl = 0; NdbTableImpl* tableImpl = 0;
int ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false); int ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false,
m_fileHeader.NdbVersion);
if (ret != 0) { if (ret != 0) {
err << "parseTableInfo " << " failed" << endl; err << "parseTableInfo " << " failed" << endl;
...@@ -956,13 +959,16 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md) ...@@ -956,13 +959,16 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md)
} }
const LogEntry * const LogEntry *
RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) { RestoreLogIterator::getNextLogEntry(int & res) {
// Read record length // Read record length
typedef BackupFormat::LogFile::LogEntry LogE; typedef BackupFormat::LogFile::LogEntry LogE;
typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
const Uint32 offset= 3;
assert(offset == (offsetof(LogE, Data) >> 2) - 1);
LogE * logE= 0; LogE * logE= 0;
Uint32 len= ~0; Uint32 len= ~0;
const Uint32 stopGCP = m_metaData.getStopGCP(); const Uint32 stopGCP = m_metaData.getStopGCP();
NdbAutoPtr<char> ap1;
do { do {
if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){ if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
res= -1; res= -1;
...@@ -988,21 +994,33 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) { ...@@ -988,21 +994,33 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) {
We set FragId to 0 in older versions (these versions We set FragId to 0 in older versions (these versions
do not support restore of user defined partitioned do not support restore of user defined partitioned
tables. tables.
These log entries miss one Uint32 FragId, hence missing_len=1
Reconstruct a new log entry with old.
*/ */
const Uint32 missing_len= 1;
assert((offsetof(LogE, Data) - offsetof(LogE_no_fragid, Data)) >> 2 ==
missing_len);
LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logE;
int i; int i;
LogE *tmpLogE = (LogE*)NdbMem_Allocate(data_len + 4); LogE *tmpLogE= (LogE*)NdbMem_Allocate(data_len + missing_len*4);
if (!tmpLogE) if (!tmpLogE)
{ {
res = -2; res = -2;
return 0; return 0;
} }
tmpLogE->Length = logE->Length; ap1.reset((char*)tmpLogE);
tmpLogE->TableId = logE->TableId; bzero(tmpLogE, data_len + missing_len*4);
tmpLogE->TriggerEvent = logE->TriggerEvent; /* correct len to reflect new logEntry version length */
tmpLogE->FragId = 0; len+= missing_len;
for (i = 0; i < len - 3; i++) tmpLogE->Length = logE_no_fragid->Length;
tmpLogE->Data[i] = logE->Data[i-1]; tmpLogE->TableId = logE_no_fragid->TableId;
*alloc_flag= true; tmpLogE->TriggerEvent = logE_no_fragid->TriggerEvent;
for (i = 0; i < len - offset; i++)
tmpLogE->Data[i] = logE_no_fragid->Data[i];
logE= tmpLogE;
} }
logE->TableId= ntohl(logE->TableId); logE->TableId= ntohl(logE->TableId);
logE->TriggerEvent= ntohl(logE->TriggerEvent); logE->TriggerEvent= ntohl(logE->TriggerEvent);
...@@ -1012,7 +1030,7 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) { ...@@ -1012,7 +1030,7 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) {
if(hasGcp){ if(hasGcp){
len--; len--;
m_last_gci = ntohl(logE->Data[len-2]); m_last_gci = ntohl(logE->Data[len-offset]);
} }
} while(m_last_gci > stopGCP + 1); } while(m_last_gci > stopGCP + 1);
...@@ -1036,7 +1054,7 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) { ...@@ -1036,7 +1054,7 @@ RestoreLogIterator::getNextLogEntry(int & res, bool *alloc_flag) {
m_logEntry.clear(); m_logEntry.clear();
AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; AttributeHeader * ah = (AttributeHeader *)&logE->Data[0];
AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 3]; AttributeHeader *end = (AttributeHeader *)&logE->Data[len - offset];
AttributeS * attr; AttributeS * attr;
m_logEntry.m_frag_id = ntohl(logE->FragId); m_logEntry.m_frag_id = ntohl(logE->FragId);
while(ah < end){ while(ah < end){
......
...@@ -386,7 +386,7 @@ public: ...@@ -386,7 +386,7 @@ public:
RestoreLogIterator(const RestoreMetaData &); RestoreLogIterator(const RestoreMetaData &);
virtual ~RestoreLogIterator() {}; virtual ~RestoreLogIterator() {};
const LogEntry * getNextLogEntry(int & res, bool *alloc_flag); const LogEntry * getNextLogEntry(int & res);
}; };
NdbOut& operator<<(NdbOut& ndbout, const TableS&); NdbOut& operator<<(NdbOut& ndbout, const TableS&);
......
...@@ -615,14 +615,11 @@ main(int argc, char** argv) ...@@ -615,14 +615,11 @@ main(int argc, char** argv)
} }
const LogEntry * logEntry = 0; const LogEntry * logEntry = 0;
bool alloc_flag = false; while ((logEntry = logIter.getNextLogEntry(res= 0)) != 0)
while ((logEntry = logIter.getNextLogEntry(res= 0, &alloc_flag)) != 0)
{ {
if (checkSysTable(logEntry->m_table)) if (checkSysTable(logEntry->m_table))
for(Uint32 i= 0; i < g_consumers.size(); i++) for(Uint32 i= 0; i < g_consumers.size(); i++)
g_consumers[i]->logEntry(* logEntry); g_consumers[i]->logEntry(* logEntry);
if (alloc_flag)
NdbMem_Free((void*)logEntry);
} }
if (res < 0) if (res < 0)
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment