Commit 58eed59a authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/51-work

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb


sql/ha_ndbcluster.cc:
  Auto merged
parents 9d2513dd ad204a69
......@@ -302,13 +302,32 @@ struct SubTableData {
Uint32 senderData;
Uint32 gci;
Uint32 tableId;
Uint8 operation;
Uint8 req_nodeid;
Uint8 ndbd_nodeid;
Uint8 not_used3;
Uint32 requestInfo;
Uint32 logType;
Uint32 changeMask;
Uint32 totalLen;
static void setOperation(Uint32& ri, Uint32 val) {
ri = (ri & 0xFFFFFF00) | val;
}
static void setReqNodeId(Uint32& ri, Uint32 val) {
ri = (ri & 0xFFFF00FF) | (val << 8);
}
static void setNdbdNodeId(Uint32& ri, Uint32 val) {
ri = (ri & 0xFF00FFFF) | (val << 16);
}
static Uint32 getOperation(const Uint32 & ri){
return (ri & 0xFF);
}
static Uint32 getReqNodeId(const Uint32 & ri){
return (ri >> 8) & 0xFF;
}
static Uint32 getNdbdNodeId(const Uint32 & ri){
return (ri >> 16) & 0xFF;
}
};
struct SubSyncContinueReq {
......
......@@ -174,10 +174,10 @@ printSUB_TABLE_DATA(FILE * output, const Uint32 * theData,
Uint32 len, Uint16 receiverBlockNo) {
const SubTableData * const sig = (SubTableData *)theData;
fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " senderData: %x\n", sig->senderData);
fprintf(output, " gci: %x\n", sig->gci);
fprintf(output, " tableId: %x\n", sig->tableId);
fprintf(output, " operation: %x\n", sig->operation);
fprintf(output, " operation: %x\n",
SubTableData::getOperation(sig->requestInfo));
return false;
}
......
......@@ -2642,12 +2642,16 @@ Suma::sendSubStopComplete(Signal* signal, SubscriberPtr subbPtr)
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci + 1; // XXX ???
data->tableId = 0;
data->operation = NdbDictionary::Event::_TE_STOP;
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo,
NdbDictionary::Event::_TE_STOP);
SubTableData::setNdbdNodeId(data->requestInfo,
getOwnNodeId());
data->senderData = subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
}
SubStopConf * const conf = (SubStopConf*)signal->getDataPtrSend();
conf->senderRef= reference();
......@@ -2681,11 +2685,14 @@ Suma::reportAllSubscribers(Signal *signal,
{
data->gci = m_last_complete_gci + 1;
data->tableId = subPtr.p->m_tableId;
data->operation = NdbDictionary::Event::_TE_ACTIVE;
data->ndbd_nodeid = refToNode(reference());
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo,
NdbDictionary::Event::_TE_ACTIVE);
SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
SubTableData::setReqNodeId(data->requestInfo,
refToNode(subbPtr.p->m_senderRef));
data->changeMask = 0;
data->totalLen = 0;
data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
data->senderData = subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
......@@ -2707,8 +2714,9 @@ Suma::reportAllSubscribers(Signal *signal,
//#endif
data->gci = m_last_complete_gci + 1;
data->tableId = subPtr.p->m_tableId;
data->operation = table_event;
data->ndbd_nodeid = refToNode(reference());
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, table_event);
SubTableData::setNdbdNodeId(data->requestInfo, getOwnNodeId());
data->changeMask = 0;
data->totalLen = 0;
......@@ -2720,7 +2728,8 @@ Suma::reportAllSubscribers(Signal *signal,
{
if (i_subbPtr.p->m_subPtrI == subPtr.i)
{
data->req_nodeid = refToNode(subbPtr.p->m_senderRef);
SubTableData::setReqNodeId(data->requestInfo,
refToNode(subbPtr.p->m_senderRef));
data->senderData = i_subbPtr.p->m_senderData;
sendSignal(i_subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
......@@ -2729,12 +2738,14 @@ Suma::reportAllSubscribers(Signal *signal,
table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
"SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
refToNode(i_subbPtr.p->m_senderRef),
data->req_nodeid, data->senderData
refToNode(subbPtr.p->m_senderRef), data->senderData
);
//#endif
if (i_subbPtr.i != subbPtr.i)
{
data->req_nodeid = refToNode(i_subbPtr.p->m_senderRef);
SubTableData::setReqNodeId(data->requestInfo,
refToNode(i_subbPtr.p->m_senderRef));
data->senderData = subbPtr.p->m_senderData;
sendSignal(subbPtr.p->m_senderRef, GSN_SUB_TABLE_DATA, signal,
SubTableData::SignalLength, JBB);
......@@ -2743,7 +2754,7 @@ Suma::reportAllSubscribers(Signal *signal,
table_event == NdbDictionary::Event::_TE_SUBSCRIBE ?
"SUBSCRIBE" : "UNSUBSCRIBE", (int) table_event,
refToNode(subbPtr.p->m_senderRef),
data->req_nodeid, data->senderData
refToNode(i_subbPtr.p->m_senderRef), data->senderData
);
//#endif
}
......@@ -3146,7 +3157,9 @@ Suma::execTRANSID_AI(Signal* signal)
Uint32 ref = subPtr.p->m_senderRef;
sdata->tableId = syncPtr.p->m_currentTableId;
sdata->senderData = subPtr.p->m_senderData;
sdata->operation = NdbDictionary::Event::_TE_SCAN; // Scan
sdata->requestInfo = 0;
SubTableData::setOperation(sdata->requestInfo,
NdbDictionary::Event::_TE_SCAN); // Scan
sdata->gci = 0; // Undefined
#if PRINT_ONLY
ndbout_c("GSN_SUB_TABLE_DATA (scan) #attr: %d len: %d", attribs, sum);
......@@ -3362,7 +3375,8 @@ Suma::execFIRE_TRIG_ORD(Signal* signal)
SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
data->gci = gci;
data->tableId = tabPtr.p->m_tableId;
data->operation = event;
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
......@@ -3588,8 +3602,9 @@ Suma::execDROP_TAB_CONF(Signal *signal)
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci+1;
data->tableId = tableId;
data->operation = NdbDictionary::Event::_TE_DROP;
data->req_nodeid = refToNode(senderRef);
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo,NdbDictionary::Event::_TE_DROP);
SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
{
LocalDLList<Subscriber> subbs(c_subscriberPool,tabPtr.p->c_subscribers);
......@@ -3667,8 +3682,10 @@ Suma::execALTER_TAB_REQ(Signal *signal)
SubTableData * data = (SubTableData*)signal->getDataPtrSend();
data->gci = m_last_complete_gci+1;
data->tableId = tableId;
data->operation = NdbDictionary::Event::_TE_ALTER;
data->req_nodeid = refToNode(senderRef);
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo,
NdbDictionary::Event::_TE_ALTER);
SubTableData::setReqNodeId(data->requestInfo, refToNode(senderRef));
data->logType = 0;
data->changeMask = changeMask;
data->totalLen = tabInfoPtr.sz;
......@@ -4898,7 +4915,8 @@ Suma::resend_bucket(Signal* signal, Uint32 buck, Uint32 min_gci,
SubTableData * data = (SubTableData*)signal->getDataPtrSend();//trg;
data->gci = last_gci;
data->tableId = tabPtr.p->m_tableId;
data->operation = event;
data->requestInfo = 0;
SubTableData::setOperation(data->requestInfo, event);
data->logType = 0;
data->changeMask = 0;
data->totalLen = ptrLen;
......
......@@ -1894,7 +1894,8 @@ int
NdbBlob::atNextEvent()
{
DBUG_ENTER("NdbBlob::atNextEvent");
Uint32 optype = theEventOp->m_data_item->sdata->operation;
Uint32 optype =
SubTableData::getOperation(theEventOp->m_data_item->sdata->requestInfo);
DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d optype=%u", this, theEventOp, theBlobEventOp, theEventBlobVersion, optype));
if (theState == Invalid)
DBUG_RETURN(-1);
......
......@@ -192,12 +192,12 @@ void * NdbEventOperation::getCustomData() const
int NdbEventOperation::getReqNodeId() const
{
return m_impl.m_data_item->sdata->req_nodeid;
return SubTableData::getReqNodeId(m_impl.m_data_item->sdata->requestInfo);
}
int NdbEventOperation::getNdbdNodeId() const
{
return m_impl.m_data_item->sdata->ndbd_nodeid;
return SubTableData::getNdbdNodeId(m_impl.m_data_item->sdata->requestInfo);
}
/*
......
......@@ -54,7 +54,8 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
static void
print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
{
printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci,
SubTableData::getOperation(sdata->requestInfo));
for (int i = 0; i <= 2; i++) {
printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
for (int j = 0; j < ptr[i].sz; j++)
......@@ -672,7 +673,8 @@ NdbEventOperationImpl::execSUB_TABLE_DATA(NdbApiSignal * signal,
int
NdbEventOperationImpl::receive_event()
{
Uint32 operation= (Uint32)m_data_item->sdata->operation;
Uint32 operation=
SubTableData::getOperation(m_data_item->sdata->requestInfo);
if (unlikely(operation >= NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT))
{
DBUG_ENTER("NdbEventOperationImpl::receive_event");
......@@ -869,7 +871,7 @@ NdbDictionary::Event::TableEvent
NdbEventOperationImpl::getEventType()
{
return (NdbDictionary::Event::TableEvent)
(1 << (unsigned)m_data_item->sdata->operation);
(1 << SubTableData::getOperation(m_data_item->sdata->requestInfo));
}
......@@ -1207,7 +1209,8 @@ NdbEventBuffer::nextEvent()
}
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
// to return TE_NUL it should be made into data event
if (data->sdata->operation == NdbDictionary::Event::_TE_NUL)
if (SubTableData::getOperation(data->sdata->requestInfo) ==
NdbDictionary::Event::_TE_NUL)
{
DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
continue;
......@@ -1574,9 +1577,11 @@ NdbEventBuffer::report_node_connected(Uint32 node_id)
bzero(ptr, sizeof(ptr));
data.tableId = ~0;
data.operation = NdbDictionary::Event::_TE_ACTIVE;
data.req_nodeid = (Uint8)node_id;
data.ndbd_nodeid = (Uint8)node_id;
data.requestInfo = 0;
SubTableData::setOperation(data.requestInfo,
NdbDictionary::Event::_TE_ACTIVE);
SubTableData::setReqNodeId(data.requestInfo, node_id);
SubTableData::setNdbdNodeId(data.requestInfo, node_id);
data.logType = SubTableData::LOG;
data.gci = m_latestGCI + 1;
/**
......@@ -1614,9 +1619,11 @@ NdbEventBuffer::report_node_failure(Uint32 node_id)
bzero(ptr, sizeof(ptr));
data.tableId = ~0;
data.operation = NdbDictionary::Event::_TE_NODE_FAILURE;
data.req_nodeid = (Uint8)node_id;
data.ndbd_nodeid = (Uint8)node_id;
data.requestInfo = 0;
SubTableData::setOperation(data.requestInfo,
NdbDictionary::Event::_TE_NODE_FAILURE);
SubTableData::setReqNodeId(data.requestInfo, node_id);
SubTableData::setNdbdNodeId(data.requestInfo, node_id);
data.logType = SubTableData::LOG;
data.gci = m_latestGCI + 1;
/**
......@@ -1654,7 +1661,9 @@ NdbEventBuffer::completeClusterFailed()
bzero(ptr, sizeof(ptr));
data.tableId = ~0;
data.operation = NdbDictionary::Event::_TE_CLUSTER_FAILURE;
data.requestInfo = 0;
SubTableData::setOperation(data.requestInfo,
NdbDictionary::Event::_TE_CLUSTER_FAILURE);
data.logType = SubTableData::LOG;
data.gci = m_latestGCI + 1;
......@@ -1757,19 +1766,21 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
LinearSectionPtr ptr[3])
{
DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
const Uint32 ri = sdata->requestInfo;
const Uint32 operation = SubTableData::getOperation(ri);
Uint64 gci= sdata->gci;
const bool is_data_event =
sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
const bool is_data_event =
operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
if (!is_data_event)
{
switch (sdata->operation)
switch (operation)
{
case NdbDictionary::Event::_TE_NODE_FAILURE:
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
break;
case NdbDictionary::Event::_TE_ACTIVE:
op->m_node_bit_mask.set(sdata->ndbd_nodeid);
op->m_node_bit_mask.set(SubTableData::getNdbdNodeId(ri));
// internal event, do not relay to user
DBUG_RETURN_EVENT(0);
break;
......@@ -1780,7 +1791,8 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_PRINT("info", ("m_ref_count: %u for op: %p", op->m_ref_count, op));
break;
case NdbDictionary::Event::_TE_STOP:
op->m_node_bit_mask.clear(sdata->ndbd_nodeid);
ndbout_c("sdata->ndbd_nodeid: %d", SubTableData::getNdbdNodeId(ri));
op->m_node_bit_mask.clear(SubTableData::getNdbdNodeId(ri));
if (op->m_node_bit_mask.isclear())
{
DBUG_ASSERT(op->m_ref_count > 0);
......@@ -1792,19 +1804,20 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
break;
}
}
if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
if ( likely((Uint32)op->mi_type & (1 << operation)))
{
Gci_container* bucket= find_bucket(&m_active_gci, gci
#ifdef VM_TRACE
, m_flush_gci
#endif
);
DBUG_PRINT_EVENT("info", ("data insertion in eventId %d", op->m_eventId));
DBUG_PRINT_EVENT("info", ("gci=%d tab=%d op=%d node=%d",
sdata->gci, sdata->tableId, sdata->operation,
sdata->req_nodeid));
sdata->gci, sdata->tableId,
SubTableData::getOperation(sdata->requestInfo),
SubTableData::getReqNodeId(sdata->requestInfo)));
if (unlikely(bucket == 0))
{
......@@ -1824,7 +1837,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table"));
DBUG_RETURN_EVENT(0);
}
// find position in bucket hash table
EventBufData* data = 0;
EventBufData_hash::Pos hpos;
......@@ -1833,7 +1846,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
bucket->m_data_hash.search(hpos, op, ptr);
data = hpos.data;
}
if (data == 0)
{
// allocate new result buffer
......@@ -1901,20 +1914,22 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
// since the flags represent multiple ops on multiple PKs
// XXX fix by doing merge at end of epoch (extra mem cost)
{
EventBufData_list::Gci_op g = { op, (1 << sdata->operation) };
EventBufData_list::Gci_op g = { op, (1 << operation) };
bucket->m_data.add_gci_op(g);
}
{
EventBufData_list::Gci_op g = { op, (1 << data->sdata->operation) };
EventBufData_list::Gci_op
g = { op,
(1 << SubTableData::getOperation(data->sdata->requestInfo))};
bucket->m_data.add_gci_op(g);
}
}
}
DBUG_RETURN_EVENT(0);
}
#ifdef VM_TRACE
if ((Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation))
if ((Uint32)op->m_eventImpl->mi_type & (1 << operation))
{
DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
DBUG_RETURN_EVENT(0);
......@@ -2100,8 +2115,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
Uint32 nkey = data->m_event_op->m_eventImpl->m_tableImpl->m_noOfKeys;
int t1 = data->sdata->operation;
int t2 = sdata->operation;
int t1 = SubTableData::getOperation(data->sdata->requestInfo);
int t2 = SubTableData::getOperation(sdata->requestInfo);
if (t1 == Ev_t::enum_NUL)
DBUG_RETURN_EVENT(copy_data(sdata, ptr2, data));
......@@ -2165,7 +2180,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
goto end;
}
*data->sdata = *sdata;
data->sdata->operation = tp->t3;
SubTableData::setOperation(data->sdata->requestInfo, tp->t3);
}
ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;
......@@ -2357,7 +2372,7 @@ NdbEventBuffer::get_main_data(Gci_container* bucket,
DBUG_RETURN_EVENT(-1);
SubTableData sdata = *blob_data->sdata;
sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
sdata.operation = NdbDictionary::Event::_TE_NUL;
SubTableData::setOperation(sdata.requestInfo, NdbDictionary::Event::_TE_NUL);
if (copy_data(&sdata, ptr, main_data) != 0)
DBUG_RETURN_EVENT(-1);
hpos.data = main_data;
......
......@@ -225,7 +225,8 @@ void EventBufData_list::append_used_data(EventBufData *data)
inline
void EventBufData_list::append_data(EventBufData *data)
{
Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
Gci_op g = { data->m_event_op,
1 << SubTableData::getOperation(data->sdata->requestInfo) };
add_gci_op(g);
append_used_data(data);
......
......@@ -496,6 +496,7 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
idx = m_current_api_receiver;
last = m_api_receivers_count;
Uint32 timeout = tp->m_waitfor_timeout;
do {
if(theError.code){
......@@ -521,8 +522,7 @@ int NdbScanOperation::nextResultImpl(bool fetchAllowed, bool forceSend)
/**
* No completed...
*/
int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
} else {
......@@ -1425,13 +1425,13 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed,
return -1;
Uint32 seq = theNdbCon->theNodeSequence;
Uint32 nodeId = theNdbCon->theDBnode;
Uint32 timeout = tp->m_waitfor_timeout;
if(seq == tp->getNodeSequence(nodeId) &&
!send_next_scan_ordered(s_idx)){
Uint32 tmp = m_sent_receivers_count;
s_idx = m_current_api_receiver;
while(m_sent_receivers_count > 0 && !theError.code){
int ret_code= poll_guard.wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
int ret_code= poll_guard.wait_scan(3*timeout, nodeId, forceSend);
if (ret_code == 0 && seq == tp->getNodeSequence(nodeId)) {
continue;
}
......@@ -1574,12 +1574,13 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
return -1;
}
Uint32 timeout = tp->m_waitfor_timeout;
/**
* Wait for outstanding
*/
while(theError.code == 0 && m_sent_receivers_count)
{
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId, forceSend);
int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
switch(return_code){
case 0:
break;
......@@ -1647,8 +1648,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend,
*/
while(m_sent_receivers_count+m_api_receivers_count+m_conf_receivers_count)
{
int return_code= poll_guard->wait_scan(WAITFOR_SCAN_TIMEOUT, nodeId,
forceSend);
int return_code= poll_guard->wait_scan(3*timeout, nodeId, forceSend);
switch(return_code){
case 0:
break;
......
......@@ -750,17 +750,19 @@ Ndb::handleReceivedSignal(NdbApiSignal* aSignal, LinearSectionPtr ptr[3])
}
// Accumulate DIC_TAB_INFO for TE_ALTER events
if (sdata->operation == NdbDictionary::Event::_TE_ALTER &&
if (SubTableData::getOperation(sdata->requestInfo) ==
NdbDictionary::Event::_TE_ALTER &&
!op->execSUB_TABLE_DATA(aSignal, ptr))
return;
for (int i= aSignal->m_noOfSections;i < 3; i++) {
ptr[i].p = NULL;
ptr[i].sz = 0;
}
DBUG_PRINT("info",("oid=senderData: %d, gci: %d, operation: %d, "
"tableId: %d",
sdata->senderData, sdata->gci, sdata->operation,
sdata->senderData, sdata->gci,
SubTableData::getOperation(sdata->requestInfo),
sdata->tableId));
theEventBuffer->insertDataL(op,sdata, ptr);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment