/* Copyright (C) 2003 MySQL AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */

#ifndef MemoryChannel_H
#define MemoryChannel_H

//===========================================================================
//
// .DESCRIPTION
//              Pointer based communication channel for communication between two
//              threads. It does not copy any data in or out of the channel, so the
//              item that is put in cannot be used until the other thread has
//              given it back. There is no support for detecting the return of an
//              item. The channel is half-duplex.
//              For communication between 1 writer and 1 reader use the MemoryChannel
//              class, for communication between multiple writers and 1 reader use the
//              MemoryChannelMultipleWriter. There is no support for multiple readers.
//
// .TYPICAL USE:
//              to communicate between threads.
//
// .EXAMPLE:
//              See AsyncFile.C
//===========================================================================
//
//
// MemoryChannel( int size= 256);
//   Constructor
//   Parameters:
//      size : number of pointers it can hold
//
// void operator ++ ();
//   increments the index with one, if size is reached it is set to zero
//
// virtual void write( T *t);
//   Puts the item in the channel; if the channel is full an error is reported.
//   Parameters:
//      t: pointer to item to put in the channel, after this the item
//                        is shared with the other thread.
// errors // AFS_ERROR_CHANNALFULL, channel is full // // T* read(); // Reads a itemn from the channel, if channel is empty it blocks untill // an item can be read. // return // T : item from the channel // // T* tryRead(); // Reads a item from the channel, if channel is empty it returns zero. // return // T : item from the channel or zero if channel is empty. // #if defined NDB_OSE || defined NDB_SOFTOSE #include "MemoryChannelOSE.hpp" #else #include "ErrorHandlingMacros.hpp" #include "Error.hpp" #include "CircularIndex.hpp" #include "NdbMutex.h" #include "NdbCondition.h" #include <NdbOut.hpp> template <class T> class MemoryChannel { public: MemoryChannel( int size= 256); virtual ~MemoryChannel( ); void writeChannel( T *t); void writeChannelNoSignal( T *t); T* readChannel(); T* tryReadChannel(); private: int theSize; T **theChannel; CircularIndex theWriteIndex; CircularIndex theReadIndex; NdbMutex* theMutexPtr; NdbCondition* theConditionPtr; template<class U> friend NdbOut& operator<<(NdbOut& out, const MemoryChannel<U> & chn); }; template <class T> NdbOut& operator<<(NdbOut& out, const MemoryChannel<T> & chn) { NdbMutex_Lock(chn.theMutexPtr); out << "[ theSize: " << chn.theSize << " theReadIndex: " << (int)chn.theReadIndex << " theWriteIndex: " << (int)chn.theWriteIndex << " ]"; NdbMutex_Unlock(chn.theMutexPtr); return out; } template <class T> MemoryChannel<T>::MemoryChannel( int size): theSize(size), theChannel(new T*[size] ), theWriteIndex(0, size), theReadIndex(0, size) { theMutexPtr = NdbMutex_Create(); theConditionPtr = NdbCondition_Create(); } template <class T> MemoryChannel<T>::~MemoryChannel( ) { NdbMutex_Destroy(theMutexPtr); NdbCondition_Destroy(theConditionPtr); delete [] theChannel; } template <class T> void MemoryChannel<T>::writeChannel( T *t) { NdbMutex_Lock(theMutexPtr); if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort(); theChannel[theWriteIndex]= t; ++theWriteIndex; NdbMutex_Unlock(theMutexPtr); 
NdbCondition_Signal(theConditionPtr); } template <class T> void MemoryChannel<T>::writeChannelNoSignal( T *t) { NdbMutex_Lock(theMutexPtr); if(full(theWriteIndex, theReadIndex) || theChannel == NULL) abort(); theChannel[theWriteIndex]= t; ++theWriteIndex; NdbMutex_Unlock(theMutexPtr); } template <class T> T* MemoryChannel<T>::readChannel() { T* tmp; NdbMutex_Lock(theMutexPtr); while ( empty(theWriteIndex, theReadIndex) ) { NdbCondition_Wait(theConditionPtr, theMutexPtr); } tmp= theChannel[theReadIndex]; ++theReadIndex; NdbMutex_Unlock(theMutexPtr); return tmp; } template <class T> T* MemoryChannel<T>::tryReadChannel() { T* tmp= 0; NdbMutex_Lock(theMutexPtr); if ( !empty(theWriteIndex, theReadIndex) ) { tmp= theChannel[theReadIndex]; ++theReadIndex; } NdbMutex_Unlock(theMutexPtr); return tmp; } #endif #endif // MemoryChannel_H