Commit a2a2ebbf authored by unknown's avatar unknown

Bug #17095 Cluster RBR in circle does not terminate

- add any value to ndb
- use it to update correct server id in binlog thread


sql/ha_ndbcluster.cc:
  ndb: use "any value" to set correct server_id
sql/ha_ndbcluster_binlog.cc:
  ndb: use "any value" to set correct server_id
storage/ndb/include/kernel/AttributeHeader.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/kernel/signaldata/FireTrigOrd.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/kernel/signaldata/SumaImpl.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbDictionary.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbEventOperation.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/include/ndbapi/NdbOperation.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/Dbtup.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupAbort.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupExecQuery.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupGen.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupRoutines.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupStoredProcDef.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/dbtup/DbtupTrigger.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/kernel/blocks/suma/Suma.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperation.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/NdbOperationDefine.cpp:
  add any_value as psudo column, updatable from ndbapi
storage/ndb/src/ndbapi/ndb_cluster_connection.cpp:
  add any_value as psudo column, updatable from ndbapi
parent 0e9677f6
...@@ -2712,6 +2712,9 @@ int ha_ndbcluster::write_row(byte *record) ...@@ -2712,6 +2712,9 @@ int ha_ndbcluster::write_row(byte *record)
op->setValue(no_fields, part_func_value); op->setValue(no_fields, part_func_value);
} }
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
m_rows_changed++; m_rows_changed++;
/* /*
...@@ -2992,6 +2995,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data) ...@@ -2992,6 +2995,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
no_fields++; no_fields++;
op->setValue(no_fields, part_func_value); op->setValue(no_fields, part_func_value);
} }
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
// Execute update operation // Execute update operation
if (!cursor && execute_no_commit(this,trans,FALSE) != 0) { if (!cursor && execute_no_commit(this,trans,FALSE) != 0) {
no_uncommitted_rows_execute_failure(); no_uncommitted_rows_execute_failure();
...@@ -3047,6 +3054,9 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -3047,6 +3054,9 @@ int ha_ndbcluster::delete_row(const byte *record)
no_uncommitted_rows_update(-1); no_uncommitted_rows_update(-1);
if (thd->slave_thread)
((NdbOperation *)trans->getLastDefinedOperation())->setAnyValue(thd->server_id);
if (!m_primary_key_update) if (!m_primary_key_update)
// If deleting from cursor, NoCommit will be handled in next_result // If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -3076,6 +3086,9 @@ int ha_ndbcluster::delete_row(const byte *record) ...@@ -3076,6 +3086,9 @@ int ha_ndbcluster::delete_row(const byte *record)
if ((error= set_primary_key_from_record(op, record))) if ((error= set_primary_key_from_record(op, record)))
DBUG_RETURN(error); DBUG_RETURN(error);
} }
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
} }
// Execute delete operation // Execute delete operation
......
...@@ -3189,8 +3189,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -3189,8 +3189,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData(); NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
if (share == ndb_apply_status_share) if (share == ndb_apply_status_share)
return 0; return 0;
TABLE *table= share->table;
uint originating_server_id= pOp->getAnyValue();
if (originating_server_id == 0)
originating_server_id= ::server_id;
TABLE *table= share->table;
DBUG_ASSERT(trans.good()); DBUG_ASSERT(trans.good());
DBUG_ASSERT(table != 0); DBUG_ASSERT(table != 0);
...@@ -3235,7 +3239,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -3235,7 +3239,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
DBUG_ASSERT(ret == 0); DBUG_ASSERT(ret == 0);
} }
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]); ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
IF_DBUG(int ret=) trans.write_row(::server_id, IF_DBUG(int ret=) trans.write_row(originating_server_id,
injector::transaction::table(table, injector::transaction::table(table,
TRUE), TRUE),
&b, n_fields, table->record[0]); &b, n_fields, table->record[0]);
...@@ -3275,7 +3279,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -3275,7 +3279,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
} }
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]); ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
DBUG_EXECUTE("info", print_records(table, table->record[n]);); DBUG_EXECUTE("info", print_records(table, table->record[n]););
IF_DBUG(int ret =) trans.delete_row(::server_id, IF_DBUG(int ret =) trans.delete_row(originating_server_id,
injector::transaction::table(table, injector::transaction::table(table,
TRUE), TRUE),
&b, n_fields, table->record[n]); &b, n_fields, table->record[n]);
...@@ -3305,7 +3309,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -3305,7 +3309,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
since table has a primary key, we can do a write since table has a primary key, we can do a write
using only after values using only after values
*/ */
trans.write_row(::server_id, injector::transaction::table(table, TRUE), trans.write_row(originating_server_id,
injector::transaction::table(table, TRUE),
&b, n_fields, table->record[0]);// after values &b, n_fields, table->record[0]);// after values
} }
else else
...@@ -3325,7 +3330,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp, ...@@ -3325,7 +3330,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
} }
ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]); ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
DBUG_EXECUTE("info", print_records(table, table->record[1]);); DBUG_EXECUTE("info", print_records(table, table->record[1]););
IF_DBUG(int ret =) trans.update_row(::server_id, IF_DBUG(int ret =) trans.update_row(originating_server_id,
injector::transaction::table(table, injector::transaction::table(table,
TRUE), TRUE),
&b, n_fields, &b, n_fields,
......
...@@ -45,6 +45,7 @@ public: ...@@ -45,6 +45,7 @@ public:
STATIC_CONST( ROWID = 0xFFF6 ); STATIC_CONST( ROWID = 0xFFF6 );
STATIC_CONST( ROW_GCI = 0xFFF5 ); STATIC_CONST( ROW_GCI = 0xFFF5 );
STATIC_CONST( FRAGMENT_VARSIZED_MEMORY = 0xFFF4 ); STATIC_CONST( FRAGMENT_VARSIZED_MEMORY = 0xFFF4 );
STATIC_CONST( ANY_VALUE = 0xFFF3 );
// NOTE: in 5.1 ctors and init take size in bytes // NOTE: in 5.1 ctors and init take size in bytes
......
...@@ -53,7 +53,7 @@ class FireTrigOrd { ...@@ -53,7 +53,7 @@ class FireTrigOrd {
public: public:
STATIC_CONST( SignalLength = 8 ); STATIC_CONST( SignalLength = 8 );
STATIC_CONST( SignalWithGCILength = 9 ); STATIC_CONST( SignalWithGCILength = 9 );
STATIC_CONST( SignalWithHashValueLength = 10 ); STATIC_CONST( SignalLengthSuma = 11 );
private: private:
Uint32 m_connectionPtr; Uint32 m_connectionPtr;
...@@ -66,6 +66,7 @@ private: ...@@ -66,6 +66,7 @@ private:
Uint32 fragId; Uint32 fragId;
Uint32 m_gci; Uint32 m_gci;
Uint32 m_hashValue; Uint32 m_hashValue;
Uint32 m_any_value;
// Public methods // Public methods
public: public:
Uint32 getConnectionPtr() const; Uint32 getConnectionPtr() const;
...@@ -86,6 +87,8 @@ public: ...@@ -86,6 +87,8 @@ public:
void setGCI(Uint32); void setGCI(Uint32);
Uint32 getHashValue() const; Uint32 getHashValue() const;
void setHashValue(Uint32); void setHashValue(Uint32);
Uint32 getAnyValue() const;
void setAnyValue(Uint32);
}; };
inline inline
...@@ -196,5 +199,17 @@ void FireTrigOrd::setHashValue(Uint32 flag) ...@@ -196,5 +199,17 @@ void FireTrigOrd::setHashValue(Uint32 flag)
m_hashValue = flag; m_hashValue = flag;
} }
inline
Uint32 FireTrigOrd::getAnyValue() const
{
return m_any_value;
}
inline
void FireTrigOrd::setAnyValue(Uint32 any_value)
{
m_any_value = any_value;
}
#endif #endif
...@@ -303,7 +303,10 @@ struct SubTableData { ...@@ -303,7 +303,10 @@ struct SubTableData {
Uint32 tableId; Uint32 tableId;
Uint32 requestInfo; Uint32 requestInfo;
Uint32 logType; Uint32 logType;
union {
Uint32 changeMask; Uint32 changeMask;
Uint32 anyValue;
};
Uint32 totalLen; Uint32 totalLen;
static void setOperation(Uint32& ri, Uint32 val) { static void setOperation(Uint32& ri, Uint32 val) {
......
...@@ -534,6 +534,7 @@ public: ...@@ -534,6 +534,7 @@ public:
static const Column * RECORDS_IN_RANGE; static const Column * RECORDS_IN_RANGE;
static const Column * ROWID; static const Column * ROWID;
static const Column * ROW_GCI; static const Column * ROW_GCI;
static const Column * ANY_VALUE;
int getSizeInBytes() const; int getSizeInBytes() const;
#endif #endif
......
...@@ -202,6 +202,13 @@ public: ...@@ -202,6 +202,13 @@ public:
*/ */
Uint64 getGCI() const; Uint64 getGCI() const;
/**
* Retrieve the AnyValue of the latest retrieved event
*
* @return AnyValue
*/
Uint32 getAnyValue() const;
/** /**
* Retrieve the complete GCI in the cluster (not necessarily * Retrieve the complete GCI in the cluster (not necessarily
* associated with an event) * associated with an event)
......
...@@ -413,6 +413,9 @@ public: ...@@ -413,6 +413,9 @@ public:
int setValue(const char* anAttrName, Uint64 aValue); int setValue(const char* anAttrName, Uint64 aValue);
int setValue(const char* anAttrName, float aValue); int setValue(const char* anAttrName, float aValue);
int setValue(const char* anAttrName, double aValue); int setValue(const char* anAttrName, double aValue);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int setAnyValue(Uint32 aValue);
#endif
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
int setValue(Uint32 anAttrId, const char* aValue, Uint32 len); int setValue(Uint32 anAttrId, const char* aValue, Uint32 len);
......
...@@ -753,6 +753,7 @@ struct Operationrec { ...@@ -753,6 +753,7 @@ struct Operationrec {
union { union {
Uint32 firstAttrinbufrec; //Used until copyAttrinfo Uint32 firstAttrinbufrec; //Used until copyAttrinfo
}; };
Uint32 m_any_value;
union { union {
Uint32 lastAttrinbufrec; //Used until copyAttrinfo Uint32 lastAttrinbufrec; //Used until copyAttrinfo
Uint32 nextPool; Uint32 nextPool;
......
...@@ -35,6 +35,7 @@ void Dbtup::freeAllAttrBuffers(Operationrec* const regOperPtr) ...@@ -35,6 +35,7 @@ void Dbtup::freeAllAttrBuffers(Operationrec* const regOperPtr)
}//if }//if
regOperPtr->firstAttrinbufrec = RNIL; regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL; regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
}//Dbtup::freeAllAttrBuffers() }//Dbtup::freeAllAttrBuffers()
void Dbtup::freeAttrinbufrec(Uint32 anAttrBuf) void Dbtup::freeAttrinbufrec(Uint32 anAttrBuf)
......
...@@ -102,6 +102,7 @@ void Dbtup::copyAttrinfo(Operationrec * regOperPtr, ...@@ -102,6 +102,7 @@ void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
regOperPtr->storedProcedureId= RNIL; regOperPtr->storedProcedureId= RNIL;
regOperPtr->firstAttrinbufrec= RNIL; regOperPtr->firstAttrinbufrec= RNIL;
regOperPtr->lastAttrinbufrec= RNIL; regOperPtr->lastAttrinbufrec= RNIL;
regOperPtr->m_any_value= 0;
} }
void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal, void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
...@@ -811,6 +812,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal) ...@@ -811,6 +812,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
else if(Roptype == ZDELETE) else if(Roptype == ZDELETE)
{ {
jam(); jam();
req_struct.log_size= 0;
if (handleDeleteReq(signal, regOperPtr, if (handleDeleteReq(signal, regOperPtr,
regFragPtr, regTabPtr, regFragPtr, regTabPtr,
&req_struct, &req_struct,
...@@ -829,7 +831,6 @@ void Dbtup::execTUPKEYREQ(Signal* signal) ...@@ -829,7 +831,6 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr, regOperPtr,
regTabPtr); regTabPtr);
set_change_mask_state(regOperPtr, DELETE_CHANGES); set_change_mask_state(regOperPtr, DELETE_CHANGES);
req_struct.log_size= 0;
sendTUPKEYCONF(signal, &req_struct, regOperPtr); sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return; return;
} }
...@@ -1536,7 +1537,14 @@ int Dbtup::handleDeleteReq(Signal* signal, ...@@ -1536,7 +1537,14 @@ int Dbtup::handleDeleteReq(Signal* signal,
if (setup_read(req_struct, regOperPtr, regFragPtr, regTabPtr, disk)) if (setup_read(req_struct, regOperPtr, regFragPtr, regTabPtr, disk))
{ {
return handleReadReq(signal, regOperPtr, regTabPtr, req_struct); Uint32 RlogSize;
int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
if (ret == 0 && (RlogSize= req_struct->log_size))
{
jam();
sendLogAttrinfo(signal, RlogSize, regOperPtr);
}
return ret;
} }
error: error:
......
...@@ -680,6 +680,7 @@ void Dbtup::execTUPSEIZEREQ(Signal* signal) ...@@ -680,6 +680,7 @@ void Dbtup::execTUPSEIZEREQ(Signal* signal)
new (regOperPtr.p) Operationrec(); new (regOperPtr.p) Operationrec();
regOperPtr.p->firstAttrinbufrec = RNIL; regOperPtr.p->firstAttrinbufrec = RNIL;
regOperPtr.p->lastAttrinbufrec = RNIL; regOperPtr.p->lastAttrinbufrec = RNIL;
regOperPtr.p->m_any_value = 0;
regOperPtr.p->op_struct.op_type = ZREAD; regOperPtr.p->op_struct.op_type = ZREAD;
regOperPtr.p->op_struct.in_active_list = false; regOperPtr.p->op_struct.in_active_list = false;
set_trans_state(regOperPtr.p, TRANS_DISCONNECTED); set_trans_state(regOperPtr.p, TRANS_DISCONNECTED);
......
...@@ -221,6 +221,18 @@ int Dbtup::readAttributes(KeyReqStruct *req_struct, ...@@ -221,6 +221,18 @@ int Dbtup::readAttributes(KeyReqStruct *req_struct,
return -1; return -1;
} }
} else if(attributeId & AttributeHeader::PSEUDO) { } else if(attributeId & AttributeHeader::PSEUDO) {
if (attributeId == AttributeHeader::ANY_VALUE)
{
jam();
Uint32 RlogSize = req_struct->log_size;
operPtr.p->m_any_value = inBuffer[inBufIndex];
* (clogMemBuffer + RlogSize) = inBuffer[inBufIndex - 1];
* (clogMemBuffer + RlogSize + 1) = inBuffer[inBufIndex];
inBufIndex++;
req_struct->out_buf_index = tmpAttrBufIndex;
req_struct->log_size = RlogSize + 2;
continue;
}
jam(); jam();
Uint32 sz= read_pseudo(attributeId, Uint32 sz= read_pseudo(attributeId,
req_struct, req_struct,
...@@ -781,6 +793,15 @@ int Dbtup::updateAttributes(KeyReqStruct *req_struct, ...@@ -781,6 +793,15 @@ int Dbtup::updateAttributes(KeyReqStruct *req_struct,
inBufIndex += 1 + sz; inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex; req_struct->in_buf_index = inBufIndex;
} }
else if(attributeId == AttributeHeader::ANY_VALUE)
{
jam();
Uint32 sz= ahIn.getDataSize();
ndbrequire(sz == 1);
regOperPtr->m_any_value = * (inBuffer + inBufIndex + 1);
inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex;
}
else else
{ {
jam(); jam();
......
...@@ -203,6 +203,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal, ...@@ -203,6 +203,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
storedPtr.p->storedLinkLast = regOperPtr->lastAttrinbufrec; storedPtr.p->storedLinkLast = regOperPtr->lastAttrinbufrec;
regOperPtr->firstAttrinbufrec = RNIL; regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL; regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
set_trans_state(regOperPtr, TRANS_IDLE); set_trans_state(regOperPtr, TRANS_IDLE);
signal->theData[0] = regOperPtr->userpointer; signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = storedPtr.i; signal->theData[1] = storedPtr.i;
...@@ -220,6 +221,7 @@ void Dbtup::storedSeizeAttrinbufrecErrorLab(Signal* signal, ...@@ -220,6 +221,7 @@ void Dbtup::storedSeizeAttrinbufrecErrorLab(Signal* signal,
storedPtr.p->storedLinkFirst = regOperPtr->firstAttrinbufrec; storedPtr.p->storedLinkFirst = regOperPtr->firstAttrinbufrec;
regOperPtr->firstAttrinbufrec = RNIL; regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL; regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
set_trans_state(regOperPtr, TRANS_ERROR_WAIT_STORED_PROCREQ); set_trans_state(regOperPtr, TRANS_ERROR_WAIT_STORED_PROCREQ);
signal->theData[0] = regOperPtr->userpointer; signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = ZSTORED_SEIZE_ATTRINBUFREC_ERROR; signal->theData[1] = ZSTORED_SEIZE_ATTRINBUFREC_ERROR;
......
...@@ -1048,10 +1048,11 @@ void Dbtup::sendFireTrigOrd(Signal* signal, ...@@ -1048,10 +1048,11 @@ void Dbtup::sendFireTrigOrd(Signal* signal,
// send to backup directly for now // send to backup directly for now
fireTrigOrd->setGCI(req_struct->gci); fireTrigOrd->setGCI(req_struct->gci);
fireTrigOrd->setHashValue(req_struct->hash_value); fireTrigOrd->setHashValue(req_struct->hash_value);
fireTrigOrd->m_any_value = regOperPtr->m_any_value;
EXECUTE_DIRECT(trigPtr->m_receiverBlock, EXECUTE_DIRECT(trigPtr->m_receiverBlock,
GSN_FIRE_TRIG_ORD, GSN_FIRE_TRIG_ORD,
signal, signal,
FireTrigOrd::SignalWithHashValueLength); FireTrigOrd::SignalLengthSuma);
break; break;
case (TriggerType::SUBSCRIPTION): case (TriggerType::SUBSCRIPTION):
jam(); jam();
......
...@@ -3469,6 +3469,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) ...@@ -3469,6 +3469,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
const Uint32 hashValue = trg->getHashValue(); const Uint32 hashValue = trg->getHashValue();
const Uint32 gci = trg->getGCI(); const Uint32 gci = trg->getGCI();
const Uint32 event = trg->getTriggerEvent(); const Uint32 event = trg->getTriggerEvent();
const Uint32 any_value = trg->getAnyValue();
TablePtr tabPtr; TablePtr tabPtr;
tabPtr.i = trigId & 0xFFFF; tabPtr.i = trigId & 0xFFFF;
...@@ -3509,7 +3510,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) ...@@ -3509,7 +3510,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
data->requestInfo = 0; data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event); SubTableData::setOperation(data->requestInfo, event);
data->logType = 0; data->logType = 0;
data->changeMask = 0; data->anyValue = any_value;
data->totalLen = ptrLen; data->totalLen = ptrLen;
{ {
...@@ -3527,13 +3528,15 @@ Suma::execFIRE_TRIG_ORD(Signal* signal) ...@@ -3527,13 +3528,15 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
} }
else else
{ {
const uint buffer_header_sz = 4;
Uint32* dst; Uint32* dst;
Uint32 sz = f_trigBufferSize + b_trigBufferSize + 3; Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
if((dst = get_buffer_ptr(signal, bucket, gci, sz))) if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
{ {
* dst++ = tableId; * dst++ = tableId;
* dst++ = tabPtr.p->m_schemaVersion; * dst++ = tabPtr.p->m_schemaVersion;
* dst++ = (event << 16) | f_trigBufferSize; * dst++ = (event << 16) | f_trigBufferSize;
* dst++ = any_value;
memcpy(dst, f_buffer, f_trigBufferSize << 2); memcpy(dst, f_buffer, f_trigBufferSize << 2);
dst += f_trigBufferSize; dst += f_trigBufferSize;
memcpy(dst, b_buffer, b_trigBufferSize << 2); memcpy(dst, b_buffer, b_trigBufferSize << 2);
...@@ -5029,18 +5032,20 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, ...@@ -5029,18 +5032,20 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
} }
else else
{ {
const uint buffer_header_sz = 4;
g_cnt++; g_cnt++;
Uint32 table = * src++ ; Uint32 table = * src++ ;
Uint32 schemaVersion = * src++; Uint32 schemaVersion = * src++;
Uint32 event = * src >> 16; Uint32 event = * src >> 16;
Uint32 sz_1 = (* src ++) & 0xFFFF; Uint32 sz_1 = (* src ++) & 0xFFFF;
Uint32 any_value = * src++;
ndbassert(sz - 3 >= sz_1); ndbassert(sz - buffer_header_sz >= sz_1);
LinearSectionPtr ptr[3]; LinearSectionPtr ptr[3];
const Uint32 nptr= reformat(signal, ptr, const Uint32 nptr= reformat(signal, ptr,
src, sz_1, src, sz_1,
src + sz_1, sz - 3 - sz_1); src + sz_1, sz - buffer_header_sz - sz_1);
Uint32 ptrLen= 0; Uint32 ptrLen= 0;
for(Uint32 i =0; i < nptr; i++) for(Uint32 i =0; i < nptr; i++)
ptrLen+= ptr[i].sz; ptrLen+= ptr[i].sz;
...@@ -5058,7 +5063,7 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci, ...@@ -5058,7 +5063,7 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
data->requestInfo = 0; data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event); SubTableData::setOperation(data->requestInfo, event);
data->logType = 0; data->logType = 0;
data->changeMask = 0; data->anyValue = any_value;
data->totalLen = ptrLen; data->totalLen = ptrLen;
{ {
......
...@@ -378,6 +378,11 @@ NdbColumnImpl::create_pseudo(const char * name){ ...@@ -378,6 +378,11 @@ NdbColumnImpl::create_pseudo(const char * name){
col->m_impl.m_attrSize = 8; col->m_impl.m_attrSize = 8;
col->m_impl.m_arraySize = 1; col->m_impl.m_arraySize = 1;
col->m_impl.m_nullable = true; col->m_impl.m_nullable = true;
} else if(!strcmp(name, "NDB$ANY_VALUE")){
col->setType(NdbDictionary::Column::Unsigned);
col->m_impl.m_attrId = AttributeHeader::ANY_VALUE;
col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 1;
} else { } else {
abort(); abort();
} }
...@@ -5100,3 +5105,4 @@ const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0; ...@@ -5100,3 +5105,4 @@ const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0; const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0; const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0; const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0;
const NdbDictionary::Column * NdbDictionary::Column::ANY_VALUE = 0;
...@@ -122,6 +122,12 @@ NdbEventOperation::getGCI() const ...@@ -122,6 +122,12 @@ NdbEventOperation::getGCI() const
return m_impl.getGCI(); return m_impl.getGCI();
} }
Uint32
NdbEventOperation::getAnyValue() const
{
return m_impl.getAnyValue();
}
Uint64 Uint64
NdbEventOperation::getLatestGCI() const NdbEventOperation::getLatestGCI() const
{ {
......
...@@ -683,6 +683,12 @@ NdbEventOperationImpl::getGCI() ...@@ -683,6 +683,12 @@ NdbEventOperationImpl::getGCI()
return m_data_item->sdata->gci; return m_data_item->sdata->gci;
} }
Uint32
NdbEventOperationImpl::getAnyValue() const
{
return m_data_item->sdata->anyValue;
}
Uint64 Uint64
NdbEventOperationImpl::getLatestGCI() NdbEventOperationImpl::getLatestGCI()
{ {
......
...@@ -366,6 +366,7 @@ public: ...@@ -366,6 +366,7 @@ public:
const bool tableFragmentationChanged() const; const bool tableFragmentationChanged() const;
const bool tableRangeListChanged() const; const bool tableRangeListChanged() const;
Uint64 getGCI(); Uint64 getGCI();
Uint32 getAnyValue() const;
Uint64 getLatestGCI(); Uint64 getLatestGCI();
bool execSUB_TABLE_DATA(NdbApiSignal * signal, bool execSUB_TABLE_DATA(NdbApiSignal * signal,
LinearSectionPtr ptr[3]); LinearSectionPtr ptr[3]);
......
...@@ -571,6 +571,33 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo, ...@@ -571,6 +571,33 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
DBUG_RETURN(0); DBUG_RETURN(0);
}//NdbOperation::setValue() }//NdbOperation::setValue()
int
NdbOperation::setAnyValue(Uint32 any_value)
{
const NdbColumnImpl* impl =
&NdbColumnImpl::getImpl(* NdbDictionary::Column::ANY_VALUE);
OperationType tOpType = theOperationType;
OperationStatus tStatus = theStatus;
switch(tOpType){
case DeleteRequest:{
Uint32 ah;
AttributeHeader::init(&ah, AttributeHeader::ANY_VALUE, 4);
if (insertATTRINFO(ah) != -1 && insertATTRINFO(any_value) != -1 )
{
return 0;
}
}
default:
return setValue(impl, (const char *)&any_value);
}
setErrorCodeAbort(4000);
return -1;
}
NdbBlob* NdbBlob*
NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo) NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo)
{ {
......
...@@ -328,6 +328,8 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char * ...@@ -328,6 +328,8 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
NdbColumnImpl::create_pseudo("NDB$ROWID"); NdbColumnImpl::create_pseudo("NDB$ROWID");
NdbDictionary::Column::ROW_GCI= NdbDictionary::Column::ROW_GCI=
NdbColumnImpl::create_pseudo("NDB$ROW_GCI"); NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
NdbDictionary::Column::ANY_VALUE=
NdbColumnImpl::create_pseudo("NDB$ANY_VALUE");
} }
NdbMutex_Unlock(g_ndb_connection_mutex); NdbMutex_Unlock(g_ndb_connection_mutex);
...@@ -382,6 +384,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() ...@@ -382,6 +384,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
delete NdbDictionary::Column::RECORDS_IN_RANGE; delete NdbDictionary::Column::RECORDS_IN_RANGE;
delete NdbDictionary::Column::ROWID; delete NdbDictionary::Column::ROWID;
delete NdbDictionary::Column::ROW_GCI; delete NdbDictionary::Column::ROW_GCI;
delete NdbDictionary::Column::ANY_VALUE;
NdbDictionary::Column::FRAGMENT= 0; NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0; NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0;
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0; NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0;
...@@ -393,6 +396,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl() ...@@ -393,6 +396,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
NdbDictionary::Column::RECORDS_IN_RANGE= 0; NdbDictionary::Column::RECORDS_IN_RANGE= 0;
NdbDictionary::Column::ROWID= 0; NdbDictionary::Column::ROWID= 0;
NdbDictionary::Column::ROW_GCI= 0; NdbDictionary::Column::ROW_GCI= 0;
NdbDictionary::Column::ANY_VALUE= 0;
} }
NdbMutex_Unlock(g_ndb_connection_mutex); NdbMutex_Unlock(g_ndb_connection_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment