Commit a820636a authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/51-work

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb


storage/ndb/src/kernel/blocks/backup/Backup.cpp:
  Auto merged
parents a8bbedd7 9f73d9d7
......@@ -427,10 +427,9 @@ Backup::execDUMP_STATE_ORD(Signal* signal)
for(ptr.p->files.first(filePtr); filePtr.i != RNIL;
ptr.p->files.next(filePtr)){
jam();
infoEvent(" file %d: type: %d open: %d running: %d done: %d scan: %d",
filePtr.i, filePtr.p->fileType, filePtr.p->fileOpened,
filePtr.p->fileRunning,
filePtr.p->fileClosing, filePtr.p->scanRunning);
infoEvent(" file %d: type: %d flags: H'%x",
filePtr.i, filePtr.p->fileType,
filePtr.p->m_flags);
}
}
}
......@@ -2368,13 +2367,40 @@ Backup::abort_scan(Signal * signal, BackupRecordPtr ptr)
void
Backup::defineBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errCode)
{
jam();
ptr.p->setErrorCode(errCode);
if(ptr.p->is_lcp())
{
jam();
BackupFilePtr filePtr;
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
if (filePtr.p->m_flags & BackupFile::BF_LCP_META)
{
jam();
ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_FILE_THREAD));
ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_CLOSING));
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_LCP_META;
if (filePtr.p->m_flags & BackupFile::BF_OPEN)
{
filePtr.p->m_flags |= BackupFile::BF_CLOSING;
FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
req->filePointer = filePtr.p->filePointer;
req->userPointer = filePtr.i;
req->userReference = reference();
req->fileFlag = 0;
FsCloseReq::setRemoveFileFlag(req->fileFlag, 1);
sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal,
FsCloseReq::SignalLength, JBA);
return;
}
}
ndbrequire(filePtr.p->m_flags == 0);
TablePtr tabPtr;
FragmentPtr fragPtr;
ptr.p->setErrorCode(errCode);
ndbrequire(ptr.p->tables.first(tabPtr));
tabPtr.p->fragments.getPtr(fragPtr, 0);
......@@ -2390,7 +2416,6 @@ Backup::defineBackupRef(Signal* signal, BackupRecordPtr ptr, Uint32 errCode)
}
ptr.p->m_gsn = GSN_DEFINE_BACKUP_REF;
ptr.p->setErrorCode(errCode);
ndbrequire(ptr.p->errorCode != 0);
DefineBackupRef* ref = (DefineBackupRef*)signal->getDataPtrSend();
......@@ -2521,10 +2546,7 @@ Backup::execDEFINE_BACKUP_REQ(Signal* signal)
files[i].p->tableId = RNIL;
files[i].p->backupPtr = ptr.i;
files[i].p->filePointer = RNIL;
files[i].p->fileClosing = 0;
files[i].p->fileOpened = 0;
files[i].p->fileRunning = 0;
files[i].p->scanRunning = 0;
files[i].p->m_flags = 0;
files[i].p->errorCode = 0;
if(files[i].p->pages.seize(noOfPages[i]) == false) {
......@@ -2686,8 +2708,7 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr)
* Ctl file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->ctlFilePtr);
ndbrequire(filePtr.p->fileRunning == 0);
filePtr.p->fileRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
......@@ -2700,8 +2721,7 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr)
* Log file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->logFilePtr);
ndbrequire(filePtr.p->fileRunning == 0);
filePtr.p->fileRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
......@@ -2714,8 +2734,7 @@ Backup::openFiles(Signal* signal, BackupRecordPtr ptr)
* Data file
*/
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->fileRunning == 0);
filePtr.p->fileRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 2);
......@@ -2761,8 +2780,8 @@ Backup::execFSOPENCONF(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
ndbrequire(filePtr.p->fileOpened == 0);
filePtr.p->fileOpened = 1;
ndbrequire(! (filePtr.p->m_flags & BackupFile::BF_OPEN));
filePtr.p->m_flags |= BackupFile::BF_OPEN;
openFilesReply(signal, ptr, filePtr);
}
......@@ -2775,16 +2794,16 @@ Backup::openFilesReply(Signal* signal,
/**
* Mark files as "opened"
*/
ndbrequire(filePtr.p->fileRunning == 1);
filePtr.p->fileRunning = 0;
ndbrequire(filePtr.p->m_flags & BackupFile::BF_OPENING);
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_OPENING;
filePtr.p->m_flags |= BackupFile::BF_OPEN;
/**
* Check if all files have recived open_reply
*/
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
{
jam();
if(filePtr.p->fileRunning == 1) {
if(filePtr.p->m_flags & BackupFile::BF_OPENING) {
jam();
return;
}//if
......@@ -2840,12 +2859,21 @@ Backup::openFilesReply(Signal* signal,
/**
* Start CTL file thread
*/
if (!ptr.p->is_lcp())
{
jam();
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
filePtr.p->fileRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
}
else
{
jam();
filePtr.p->m_flags |= BackupFile::BF_LCP_META;
}
/**
* Insert table list in ctl file
......@@ -2933,6 +2961,10 @@ Backup::execGET_TABINFOREF(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, senderData);
BackupFilePtr filePtr;
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
defineBackupRef(signal, ptr, ref->errorCode);
}
......@@ -3361,13 +3393,13 @@ Backup::execSTART_BACKUP_REQ(Signal* signal)
* Start file threads...
*/
BackupFilePtr filePtr;
for(ptr.p->files.first(filePtr);
filePtr.i!=RNIL;
ptr.p->files.next(filePtr)){
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
{
jam();
if(filePtr.p->fileRunning == 0) {
if(! (filePtr.p->m_flags & BackupFile::BF_FILE_THREAD))
{
jam();
filePtr.p->fileRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 2);
......@@ -3417,10 +3449,8 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->backupPtr == ptrI);
ndbrequire(filePtr.p->fileOpened == 1);
ndbrequire(filePtr.p->fileRunning == 1);
ndbrequire(filePtr.p->scanRunning == 0);
ndbrequire(filePtr.p->fileClosing == 0);
ndbrequire(filePtr.p->m_flags ==
(BackupFile::BF_OPEN | BackupFile::BF_FILE_THREAD));
/**
* Get table
......@@ -3469,7 +3499,7 @@ Backup::execBACKUP_FRAGMENT_REQ(Signal* signal)
* Start scan
*/
{
filePtr.p->scanRunning = 1;
filePtr.p->m_flags |= BackupFile::BF_SCAN_THREAD;
Table & table = * tabPtr.p;
ScanFragReq * req = (ScanFragReq *)signal->getDataPtrSend();
......@@ -3754,7 +3784,7 @@ Backup::execSCAN_FRAGREF(Signal* signal)
c_backupFilePool.getPtr(filePtr, filePtrI);
filePtr.p->errorCode = ref->errorCode;
filePtr.p->scanRunning = 0;
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
backupFragmentRef(signal, filePtr);
}
......@@ -3794,7 +3824,7 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
if(filePtr.p->errorCode != 0)
{
jam();
filePtr.p->scanRunning = 0;
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
backupFragmentRef(signal, filePtr); // Scan completed
return;
}//if
......@@ -3808,7 +3838,7 @@ Backup::fragmentCompleted(Signal* signal, BackupFilePtr filePtr)
return;
}//if
filePtr.p->scanRunning = 0;
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_SCAN_THREAD;
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
......@@ -3931,7 +3961,7 @@ Backup::execFSAPPENDREF(Signal* signal)
BackupFilePtr filePtr;
c_backupFilePool.getPtr(filePtr, filePtrI);
filePtr.p->fileRunning = 0;
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
filePtr.p->errorCode = errCode;
checkFile(signal, filePtr);
......@@ -4062,36 +4092,34 @@ Backup::checkFile(Signal* signal, BackupFilePtr filePtr)
return;
}
#ifdef DEBUG_ABORT
Uint32 running= filePtr.p->fileRunning;
Uint32 closing= filePtr.p->fileClosing;
#endif
Uint32 flags = filePtr.p->m_flags;
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_FILE_THREAD;
if(!filePtr.p->fileClosing)
{
filePtr.p->fileRunning = 0;
filePtr.p->fileClosing = 1;
ndbrequire(flags & BackupFile::BF_OPEN);
ndbrequire(flags & BackupFile::BF_FILE_THREAD);
ndbrequire(! (flags & BackupFile::BF_CLOSING));
filePtr.p->m_flags |= BackupFile::BF_CLOSING;
FsCloseReq * req = (FsCloseReq *)signal->getDataPtrSend();
req->filePointer = filePtr.p->filePointer;
req->userPointer = filePtr.i;
req->userReference = reference();
req->fileFlag = 0;
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
if (ptr.p->errorCode)
{
FsCloseReq::setRemoveFileFlag(req->fileFlag, 1);
}
#ifdef DEBUG_ABORT
ndbout_c("***** a FSCLOSEREQ filePtr.i = %u run=%d cl=%d", filePtr.i,
running, closing);
#endif
sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, signal, FsCloseReq::SignalLength, JBA);
}
else
{
#ifdef DEBUG_ABORT
ndbout_c("***** a NOT SENDING FSCLOSEREQ filePtr.i = %u run=%d cl=%d",
filePtr.i,
running, closing);
#endif
}
}
......@@ -4330,7 +4358,8 @@ Backup::closeFiles(Signal* sig, BackupRecordPtr ptr)
int openCount = 0;
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL; ptr.p->files.next(filePtr))
{
if(filePtr.p->fileOpened == 0) {
if(! (filePtr.p->m_flags & BackupFile::BF_OPEN))
{
jam();
continue;
}
......@@ -4338,34 +4367,26 @@ Backup::closeFiles(Signal* sig, BackupRecordPtr ptr)
jam();
openCount++;
if(filePtr.p->fileClosing == 1){
if(filePtr.p->m_flags & BackupFile::BF_CLOSING)
{
jam();
continue;
}//if
filePtr.p->operation.dataBuffer.eof();
if(filePtr.p->fileRunning == 1){
if(filePtr.p->m_flags & BackupFile::BF_FILE_THREAD)
{
jam();
#ifdef DEBUG_ABORT
ndbout_c("Close files fileRunning == 1, filePtr.i=%u", filePtr.i);
#endif
} else {
}
else
{
jam();
filePtr.p->fileClosing = 1;
checkFile(sig, filePtr); // make sure we write everything before closing
FsCloseReq * req = (FsCloseReq *)sig->getDataPtrSend();
req->filePointer = filePtr.p->filePointer;
req->userPointer = filePtr.i;
req->userReference = reference();
req->fileFlag = 0;
#ifdef DEBUG_ABORT
ndbout_c("***** b FSCLOSEREQ filePtr.i = %u", filePtr.i);
#endif
sendSignal(NDBFS_REF, GSN_FSCLOSEREQ, sig,
FsCloseReq::SignalLength, JBA);
}//if
}//for
}
}
if(openCount == 0){
jam();
......@@ -4387,7 +4408,6 @@ Backup::execFSCLOSEREF(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
filePtr.p->fileOpened = 1;
FsConf * conf = (FsConf*)signal->getDataPtr();
conf->userPointer = filePtrI;
......@@ -4409,28 +4429,16 @@ Backup::execFSCLOSECONF(Signal* signal)
ndbout_c("***** FSCLOSECONF filePtrI = %u", filePtrI);
#endif
ndbrequire(filePtr.p->fileClosing == 1);
ndbrequire(filePtr.p->fileOpened == 1);
ndbrequire(filePtr.p->fileRunning == 0);
ndbrequire(filePtr.p->scanRunning == 0);
ndbrequire(filePtr.p->m_flags == (BackupFile::BF_OPEN |
BackupFile::BF_CLOSING));
filePtr.p->fileOpened = 0;
filePtr.p->m_flags &= ~(Uint32)(BackupFile::BF_OPEN |BackupFile::BF_CLOSING);
filePtr.p->operation.dataBuffer.reset();
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, filePtr.p->backupPtr);
for(ptr.p->files.first(filePtr); filePtr.i!=RNIL;ptr.p->files.next(filePtr))
{
jam();
if(filePtr.p->fileOpened == 1) {
jam();
#ifdef DEBUG_ABORT
ndbout_c("waiting for more FSCLOSECONF's filePtr.i = %u", filePtr.i);
#endif
return; // we will be getting more FSCLOSECONF's
}//if
}//for
closeFilesDone(signal, ptr);
closeFiles(signal, ptr);
}
void
......@@ -4626,12 +4634,8 @@ Backup::dumpUsedResources()
filePtr.i != RNIL;
ptr.p->files.next(filePtr)) {
jam();
ndbout_c("filePtr.i = %u, filePtr.p->fileOpened=%u fileRunning=%u "
"scanRunning=%u",
filePtr.i,
filePtr.p->fileOpened,
filePtr.p->fileRunning,
filePtr.p->scanRunning);
ndbout_c("filePtr.i = %u, flags: H'%x ",
filePtr.i, filePtr.p->m_flags);
}//for
}
}
......@@ -4665,13 +4669,10 @@ Backup::cleanup(Signal* signal, BackupRecordPtr ptr)
}//for
BackupFilePtr filePtr;
for(ptr.p->files.first(filePtr);
filePtr.i != RNIL;
ptr.p->files.next(filePtr)) {
for(ptr.p->files.first(filePtr);filePtr.i != RNIL;ptr.p->files.next(filePtr))
{
jam();
ndbrequire(filePtr.p->fileOpened == 0);
ndbrequire(filePtr.p->fileRunning == 0);
ndbrequire(filePtr.p->scanRunning == 0);
ndbrequire(filePtr.p->m_flags == 0);
filePtr.p->pages.release();
}//for
......@@ -4744,11 +4745,35 @@ Backup::execLCP_PREPARE_REQ(Signal* signal)
BackupRecordPtr ptr;
c_backupPool.getPtr(ptr, req.backupPtr);
ptr.p->m_gsn = GSN_LCP_PREPARE_REQ;
TablePtr tabPtr;
if(ptr.p->errorCode == 0)
FragmentPtr fragPtr;
if (!ptr.p->tables.isEmpty())
{
jam();
FragmentPtr fragPtr;
ndbrequire(ptr.p->errorCode);
ptr.p->tables.first(tabPtr);
if (tabPtr.p->tableId == req.tableId)
{
jam();
ndbrequire(!tabPtr.p->fragments.empty());
tabPtr.p->fragments.getPtr(fragPtr, 0);
fragPtr.p->fragmentId = req.fragmentId;
defineBackupRef(signal, ptr, ptr.p->errorCode);
return;
}
else
{
jam();
tabPtr.p->attributes.release();
tabPtr.p->fragments.release();
ptr.p->tables.release();
ptr.p->errorCode = 0;
// fall-through
}
}
if(!ptr.p->tables.seize(tabPtr) || !tabPtr.p->fragments.seize(1))
{
if(!tabPtr.isNull())
......@@ -4766,15 +4791,6 @@ Backup::execLCP_PREPARE_REQ(Signal* signal)
ptr.p->backupId= req.backupId;
lcp_open_file(signal, ptr);
}
else
{
jam();
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, 0);
fragPtr.p->fragmentId = req.fragmentId;
defineBackupRef(signal, ptr, ptr.p->errorCode);
}
}
void
......@@ -4786,6 +4802,17 @@ Backup::lcp_close_file_conf(Signal* signal, BackupRecordPtr ptr)
ndbrequire(ptr.p->tables.first(tabPtr));
Uint32 tableId = tabPtr.p->tableId;
BackupFilePtr filePtr;
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->m_flags == 0);
if (ptr.p->m_gsn == GSN_LCP_PREPARE_REQ)
{
jam();
defineBackupRef(signal, ptr, ptr.p->errorCode);
return;
}
FragmentPtr fragPtr;
tabPtr.p->fragments.getPtr(fragPtr, 0);
Uint32 fragmentId = fragPtr.p->fragmentId;
......@@ -4833,9 +4860,8 @@ Backup::lcp_open_file(Signal* signal, BackupRecordPtr ptr)
*/
BackupFilePtr filePtr;
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->fileRunning == 0);
filePtr.p->fileClosing = 0;
filePtr.p->fileRunning = 1;
ndbrequire(filePtr.p->m_flags == 0);
filePtr.p->m_flags |= BackupFile::BF_OPENING;
req->userPointer = filePtr.i;
FsOpenReq::setVersion(req->fileNumber, 5);
......@@ -4855,6 +4881,12 @@ Backup::lcp_open_file_done(Signal* signal, BackupRecordPtr ptr)
ndbrequire(ptr.p->tables.first(tabPtr));
tabPtr.p->fragments.getPtr(fragPtr, 0);
BackupFilePtr filePtr;
c_backupFilePool.getPtr(filePtr, ptr.p->dataFilePtr);
ndbrequire(filePtr.p->m_flags ==
(BackupFile::BF_OPEN | BackupFile::BF_LCP_META));
filePtr.p->m_flags &= ~(Uint32)BackupFile::BF_LCP_META;
ptr.p->slaveState.setState(STARTED);
LcpPrepareConf* conf= (LcpPrepareConf*)signal->getDataPtrSend();
......@@ -4864,6 +4896,16 @@ Backup::lcp_open_file_done(Signal* signal, BackupRecordPtr ptr)
conf->fragmentId = fragPtr.p->fragmentId;
sendSignal(ptr.p->masterRef, GSN_LCP_PREPARE_CONF,
signal, LcpPrepareConf::SignalLength, JBB);
/**
* Start file thread
*/
filePtr.p->m_flags |= BackupFile::BF_FILE_THREAD;
signal->theData[0] = BackupContinueB::START_FILE_THREAD;
signal->theData[1] = filePtr.i;
signal->theData[2] = __LINE__;
sendSignalWithDelay(BACKUP_REF, GSN_CONTINUEB, signal, 100, 3);
}
void
......@@ -4875,6 +4917,22 @@ Backup::execEND_LCPREQ(Signal* signal)
c_backupPool.getPtr(ptr, req->backupPtr);
ndbrequire(ptr.p->backupId == req->backupId);
BackupFilePtr filePtr;
ptr.p->files.getPtr(filePtr, ptr.p->ctlFilePtr);
ndbrequire(filePtr.p->m_flags == 0);
if (!ptr.p->tables.isEmpty())
{
jam();
ndbrequire(ptr.p->errorCode);
TablePtr tabPtr;
ptr.p->tables.first(tabPtr);
tabPtr.p->attributes.release();
tabPtr.p->fragments.release();
ptr.p->tables.release();
ptr.p->errorCode = 0;
}
ptr.p->errorCode = 0;
ptr.p->slaveState.setState(CLEANING);
ptr.p->slaveState.setState(INITIAL);
......
......@@ -345,10 +345,16 @@ public:
Uint32 nextList;
union { Uint32 prevList; Uint32 nextPool; };
Uint8 fileOpened;
Uint8 fileRunning;
Uint8 fileClosing;
Uint8 scanRunning;
enum {
BF_OPEN = 0x1
,BF_OPENING = 0x2
,BF_CLOSING = 0x4
,BF_FILE_THREAD = 0x8
,BF_SCAN_THREAD = 0x10
,BF_LCP_META = 0x20
};
Uint32 m_flags;
Uint32 m_pos;
};
typedef Ptr<BackupFile> BackupFilePtr;
......@@ -357,14 +363,14 @@ public:
* State for BackupRecord
*/
enum State {
INITIAL,
DEFINING, // Defining backup content and parameters
DEFINED, // DEFINE_BACKUP_CONF sent in slave, received all in master
STARTED, // Creating triggers
SCANNING, // Scanning fragments
STOPPING, // Closing files
CLEANING, // Cleaning resources
ABORTING // Aborting backup
INITIAL = 0,
DEFINING = 1, // Defining backup content and parameters
DEFINED = 2, // DEFINE_BACKUP_CONF sent in slave, received all in master
STARTED = 3, // Creating triggers
SCANNING = 4, // Scanning fragments
STOPPING = 5, // Closing files
CLEANING = 6, // Cleaning resources
ABORTING = 7 // Aborting backup
};
static const Uint32 validSlaveTransitionsCount;
......
......@@ -212,8 +212,6 @@ inline
void
FsBuffer::reset()
{
assert(m_free = m_size);
assert(m_readIndex == m_writeIndex);
m_readIndex = m_writeIndex = 0;
m_free = m_size;
m_eof = 0;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment