bug#18594 ndb_restore log boken in 5.1

- corrected previous patch
- read log entry variables out into explicit variables for siimpler code (and backwards compatability code)
parent 9a2938b5
...@@ -961,15 +961,15 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md) ...@@ -961,15 +961,15 @@ RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md)
const LogEntry * const LogEntry *
RestoreLogIterator::getNextLogEntry(int & res) { RestoreLogIterator::getNextLogEntry(int & res) {
// Read record length // Read record length
typedef BackupFormat::LogFile::LogEntry LogE;
typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
const Uint32 offset= 3;
assert(offset == (offsetof(LogE, Data) >> 2) - 1);
LogE * logE= 0;
Uint32 len= ~0;
const Uint32 stopGCP = m_metaData.getStopGCP(); const Uint32 stopGCP = m_metaData.getStopGCP();
NdbAutoPtr<char> ap1; Uint32 tableId;
Uint32 triggerEvent;
Uint32 frag_id;
Uint32 *attr_data;
Uint32 attr_data_len;
do { do {
Uint32 len;
Uint32 *logEntryPtr;
if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){ if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){
res= -1; res= -1;
return 0; return 0;
...@@ -977,7 +977,7 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -977,7 +977,7 @@ RestoreLogIterator::getNextLogEntry(int & res) {
len= ntohl(len); len= ntohl(len);
Uint32 data_len = sizeof(Uint32) + len*4; Uint32 data_len = sizeof(Uint32) + len*4;
if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) { if (buffer_get_ptr((void **)(&logEntryPtr), 1, data_len) != data_len) {
res= -2; res= -2;
return 0; return 0;
} }
...@@ -986,7 +986,8 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -986,7 +986,8 @@ RestoreLogIterator::getNextLogEntry(int & res) {
res= 0; res= 0;
return 0; return 0;
} }
if (m_metaData.getFileHeader().NdbVersion < NDBD_FRAGID_VERSION)
if (unlikely(m_metaData.getFileHeader().NdbVersion < NDBD_FRAGID_VERSION))
{ {
/* /*
FragId was introduced in LogEntry in version FragId was introduced in LogEntry in version
...@@ -994,48 +995,39 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -994,48 +995,39 @@ RestoreLogIterator::getNextLogEntry(int & res) {
We set FragId to 0 in older versions (these versions We set FragId to 0 in older versions (these versions
do not support restore of user defined partitioned do not support restore of user defined partitioned
tables. tables.
These log entries miss one Uint32 FragId, hence missing_len=1
Reconstruct a new log entry with old.
*/ */
const Uint32 missing_len= 1; typedef BackupFormat::LogFile::LogEntry_no_fragid LogE_no_fragid;
assert((offsetof(LogE, Data) - offsetof(LogE_no_fragid, Data)) >> 2 == LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logEntryPtr;
missing_len); tableId= ntohl(logE_no_fragid->TableId);
LogE_no_fragid * logE_no_fragid= (LogE_no_fragid *)logE; triggerEvent= ntohl(logE_no_fragid->TriggerEvent);
frag_id= 0;
int i; attr_data= &logE_no_fragid->Data[0];
LogE *tmpLogE= (LogE*)NdbMem_Allocate(data_len + missing_len*4); attr_data_len= len - ((offsetof(LogE_no_fragid, Data) >> 2) - 1);
if (!tmpLogE) }
else /* normal case */
{ {
res = -2; typedef BackupFormat::LogFile::LogEntry LogE;
return 0; LogE * logE= (LogE *)logEntryPtr;
} tableId= ntohl(logE->TableId);
ap1.reset((char*)tmpLogE); triggerEvent= ntohl(logE->TriggerEvent);
bzero(tmpLogE, data_len + missing_len*4); frag_id= ntohl(logE->FragId);
/* correct len to reflect new logEntry version length */ attr_data= &logE->Data[0];
len+= missing_len; attr_data_len= len - ((offsetof(LogE, Data) >> 2) - 1);
tmpLogE->Length = logE_no_fragid->Length;
tmpLogE->TableId = logE_no_fragid->TableId;
tmpLogE->TriggerEvent = logE_no_fragid->TriggerEvent;
for (i = 0; i < len - offset; i++)
tmpLogE->Data[i] = logE_no_fragid->Data[i];
logE= tmpLogE;
} }
logE->TableId= ntohl(logE->TableId);
logE->TriggerEvent= ntohl(logE->TriggerEvent);
const bool hasGcp= (logE->TriggerEvent & 0x10000) != 0; const bool hasGcp= (triggerEvent & 0x10000) != 0;
logE->TriggerEvent &= 0xFFFF; triggerEvent &= 0xFFFF;
if(hasGcp){ if(hasGcp){
// last attr_data is gci info
attr_data_len--;
len--; len--;
m_last_gci = ntohl(logE->Data[len-offset]); m_last_gci = ntohl(*(attr_data + attr_data_len));
} }
} while(m_last_gci > stopGCP + 1); } while(m_last_gci > stopGCP + 1);
m_logEntry.m_table = m_metaData.getTable(logE->TableId); m_logEntry.m_table = m_metaData.getTable(tableId);
switch(logE->TriggerEvent){ switch(triggerEvent){
case TriggerEvent::TE_INSERT: case TriggerEvent::TE_INSERT:
m_logEntry.m_type = LogEntry::LE_INSERT; m_logEntry.m_type = LogEntry::LE_INSERT;
break; break;
...@@ -1053,10 +1045,10 @@ RestoreLogIterator::getNextLogEntry(int & res) { ...@@ -1053,10 +1045,10 @@ RestoreLogIterator::getNextLogEntry(int & res) {
const TableS * tab = m_logEntry.m_table; const TableS * tab = m_logEntry.m_table;
m_logEntry.clear(); m_logEntry.clear();
AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; AttributeHeader * ah = (AttributeHeader *)attr_data;
AttributeHeader *end = (AttributeHeader *)&logE->Data[len - offset]; AttributeHeader *end = (AttributeHeader *)(attr_data + attr_data_len);
AttributeS * attr; AttributeS * attr;
m_logEntry.m_frag_id = ntohl(logE->FragId); m_logEntry.m_frag_id = frag_id;
while(ah < end){ while(ah < end){
attr= m_logEntry.add_attr(); attr= m_logEntry.add_attr();
if(attr == NULL) { if(attr == NULL) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment