Commit 4c78665a authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-5.0-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-5.0-ndb
parents 78c91051 c99f5659
...@@ -24,73 +24,78 @@ class NdbEventOperationImpl; ...@@ -24,73 +24,78 @@ class NdbEventOperationImpl;
* @class NdbEventOperation * @class NdbEventOperation
* @brief Class of operations for getting change events from database. * @brief Class of operations for getting change events from database.
* *
* An NdbEventOperation object is instantiated by * Brief description on how to work with events:
* Ndb::createEventOperation *
* * - An event i created in the Database through
* Prior to that an event must have been created in the Database through * NdbDictionary::Dictionary::createEvent() (note that this can be done
* NdbDictionary::createEvent * by any application or thread and not necessarily by the "listener")
* * - To listen to events, an NdbEventOperation object is instantiated by
* The instance is removed by Ndb::dropEventOperation * Ndb::createEventOperation()
* - execute() starts the event flow. Use Ndb::pollEvents() to wait
* for an event to occur. Use next() to iterate
* through the events that have occured.
* - The instance is removed by Ndb::dropEventOperation()
* *
* For more info see: * For more info see:
* @ref ndbapi_event.cpp * @ref ndbapi_event.cpp
* *
* Known limitations: * Known limitations:
* *
* Maximum number of active NdbEventOperations are now set at compile time. * - Maximum number of active NdbEventOperations are now set at compile time.
* Today 100. This will become a configuration parameter later. * Today 100. This will become a configuration parameter later.
* * - Maximum number of NdbEventOperations tied to same event are maximum 16
* Maximum number of NdbEventOperations tied to same event are maximum 16
* per process. * per process.
* *
* Known issues: * Known issues:
* *
* When several NdbEventOperation's are tied to the same event in the same * - When several NdbEventOperation's are tied to the same event in the same
* process they will share the circular buffer. The BufferLength will then * process they will share the circular buffer. The BufferLength will then
* be the same for all and decided by the first NdbEventOperation * be the same for all and decided by the first NdbEventOperation
* instantiation. Just make sure to instantiate the "largest" one first. * instantiation. Just make sure to instantiate the "largest" one first.
* * - Today all events INSERT/DELETE/UPDATE and all changed attributes are
* Today all events INSERT/DELETE/UPDATE and all changed attributes are
* sent to the API, even if only specific attributes have been specified. * sent to the API, even if only specific attributes have been specified.
* These are however hidden from the user and only relevant data is shown * These are however hidden from the user and only relevant data is shown
* after next(). * after next().
* However false exits from Ndb::pollEvents() may occur and thus * - "False" exits from Ndb::pollEvents() may occur and thus
* the subsequent next() will return zero, * the subsequent next() will return zero,
* since there was no available data. Just do Ndb::pollEvents() again. * since there was no available data. Just do Ndb::pollEvents() again.
* * - Event code does not check table schema version. Make sure to drop events
* Event code does not check table schema version. Make sure to drop events
* after table is dropped. Will be fixed in later * after table is dropped. Will be fixed in later
* versions. * versions.
* * - If a node failure has occured not all events will be recieved
* If a node failure has occured not all events will be recieved
* anymore. Drop NdbEventOperation and Create again after nodes are up * anymore. Drop NdbEventOperation and Create again after nodes are up
* again. Will be fixed in later versions. * again. Will be fixed in later versions.
* *
* Test status: * Test status:
* Tests have been run on 1-node and 2-node systems
*
* Known bugs:
* *
* None, except if we can call some of the "issues" above bugs * - Tests have been run on 1-node and 2-node systems
* *
* Useful API programs: * Useful API programs:
* *
* ndb_select_all -d sys 'NDB$EVENTS_0' * - ndb_select_all -d sys 'NDB$EVENTS_0'
* Will show contents in the system table containing created events. * shows contents in the system table containing created events.
* *
* @note this is an inteface to viewing events that is subject to change
*/ */
class NdbEventOperation { class NdbEventOperation {
public: public:
/**
* State of the NdbEventOperation object
*/
enum State {
EO_CREATED, ///< Created but execute() not called
EO_EXECUTING, ///< execute() called
EO_ERROR ///< An error has occurred. Object unusable.
};
/** /**
* Retrieve current state of the NdbEventOperation object * Retrieve current state of the NdbEventOperation object
*/ */
enum State {CREATED,EXECUTING,ERROR};
State getState(); State getState();
/** /**
* Activates the NdbEventOperation to start receiving events. The * Activates the NdbEventOperation to start receiving events. The
* changed attribute values may be retrieved after next() has returned * changed attribute values may be retrieved after next() has returned
* a value greater than zero. The getValue() methods below must be called * a value greater than zero. The getValue() methods must be called
* prior to execute(). * prior to execute().
* *
* @return 0 if successful otherwise -1. * @return 0 if successful otherwise -1.
...@@ -112,21 +117,21 @@ public: ...@@ -112,21 +117,21 @@ public:
* aligned appropriately. The buffer is used directly * aligned appropriately. The buffer is used directly
* (avoiding a copy penalty) only if it is aligned on a * (avoiding a copy penalty) only if it is aligned on a
* 4-byte boundary and the attribute size in bytes * 4-byte boundary and the attribute size in bytes
* (i.e. NdbRecAttr::attrSize times NdbRecAttr::arraySize is * (i.e. NdbRecAttr::attrSize() times NdbRecAttr::arraySize() is
* a multiple of 4). * a multiple of 4).
* *
* @note There are two versions, NdbOperation::getValue and * @note There are two versions, getValue() and
* NdbOperation::getPreValue for retrieving the current and * getPreValue() for retrieving the current and
* previous value repectively. * previous value repectively.
* *
* @note This method does not fetch the attribute value from * @note This method does not fetch the attribute value from
* the database! The NdbRecAttr object returned by this method * the database! The NdbRecAttr object returned by this method
* is <em>not</em> readable/printable before the * is <em>not</em> readable/printable before the
* NdbEventConnection::execute has been made and * execute() has been made and
* NdbEventConnection::next has returned a value greater than * next() has returned a value greater than
* zero. If a specific attribute has not changed the corresponding * zero. If a specific attribute has not changed the corresponding
* NdbRecAttr will be in state UNDEFINED. This is checked by * NdbRecAttr will be in state UNDEFINED. This is checked by
* NdbRecAttr::isNull which then returns -1. * NdbRecAttr::isNull() which then returns -1.
* *
* @param anAttrName Attribute name * @param anAttrName Attribute name
* @param aValue If this is non-NULL, then the attribute value * @param aValue If this is non-NULL, then the attribute value
...@@ -143,11 +148,11 @@ public: ...@@ -143,11 +148,11 @@ public:
/** /**
* Retrieves event resultset if available, inserted into the NdbRecAttrs * Retrieves event resultset if available, inserted into the NdbRecAttrs
* specified in getValue() and getPreValue(). To avoid polling for * specified in getValue() and getPreValue(). To avoid polling for
* a resultset, one can use Ndb::pollEvents * a resultset, one can use Ndb::pollEvents()
* which will wait on a mutex until an event occurs or the specified * which will wait on a mutex until an event occurs or the specified
* timeout occurs. * timeout occurs.
* *
* @return >=0 if successful otherwise -1. Return value inicates number * @return >=0 if successful otherwise -1. Return value indicates number
* of available events. By sending pOverRun one may query for buffer * of available events. By sending pOverRun one may query for buffer
* overflow and *pOverRun will indicate the number of events that have * overflow and *pOverRun will indicate the number of events that have
* overwritten. * overwritten.
......
...@@ -1177,7 +1177,14 @@ NdbEventOperation* Ndb::createEventOperation(const char* eventName, ...@@ -1177,7 +1177,14 @@ NdbEventOperation* Ndb::createEventOperation(const char* eventName,
tOp = new NdbEventOperation(this, eventName, bufferLength); tOp = new NdbEventOperation(this, eventName, bufferLength);
if (tOp->getState() != NdbEventOperation::CREATED) { if (tOp == 0)
{
theError.code= 4000;
return NULL;
}
if (tOp->getState() != NdbEventOperation::EO_CREATED) {
theError.code= tOp->getNdbError().code;
delete tOp; delete tOp;
tOp = NULL; tOp = NULL;
} }
......
...@@ -55,9 +55,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -55,9 +55,8 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
const char* eventName, const char* eventName,
const int bufferLength) const int bufferLength)
: NdbEventOperation(*this), m_ndb(theNdb), : NdbEventOperation(*this), m_ndb(theNdb),
m_state(ERROR), m_bufferL(bufferLength) m_state(EO_ERROR), m_bufferL(bufferLength)
{ {
m_eventId = 0; m_eventId = 0;
theFirstRecAttrs[0] = NULL; theFirstRecAttrs[0] = NULL;
theCurrentRecAttrs[0] = NULL; theCurrentRecAttrs[0] = NULL;
...@@ -71,16 +70,15 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -71,16 +70,15 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
// we should lookup id in Dictionary, TODO // we should lookup id in Dictionary, TODO
// also make sure we only have one listener on each event // also make sure we only have one listener on each event
if (!m_ndb) { ndbout_c("m_ndb=NULL"); return; } if (!m_ndb) abort();
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) { ndbout_c("getDictionary=NULL"); return; } if (!myDict) { m_error.code= m_ndb->getNdbError().code; return; }
const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName); const NdbDictionary::Event *myEvnt = myDict->getEvent(eventName);
if (!myEvnt) { ndbout_c("getEvent()=NULL"); return; } if (!myEvnt) { m_error.code= myDict->getNdbError().code; return; }
m_eventImpl = &myEvnt->m_impl; m_eventImpl = &myEvnt->m_impl;
if (!m_eventImpl) { ndbout_c("m_impl=NULL"); return; }
m_bufferHandle = m_ndb->getGlobalEventBufferHandle(); m_bufferHandle = m_ndb->getGlobalEventBufferHandle();
if (m_bufferHandle->m_bufferL > 0) if (m_bufferHandle->m_bufferL > 0)
...@@ -88,7 +86,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N, ...@@ -88,7 +86,7 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
else else
m_bufferHandle->m_bufferL = m_bufferL; m_bufferHandle->m_bufferL = m_bufferL;
m_state = CREATED; m_state = EO_CREATED;
} }
NdbEventOperationImpl::~NdbEventOperationImpl() NdbEventOperationImpl::~NdbEventOperationImpl()
...@@ -106,7 +104,7 @@ NdbEventOperationImpl::~NdbEventOperationImpl() ...@@ -106,7 +104,7 @@ NdbEventOperationImpl::~NdbEventOperationImpl()
p = p_next; p = p_next;
} }
} }
if (m_state == NdbEventOperation::EXECUTING) { if (m_state == EO_EXECUTING) {
stop(); stop();
// m_bufferHandle->dropSubscribeEvent(m_bufferId); // m_bufferHandle->dropSubscribeEvent(m_bufferId);
; // We should send stop signal here ; // We should send stop signal here
...@@ -122,7 +120,7 @@ NdbEventOperationImpl::getState() ...@@ -122,7 +120,7 @@ NdbEventOperationImpl::getState()
NdbRecAttr* NdbRecAttr*
NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n) NdbEventOperationImpl::getValue(const char *colName, char *aValue, int n)
{ {
if (m_state != NdbEventOperation::CREATED) { if (m_state != EO_CREATED) {
ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()"); ndbout_c("NdbEventOperationImpl::getValue may only be called between instantiation and execute()");
return NULL; return NULL;
} }
...@@ -211,8 +209,8 @@ NdbEventOperationImpl::execute() ...@@ -211,8 +209,8 @@ NdbEventOperationImpl::execute()
{ {
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) { if (!myDict) {
ndbout_c("NdbEventOperation::execute(): getDictionary=NULL"); m_error.code= m_ndb->getNdbError().code;
return 0; return -1;
} }
if (theFirstRecAttrs[0] == NULL) { // defaults to get all if (theFirstRecAttrs[0] == NULL) { // defaults to get all
...@@ -245,14 +243,14 @@ NdbEventOperationImpl::execute() ...@@ -245,14 +243,14 @@ NdbEventOperationImpl::execute()
if (r) { if (r) {
//Error //Error
m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId); m_bufferHandle->unprepareAddSubscribeEvent(m_bufferId);
m_state = NdbEventOperation::ERROR; m_state = EO_ERROR;
} else { } else {
m_bufferHandle->addSubscribeEvent(m_bufferId, this); m_bufferHandle->addSubscribeEvent(m_bufferId, this);
m_state = NdbEventOperation::EXECUTING; m_state = EO_EXECUTING;
} }
} else { } else {
//Error //Error
m_state = NdbEventOperation::ERROR; m_state = EO_ERROR;
} }
return r; return r;
} }
...@@ -261,14 +259,14 @@ int ...@@ -261,14 +259,14 @@ int
NdbEventOperationImpl::stop() NdbEventOperationImpl::stop()
{ {
DBUG_ENTER("NdbEventOperationImpl::stop"); DBUG_ENTER("NdbEventOperationImpl::stop");
if (m_state != NdbEventOperation::EXECUTING) if (m_state != EO_EXECUTING)
DBUG_RETURN(-1); DBUG_RETURN(-1);
// ndbout_c("NdbEventOperation::stopping()"); // ndbout_c("NdbEventOperation::stopping()");
NdbDictionary::Dictionary *myDict = m_ndb->getDictionary(); NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
if (!myDict) { if (!myDict) {
ndbout_c("NdbEventOperation::stop(): getDictionary=NULL"); m_error.code= m_ndb->getNdbError().code;
DBUG_RETURN(-1); DBUG_RETURN(-1);
} }
...@@ -299,13 +297,13 @@ NdbEventOperationImpl::stop() ...@@ -299,13 +297,13 @@ NdbEventOperationImpl::stop()
//Error //Error
m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId); m_bufferHandle->unprepareDropSubscribeEvent(m_bufferId);
m_error.code= myDictImpl.m_error.code; m_error.code= myDictImpl.m_error.code;
m_state = NdbEventOperation::ERROR; m_state = EO_ERROR;
} else { } else {
#ifdef EVENT_DEBUG #ifdef EVENT_DEBUG
ndbout_c("NdbEventOperation::dropping()"); ndbout_c("NdbEventOperation::dropping()");
#endif #endif
m_bufferHandle->dropSubscribeEvent(m_bufferId); m_bufferHandle->dropSubscribeEvent(m_bufferId);
m_state = NdbEventOperation::CREATED; m_state = EO_CREATED;
} }
DBUG_RETURN(r); DBUG_RETURN(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment