ndb - add OM_AUTO_SYNC ta make sure os-kernel does not buffer too much

      add sync-flag to FsAppendReq
parent dca8afa9
...@@ -39,7 +39,7 @@ class FsAppendReq { ...@@ -39,7 +39,7 @@ class FsAppendReq {
friend bool printFSAPPENDREQ(FILE * output, const Uint32 * theData, friend bool printFSAPPENDREQ(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo); Uint32 len, Uint16 receiverBlockNo);
public: public:
STATIC_CONST( SignalLength = 6 ); STATIC_CONST( SignalLength = 7 );
private: private:
...@@ -52,6 +52,7 @@ private: ...@@ -52,6 +52,7 @@ private:
UintR varIndex; // DATA 3 UintR varIndex; // DATA 3
UintR offset; // DATA 4 UintR offset; // DATA 4
UintR size; // DATA 5 UintR size; // DATA 5
UintR synch_flag; // DATA 6
}; };
#endif #endif
...@@ -53,7 +53,7 @@ public: ...@@ -53,7 +53,7 @@ public:
/** /**
* Length of signal * Length of signal
*/ */
STATIC_CONST( SignalLength = 10 ); STATIC_CONST( SignalLength = 11 );
SECTION( FILENAME = 0 ); SECTION( FILENAME = 0 );
private: private:
...@@ -69,6 +69,7 @@ private: ...@@ -69,6 +69,7 @@ private:
Uint32 page_size; Uint32 page_size;
Uint32 file_size_hi; Uint32 file_size_hi;
Uint32 file_size_lo; Uint32 file_size_lo;
Uint32 auto_sync_size; // In bytes
STATIC_CONST( OM_READONLY = 0 ); STATIC_CONST( OM_READONLY = 0 );
STATIC_CONST( OM_WRITEONLY = 1 ); STATIC_CONST( OM_WRITEONLY = 1 );
...@@ -80,10 +81,10 @@ private: ...@@ -80,10 +81,10 @@ private:
STATIC_CONST( OM_TRUNCATE = 0x200 ); STATIC_CONST( OM_TRUNCATE = 0x200 );
STATIC_CONST( OM_AUTOSYNC = 0x400 ); STATIC_CONST( OM_AUTOSYNC = 0x400 );
STATIC_CONST( OM_CREATE_IF_NONE = 0x0400 ); STATIC_CONST( OM_CREATE_IF_NONE = 0x0800 );
STATIC_CONST( OM_INIT = 0x0800 ); // STATIC_CONST( OM_INIT = 0x1000 ); //
STATIC_CONST( OM_CHECK_SIZE = 0x1000 ); STATIC_CONST( OM_CHECK_SIZE = 0x2000 );
STATIC_CONST( OM_DIRECT = 0x2000 ); STATIC_CONST( OM_DIRECT = 0x4000 );
enum Suffixes { enum Suffixes {
S_DATA = 0, S_DATA = 0,
......
...@@ -219,6 +219,10 @@ AsyncFile::run() ...@@ -219,6 +219,10 @@ AsyncFile::run()
case Request:: append: case Request:: append:
appendReq(request); appendReq(request);
break; break;
case Request:: append_synch:
appendReq(request);
syncReq(request);
break;
case Request::rmrf: case Request::rmrf:
rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory); rmrfReq(request, (char*)theFileName.c_str(), request->par.rmrf.own_directory);
break; break;
...@@ -246,9 +250,8 @@ extern Uint32 Global_syncFreq; ...@@ -246,9 +250,8 @@ extern Uint32 Global_syncFreq;
void AsyncFile::openReq(Request* request) void AsyncFile::openReq(Request* request)
{ {
m_openedWithSync = false; m_auto_sync_freq = 0;
m_syncFrequency = 0; m_write_wo_sync = 0;
m_syncCount= 0;
// for open.flags, see signal FSOPENREQ // for open.flags, see signal FSOPENREQ
#ifdef NDB_WIN32 #ifdef NDB_WIN32
...@@ -329,7 +332,7 @@ void AsyncFile::openReq(Request* request) ...@@ -329,7 +332,7 @@ void AsyncFile::openReq(Request* request)
if (flags & FsOpenReq::OM_AUTOSYNC) if (flags & FsOpenReq::OM_AUTOSYNC)
{ {
m_syncFrequency = 1024*1024; // Hard coded to 1M m_auto_sync_freq = request->par.open.auto_sync_size;
} }
if (flags & FsOpenReq::OM_APPEND){ if (flags & FsOpenReq::OM_APPEND){
...@@ -429,7 +432,7 @@ void AsyncFile::openReq(Request* request) ...@@ -429,7 +432,7 @@ void AsyncFile::openReq(Request* request)
{ {
request->error = errno; request->error = errno;
} }
else if(buf.st_size != request->par.open.file_size) else if((Uint64)buf.st_size != request->par.open.file_size)
{ {
request->error = FsRef::fsErrInvalidFileSize; request->error = FsRef::fsErrInvalidFileSize;
} }
...@@ -737,6 +740,10 @@ AsyncFile::writeReq( Request * request) ...@@ -737,6 +740,10 @@ AsyncFile::writeReq( Request * request)
return; return;
} }
} // while(write_not_complete) } // while(write_not_complete)
if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
syncReq(request);
}
} }
int int
...@@ -746,6 +753,8 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset, ...@@ -746,6 +753,8 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
size_t bytes_to_write = chunk_size; size_t bytes_to_write = chunk_size;
int return_value; int return_value;
m_write_wo_sync += size;
#ifdef NDB_WIN32 #ifdef NDB_WIN32
DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN); DWORD dwSFP = SetFilePointer(hFile, offset, 0, FILE_BEGIN);
if(dwSFP != offset) { if(dwSFP != offset) {
...@@ -805,7 +814,6 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset, ...@@ -805,7 +814,6 @@ AsyncFile::writeBuffer(const char * buf, size_t size, off_t offset,
} }
#endif #endif
m_syncCount+= bytes_written;
buf += bytes_written; buf += bytes_written;
size -= bytes_written; size -= bytes_written;
offset += bytes_written; offset += bytes_written;
...@@ -856,8 +864,7 @@ bool AsyncFile::isOpen(){ ...@@ -856,8 +864,7 @@ bool AsyncFile::isOpen(){
void void
AsyncFile::syncReq(Request * request) AsyncFile::syncReq(Request * request)
{ {
if(m_openedWithSync || if(m_auto_sync_freq && m_write_wo_sync == 0){
m_syncCount == 0){
return; return;
} }
#ifdef NDB_WIN32 #ifdef NDB_WIN32
...@@ -871,7 +878,7 @@ AsyncFile::syncReq(Request * request) ...@@ -871,7 +878,7 @@ AsyncFile::syncReq(Request * request)
return; return;
} }
#endif #endif
m_syncCount = 0; m_write_wo_sync = 0;
} }
void void
...@@ -880,7 +887,7 @@ AsyncFile::appendReq(Request * request){ ...@@ -880,7 +887,7 @@ AsyncFile::appendReq(Request * request){
const char * buf = request->par.append.buf; const char * buf = request->par.append.buf;
Uint32 size = request->par.append.size; Uint32 size = request->par.append.size;
m_syncCount += size; m_write_wo_sync += size;
#ifdef NDB_WIN32 #ifdef NDB_WIN32
DWORD dwWritten = 0; DWORD dwWritten = 0;
...@@ -912,7 +919,7 @@ AsyncFile::appendReq(Request * request){ ...@@ -912,7 +919,7 @@ AsyncFile::appendReq(Request * request){
} }
#endif #endif
if(m_syncFrequency != 0 && m_syncCount > m_syncFrequency){ if(m_auto_sync_freq && m_write_wo_sync > m_auto_sync_freq){
syncReq(request); syncReq(request);
} }
} }
......
...@@ -123,6 +123,7 @@ public: ...@@ -123,6 +123,7 @@ public:
sync, sync,
end, end,
append, append,
append_synch,
rmrf, rmrf,
readPartial readPartial
}; };
...@@ -132,6 +133,7 @@ public: ...@@ -132,6 +133,7 @@ public:
Uint32 flags; Uint32 flags;
Uint32 page_size; Uint32 page_size;
Uint64 file_size; Uint64 file_size;
Uint32 auto_sync_size;
} open; } open;
struct { struct {
int numberOfPages; int numberOfPages;
...@@ -232,9 +234,8 @@ private: ...@@ -232,9 +234,8 @@ private:
int theWriteBufferSize; int theWriteBufferSize;
char* theWriteBuffer; char* theWriteBuffer;
bool m_openedWithSync; size_t m_write_wo_sync; // Writes wo/ sync
Uint32 m_syncCount; size_t m_auto_sync_freq; // Auto sync freq in bytes
Uint32 m_syncFrequency;
public: public:
SimulatedBlock& m_fs; SimulatedBlock& m_fs;
Ptr<GlobalPage> m_page_ptr; Ptr<GlobalPage> m_page_ptr;
......
...@@ -230,6 +230,7 @@ Ndbfs::execFSOPENREQ(Signal* signal) ...@@ -230,6 +230,7 @@ Ndbfs::execFSOPENREQ(Signal* signal)
request->par.open.file_size = fsOpenReq->file_size_hi; request->par.open.file_size = fsOpenReq->file_size_hi;
request->par.open.file_size <<= 32; request->par.open.file_size <<= 32;
request->par.open.file_size |= fsOpenReq->file_size_lo; request->par.open.file_size |= fsOpenReq->file_size_lo;
request->par.open.auto_sync_size = fsOpenReq->auto_sync_size;
ndbrequire(forward(file, request)); ndbrequire(forward(file, request));
} }
...@@ -567,6 +568,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal) ...@@ -567,6 +568,7 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
const Uint32 tSz = myBaseAddrRef->nrr; const Uint32 tSz = myBaseAddrRef->nrr;
const Uint32 offset = fsReq->offset; const Uint32 offset = fsReq->offset;
const Uint32 size = fsReq->size; const Uint32 size = fsReq->size;
const Uint32 synch_flag = fsReq->synch_flag;
Request *request = theRequestPool->get(); Request *request = theRequestPool->get();
if (openFile == NULL) { if (openFile == NULL) {
...@@ -596,12 +598,15 @@ Ndbfs::execFSAPPENDREQ(Signal * signal) ...@@ -596,12 +598,15 @@ Ndbfs::execFSAPPENDREQ(Signal * signal)
request->error = 0; request->error = 0;
request->set(userRef, userPointer, filePointer); request->set(userRef, userPointer, filePointer);
request->file = openFile; request->file = openFile;
request->action = Request::append;
request->theTrace = signal->getTrace(); request->theTrace = signal->getTrace();
request->par.append.buf = (const char *)(tWA + offset); request->par.append.buf = (const char *)(tWA + offset);
request->par.append.size = size << 2; request->par.append.size = size << 2;
if (!synch_flag)
request->action = Request::append;
else
request->action = Request::append_synch;
ndbrequire(forward(openFile, request)); ndbrequire(forward(openFile, request));
return; return;
...@@ -744,7 +749,9 @@ Ndbfs::report(Request * request, Signal* signal) ...@@ -744,7 +749,9 @@ Ndbfs::report(Request * request, Signal* signal)
sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB); sendSignal(ref, GSN_FSSYNCREF, signal, FsRef::SignalLength, JBB);
break; break;
} }
case Request::append: { case Request::append:
case Request::append_synch:
{
jam(); jam();
sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB); sendSignal(ref, GSN_FSAPPENDREF, signal, FsRef::SignalLength, JBB);
break; break;
...@@ -814,7 +821,9 @@ Ndbfs::report(Request * request, Signal* signal) ...@@ -814,7 +821,9 @@ Ndbfs::report(Request * request, Signal* signal)
sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB); sendSignal(ref, GSN_FSSYNCCONF, signal, 1, JBB);
break; break;
}//case }//case
case Request::append: { case Request::append:
case Request::append_synch:
{
jam(); jam();
signal->theData[1] = request->par.append.size; signal->theData[1] = request->par.append.size;
sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB); sendSignal(ref, GSN_FSAPPENDCONF, signal, 2, JBB);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment