Bug #17095 Cluster RBR in circle does not terminate

- add any value to ndb
- use it to update correct server id in binlog thread
parent d2bcd58e
......@@ -2712,6 +2712,9 @@ int ha_ndbcluster::write_row(byte *record)
op->setValue(no_fields, part_func_value);
}
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
m_rows_changed++;
/*
......@@ -2992,6 +2995,10 @@ int ha_ndbcluster::update_row(const byte *old_data, byte *new_data)
no_fields++;
op->setValue(no_fields, part_func_value);
}
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
// Execute update operation
if (!cursor && execute_no_commit(this,trans,FALSE) != 0) {
no_uncommitted_rows_execute_failure();
......@@ -3047,6 +3054,9 @@ int ha_ndbcluster::delete_row(const byte *record)
no_uncommitted_rows_update(-1);
if (thd->slave_thread)
((NdbOperation *)trans->getLastDefinedOperation())->setAnyValue(thd->server_id);
if (!m_primary_key_update)
// If deleting from cursor, NoCommit will be handled in next_result
DBUG_RETURN(0);
......@@ -3076,6 +3086,9 @@ int ha_ndbcluster::delete_row(const byte *record)
if ((error= set_primary_key_from_record(op, record)))
DBUG_RETURN(error);
}
if (thd->slave_thread)
op->setAnyValue(thd->server_id);
}
// Execute delete operation
......
......@@ -3189,8 +3189,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
NDB_SHARE *share= (NDB_SHARE*) pOp->getCustomData();
if (share == ndb_apply_status_share)
return 0;
TABLE *table= share->table;
uint originating_server_id= pOp->getAnyValue();
if (originating_server_id == 0)
originating_server_id= ::server_id;
TABLE *table= share->table;
DBUG_ASSERT(trans.good());
DBUG_ASSERT(table != 0);
......@@ -3235,7 +3239,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
DBUG_ASSERT(ret == 0);
}
ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
IF_DBUG(int ret=) trans.write_row(::server_id,
IF_DBUG(int ret=) trans.write_row(originating_server_id,
injector::transaction::table(table,
TRUE),
&b, n_fields, table->record[0]);
......@@ -3275,7 +3279,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
}
ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
DBUG_EXECUTE("info", print_records(table, table->record[n]););
IF_DBUG(int ret =) trans.delete_row(::server_id,
IF_DBUG(int ret =) trans.delete_row(originating_server_id,
injector::transaction::table(table,
TRUE),
&b, n_fields, table->record[n]);
......@@ -3305,7 +3309,8 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
since table has a primary key, we can do a write
using only after values
*/
trans.write_row(::server_id, injector::transaction::table(table, TRUE),
trans.write_row(originating_server_id,
injector::transaction::table(table, TRUE),
&b, n_fields, table->record[0]);// after values
}
else
......@@ -3325,7 +3330,7 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
}
ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
DBUG_EXECUTE("info", print_records(table, table->record[1]););
IF_DBUG(int ret =) trans.update_row(::server_id,
IF_DBUG(int ret =) trans.update_row(originating_server_id,
injector::transaction::table(table,
TRUE),
&b, n_fields,
......
......@@ -45,7 +45,8 @@ public:
STATIC_CONST( ROWID = 0xFFF6 );
STATIC_CONST( ROW_GCI = 0xFFF5 );
STATIC_CONST( FRAGMENT_VARSIZED_MEMORY = 0xFFF4 );
STATIC_CONST( ANY_VALUE = 0xFFF3 );
// NOTE: in 5.1 ctors and init take size in bytes
/** Initialize AttributeHeader at location aHeaderPtr */
......
......@@ -53,7 +53,7 @@ class FireTrigOrd {
public:
STATIC_CONST( SignalLength = 8 );
STATIC_CONST( SignalWithGCILength = 9 );
STATIC_CONST( SignalWithHashValueLength = 10 );
STATIC_CONST( SignalLengthSuma = 11 );
private:
Uint32 m_connectionPtr;
......@@ -66,6 +66,7 @@ private:
Uint32 fragId;
Uint32 m_gci;
Uint32 m_hashValue;
Uint32 m_any_value;
// Public methods
public:
Uint32 getConnectionPtr() const;
......@@ -86,6 +87,8 @@ public:
void setGCI(Uint32);
Uint32 getHashValue() const;
void setHashValue(Uint32);
Uint32 getAnyValue() const;
void setAnyValue(Uint32);
};
inline
......@@ -196,5 +199,17 @@ void FireTrigOrd::setHashValue(Uint32 flag)
m_hashValue = flag;
}
inline
Uint32 FireTrigOrd::getAnyValue() const
{
return m_any_value;
}
inline
void FireTrigOrd::setAnyValue(Uint32 any_value)
{
m_any_value = any_value;
}
#endif
......@@ -303,7 +303,10 @@ struct SubTableData {
Uint32 tableId;
Uint32 requestInfo;
Uint32 logType;
Uint32 changeMask;
union {
Uint32 changeMask;
Uint32 anyValue;
};
Uint32 totalLen;
static void setOperation(Uint32& ri, Uint32 val) {
......
......@@ -534,6 +534,7 @@ public:
static const Column * RECORDS_IN_RANGE;
static const Column * ROWID;
static const Column * ROW_GCI;
static const Column * ANY_VALUE;
int getSizeInBytes() const;
#endif
......
......@@ -202,6 +202,13 @@ public:
*/
Uint64 getGCI() const;
/**
* Retrieve the AnyValue of the latest retrieved event
*
* @return AnyValue
*/
Uint32 getAnyValue() const;
/**
* Retrieve the complete GCI in the cluster (not necessarily
* associated with an event)
......
......@@ -413,6 +413,9 @@ public:
int setValue(const char* anAttrName, Uint64 aValue);
int setValue(const char* anAttrName, float aValue);
int setValue(const char* anAttrName, double aValue);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
int setAnyValue(Uint32 aValue);
#endif
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
int setValue(Uint32 anAttrId, const char* aValue, Uint32 len);
......
......@@ -753,6 +753,7 @@ struct Operationrec {
union {
Uint32 firstAttrinbufrec; //Used until copyAttrinfo
};
Uint32 m_any_value;
union {
Uint32 lastAttrinbufrec; //Used until copyAttrinfo
Uint32 nextPool;
......
......@@ -35,6 +35,7 @@ void Dbtup::freeAllAttrBuffers(Operationrec* const regOperPtr)
}//if
regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
}//Dbtup::freeAllAttrBuffers()
void Dbtup::freeAttrinbufrec(Uint32 anAttrBuf)
......
......@@ -102,6 +102,7 @@ void Dbtup::copyAttrinfo(Operationrec * regOperPtr,
regOperPtr->storedProcedureId= RNIL;
regOperPtr->firstAttrinbufrec= RNIL;
regOperPtr->lastAttrinbufrec= RNIL;
regOperPtr->m_any_value= 0;
}
void Dbtup::handleATTRINFOforTUPKEYREQ(Signal* signal,
......@@ -811,6 +812,7 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
else if(Roptype == ZDELETE)
{
jam();
req_struct.log_size= 0;
if (handleDeleteReq(signal, regOperPtr,
regFragPtr, regTabPtr,
&req_struct,
......@@ -829,7 +831,6 @@ void Dbtup::execTUPKEYREQ(Signal* signal)
regOperPtr,
regTabPtr);
set_change_mask_state(regOperPtr, DELETE_CHANGES);
req_struct.log_size= 0;
sendTUPKEYCONF(signal, &req_struct, regOperPtr);
return;
}
......@@ -1536,7 +1537,14 @@ int Dbtup::handleDeleteReq(Signal* signal,
if (setup_read(req_struct, regOperPtr, regFragPtr, regTabPtr, disk))
{
return handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
Uint32 RlogSize;
int ret= handleReadReq(signal, regOperPtr, regTabPtr, req_struct);
if (ret == 0 && (RlogSize= req_struct->log_size))
{
jam();
sendLogAttrinfo(signal, RlogSize, regOperPtr);
}
return ret;
}
error:
......
......@@ -680,6 +680,7 @@ void Dbtup::execTUPSEIZEREQ(Signal* signal)
new (regOperPtr.p) Operationrec();
regOperPtr.p->firstAttrinbufrec = RNIL;
regOperPtr.p->lastAttrinbufrec = RNIL;
regOperPtr.p->m_any_value = 0;
regOperPtr.p->op_struct.op_type = ZREAD;
regOperPtr.p->op_struct.in_active_list = false;
set_trans_state(regOperPtr.p, TRANS_DISCONNECTED);
......
......@@ -221,6 +221,18 @@ int Dbtup::readAttributes(KeyReqStruct *req_struct,
return -1;
}
} else if(attributeId & AttributeHeader::PSEUDO) {
if (attributeId == AttributeHeader::ANY_VALUE)
{
jam();
Uint32 RlogSize = req_struct->log_size;
operPtr.p->m_any_value = inBuffer[inBufIndex];
* (clogMemBuffer + RlogSize) = inBuffer[inBufIndex - 1];
* (clogMemBuffer + RlogSize + 1) = inBuffer[inBufIndex];
inBufIndex++;
req_struct->out_buf_index = tmpAttrBufIndex;
req_struct->log_size = RlogSize + 2;
continue;
}
jam();
Uint32 sz= read_pseudo(attributeId,
req_struct,
......@@ -781,6 +793,15 @@ int Dbtup::updateAttributes(KeyReqStruct *req_struct,
inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex;
}
else if(attributeId == AttributeHeader::ANY_VALUE)
{
jam();
Uint32 sz= ahIn.getDataSize();
ndbrequire(sz == 1);
regOperPtr->m_any_value = * (inBuffer + inBufIndex + 1);
inBufIndex += 1 + sz;
req_struct->in_buf_index = inBufIndex;
}
else
{
jam();
......
......@@ -203,6 +203,7 @@ bool Dbtup::storedProcedureAttrInfo(Signal* signal,
storedPtr.p->storedLinkLast = regOperPtr->lastAttrinbufrec;
regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
set_trans_state(regOperPtr, TRANS_IDLE);
signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = storedPtr.i;
......@@ -220,6 +221,7 @@ void Dbtup::storedSeizeAttrinbufrecErrorLab(Signal* signal,
storedPtr.p->storedLinkFirst = regOperPtr->firstAttrinbufrec;
regOperPtr->firstAttrinbufrec = RNIL;
regOperPtr->lastAttrinbufrec = RNIL;
regOperPtr->m_any_value = 0;
set_trans_state(regOperPtr, TRANS_ERROR_WAIT_STORED_PROCREQ);
signal->theData[0] = regOperPtr->userpointer;
signal->theData[1] = ZSTORED_SEIZE_ATTRINBUFREC_ERROR;
......
......@@ -1048,10 +1048,11 @@ void Dbtup::sendFireTrigOrd(Signal* signal,
// send to backup directly for now
fireTrigOrd->setGCI(req_struct->gci);
fireTrigOrd->setHashValue(req_struct->hash_value);
fireTrigOrd->m_any_value = regOperPtr->m_any_value;
EXECUTE_DIRECT(trigPtr->m_receiverBlock,
GSN_FIRE_TRIG_ORD,
signal,
FireTrigOrd::SignalWithHashValueLength);
FireTrigOrd::SignalLengthSuma);
break;
case (TriggerType::SUBSCRIPTION):
jam();
......
......@@ -3469,6 +3469,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
const Uint32 hashValue = trg->getHashValue();
const Uint32 gci = trg->getGCI();
const Uint32 event = trg->getTriggerEvent();
const Uint32 any_value = trg->getAnyValue();
TablePtr tabPtr;
tabPtr.i = trigId & 0xFFFF;
......@@ -3509,7 +3510,7 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->anyValue = any_value;
data->totalLen = ptrLen;
{
......@@ -3527,13 +3528,15 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
}
else
{
const uint buffer_header_sz = 4;
Uint32* dst;
Uint32 sz = f_trigBufferSize + b_trigBufferSize + 3;
Uint32 sz = f_trigBufferSize + b_trigBufferSize + buffer_header_sz;
if((dst = get_buffer_ptr(signal, bucket, gci, sz)))
{
* dst++ = tableId;
* dst++ = tabPtr.p->m_schemaVersion;
* dst++ = (event << 16) | f_trigBufferSize;
* dst++ = any_value;
memcpy(dst, f_buffer, f_trigBufferSize << 2);
dst += f_trigBufferSize;
memcpy(dst, b_buffer, b_trigBufferSize << 2);
......@@ -5029,18 +5032,20 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
}
else
{
const uint buffer_header_sz = 4;
g_cnt++;
Uint32 table = * src++ ;
Uint32 schemaVersion = * src++;
Uint32 event = * src >> 16;
Uint32 sz_1 = (* src ++) & 0xFFFF;
ndbassert(sz - 3 >= sz_1);
Uint32 any_value = * src++;
ndbassert(sz - buffer_header_sz >= sz_1);
LinearSectionPtr ptr[3];
const Uint32 nptr= reformat(signal, ptr,
src, sz_1,
src + sz_1, sz - 3 - sz_1);
src + sz_1, sz - buffer_header_sz - sz_1);
Uint32 ptrLen= 0;
for(Uint32 i =0; i < nptr; i++)
ptrLen+= ptr[i].sz;
......@@ -5058,7 +5063,7 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->anyValue = any_value;
data->totalLen = ptrLen;
{
......
......@@ -378,6 +378,11 @@ NdbColumnImpl::create_pseudo(const char * name){
col->m_impl.m_attrSize = 8;
col->m_impl.m_arraySize = 1;
col->m_impl.m_nullable = true;
} else if(!strcmp(name, "NDB$ANY_VALUE")){
col->setType(NdbDictionary::Column::Unsigned);
col->m_impl.m_attrId = AttributeHeader::ANY_VALUE;
col->m_impl.m_attrSize = 4;
col->m_impl.m_arraySize = 1;
} else {
abort();
}
......@@ -5100,3 +5105,4 @@ const NdbDictionary::Column * NdbDictionary::Column::DISK_REF = 0;
const NdbDictionary::Column * NdbDictionary::Column::RECORDS_IN_RANGE = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROWID = 0;
const NdbDictionary::Column * NdbDictionary::Column::ROW_GCI = 0;
const NdbDictionary::Column * NdbDictionary::Column::ANY_VALUE = 0;
......@@ -122,6 +122,12 @@ NdbEventOperation::getGCI() const
return m_impl.getGCI();
}
Uint32
NdbEventOperation::getAnyValue() const
{
return m_impl.getAnyValue();
}
Uint64
NdbEventOperation::getLatestGCI() const
{
......
......@@ -683,6 +683,12 @@ NdbEventOperationImpl::getGCI()
return m_data_item->sdata->gci;
}
Uint32
NdbEventOperationImpl::getAnyValue() const
{
return m_data_item->sdata->anyValue;
}
Uint64
NdbEventOperationImpl::getLatestGCI()
{
......
......@@ -366,6 +366,7 @@ public:
const bool tableFragmentationChanged() const;
const bool tableRangeListChanged() const;
Uint64 getGCI();
Uint32 getAnyValue() const;
Uint64 getLatestGCI();
bool execSUB_TABLE_DATA(NdbApiSignal * signal,
LinearSectionPtr ptr[3]);
......
......@@ -571,6 +571,33 @@ NdbOperation::setValue( const NdbColumnImpl* tAttrInfo,
DBUG_RETURN(0);
}//NdbOperation::setValue()
int
NdbOperation::setAnyValue(Uint32 any_value)
{
const NdbColumnImpl* impl =
&NdbColumnImpl::getImpl(* NdbDictionary::Column::ANY_VALUE);
OperationType tOpType = theOperationType;
OperationStatus tStatus = theStatus;
switch(tOpType){
case DeleteRequest:{
Uint32 ah;
AttributeHeader::init(&ah, AttributeHeader::ANY_VALUE, 4);
if (insertATTRINFO(ah) != -1 && insertATTRINFO(any_value) != -1 )
{
return 0;
}
}
default:
return setValue(impl, (const char *)&any_value);
}
setErrorCodeAbort(4000);
return -1;
}
NdbBlob*
NdbOperation::getBlobHandle(NdbTransaction* aCon, const NdbColumnImpl* tAttrInfo)
{
......
......@@ -328,6 +328,8 @@ Ndb_cluster_connection_impl::Ndb_cluster_connection_impl(const char *
NdbColumnImpl::create_pseudo("NDB$ROWID");
NdbDictionary::Column::ROW_GCI=
NdbColumnImpl::create_pseudo("NDB$ROW_GCI");
NdbDictionary::Column::ANY_VALUE=
NdbColumnImpl::create_pseudo("NDB$ANY_VALUE");
}
NdbMutex_Unlock(g_ndb_connection_mutex);
......@@ -382,6 +384,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
delete NdbDictionary::Column::RECORDS_IN_RANGE;
delete NdbDictionary::Column::ROWID;
delete NdbDictionary::Column::ROW_GCI;
delete NdbDictionary::Column::ANY_VALUE;
NdbDictionary::Column::FRAGMENT= 0;
NdbDictionary::Column::FRAGMENT_FIXED_MEMORY= 0;
NdbDictionary::Column::FRAGMENT_VARSIZED_MEMORY= 0;
......@@ -393,6 +396,7 @@ Ndb_cluster_connection_impl::~Ndb_cluster_connection_impl()
NdbDictionary::Column::RECORDS_IN_RANGE= 0;
NdbDictionary::Column::ROWID= 0;
NdbDictionary::Column::ROW_GCI= 0;
NdbDictionary::Column::ANY_VALUE= 0;
}
NdbMutex_Unlock(g_ndb_connection_mutex);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment