Commit 692cebf7 authored by jonas@perch.ndb.mysql.com's avatar jonas@perch.ndb.mysql.com

Merge joreland@bk-internal.mysql.com:/home/bk/mysql-5.0

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.0
parents 68db3261 3a36d68f
...@@ -41,7 +41,9 @@ public: ...@@ -41,7 +41,9 @@ public:
* @param parallel No of fragments to scan in parallel (0=max) * @param parallel No of fragments to scan in parallel (0=max)
*/ */
virtual int readTuples(LockMode lock_mode = LM_Read, virtual int readTuples(LockMode lock_mode = LM_Read,
Uint32 scan_flags = 0, Uint32 parallel = 0); Uint32 scan_flags = 0,
Uint32 parallel = 0,
Uint32 batch = 0);
#ifndef DOXYGEN_SHOULD_SKIP_INTERNAL #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
/** /**
...@@ -69,7 +71,7 @@ public: ...@@ -69,7 +71,7 @@ public:
(SF_ReadRangeNo & -(Int32)read_range_no) | (SF_ReadRangeNo & -(Int32)read_range_no) |
(SF_KeyInfo & -(Int32)keyinfo); (SF_KeyInfo & -(Int32)keyinfo);
return readTuples(lock_mode, scan_flags, parallel); return readTuples(lock_mode, scan_flags, parallel, batch);
} }
#endif #endif
......
...@@ -57,7 +57,9 @@ public: ...@@ -57,7 +57,9 @@ public:
*/ */
virtual virtual
int readTuples(LockMode lock_mode = LM_Read, int readTuples(LockMode lock_mode = LM_Read,
Uint32 scan_flags = 0, Uint32 parallel = 0); Uint32 scan_flags = 0,
Uint32 parallel = 0,
Uint32 batch = 0);
#ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED #ifndef DOXYGEN_SHOULD_SKIP_DEPRECATED
/** /**
......
...@@ -7373,15 +7373,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal) ...@@ -7373,15 +7373,15 @@ void Dblqh::scanLockReleasedLab(Signal* signal)
scanptr.p->m_curr_batch_size_rows = 0; scanptr.p->m_curr_batch_size_rows = 0;
scanptr.p->m_curr_batch_size_bytes = 0; scanptr.p->m_curr_batch_size_bytes = 0;
closeScanLab(signal); closeScanLab(signal);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else if (scanptr.p->check_scan_batch_completed() && } else if (scanptr.p->check_scan_batch_completed() &&
scanptr.p->scanLockHold != ZTRUE) { scanptr.p->scanLockHold != ZTRUE) {
jam(); jam();
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ; scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE); sendScanFragConf(signal, ZFALSE);
} else if (scanptr.p->m_last_row && !scanptr.p->scanLockHold) {
jam();
closeScanLab(signal);
return;
} else { } else {
jam(); jam();
/* /*
......
...@@ -7047,6 +7047,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal, ...@@ -7047,6 +7047,18 @@ void Dbtc::checkScanActiveInFailedLqh(Signal* signal,
found = true; found = true;
} }
} }
ScanFragList deliv(c_scan_frag_pool, scanptr.p->m_delivered_scan_frags);
for(deliv.first(ptr); !ptr.isNull(); deliv.next(ptr))
{
jam();
if (refToNode(ptr.p->lqhBlockref) == failedNodeId)
{
jam();
found = true;
break;
}
}
} }
if(found){ if(found){
jam(); jam();
......
...@@ -26,12 +26,12 @@ public: ...@@ -26,12 +26,12 @@ public:
void init() { m_confs.clear(); m_nRefs = 0; } void init() { m_confs.clear(); m_nRefs = 0; }
template<typename SignalClass> template<typename SignalClass>
void init(SafeCounterManager& mgr, bool init(SafeCounterManager& mgr,
NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData) NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData)
{ {
init(); init();
SafeCounter tmp(mgr, m_sc); SafeCounter tmp(mgr, m_sc);
tmp.init<SignalClass>(rg, GSN, senderData); return tmp.init<SignalClass>(rg, GSN, senderData);
} }
bool ignoreRef(SafeCounterManager& mgr, Uint32 nodeId) bool ignoreRef(SafeCounterManager& mgr, Uint32 nodeId)
......
...@@ -230,10 +230,13 @@ inline ...@@ -230,10 +230,13 @@ inline
bool bool
SafeCounter::init(NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData){ SafeCounter::init(NodeReceiverGroup rg, Uint16 GSN, Uint32 senderData){
bool b = init<Ref>(rg.m_block, GSN, senderData); if (init<Ref>(rg.m_block, GSN, senderData))
{
m_nodes = rg.m_nodes; m_nodes = rg.m_nodes;
m_count = m_nodes.count(); m_count = m_nodes.count();
return b; return true;
}
return false;
} }
template<typename Ref> template<typename Ref>
...@@ -241,10 +244,13 @@ inline ...@@ -241,10 +244,13 @@ inline
bool bool
SafeCounter::init(NodeReceiverGroup rg, Uint32 senderData){ SafeCounter::init(NodeReceiverGroup rg, Uint32 senderData){
bool b = init<Ref>(rg.m_block, Ref::GSN, senderData); if (init<Ref>(rg.m_block, Ref::GSN, senderData))
{
m_nodes = rg.m_nodes; m_nodes = rg.m_nodes;
m_count = m_nodes.count(); m_count = m_nodes.count();
return b; return true;
}
return false;
} }
inline inline
......
...@@ -30,6 +30,7 @@ extern my_bool opt_core; ...@@ -30,6 +30,7 @@ extern my_bool opt_core;
#define MAX_LINE_LENGTH 255 #define MAX_LINE_LENGTH 255
#define KEY_INTERNAL 0 #define KEY_INTERNAL 0
#define MAX_INT_RNIL 0xfffffeff #define MAX_INT_RNIL 0xfffffeff
#define MAX_PORT_NO 65535
#define _STR_VALUE(x) #x #define _STR_VALUE(x) #x
#define STR_VALUE(x) _STR_VALUE(x) #define STR_VALUE(x) _STR_VALUE(x)
...@@ -422,7 +423,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -422,7 +423,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
UNDEFINED, UNDEFINED,
"1", "1",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
CFG_DB_NO_REPLICAS, CFG_DB_NO_REPLICAS,
...@@ -1462,7 +1463,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1462,7 +1463,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
NDB_PORT, NDB_PORT,
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
KEY_INTERNAL, KEY_INTERNAL,
...@@ -1474,7 +1475,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1474,7 +1475,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
UNDEFINED, UNDEFINED,
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
CFG_NODE_ARBIT_RANK, CFG_NODE_ARBIT_RANK,
...@@ -1616,7 +1617,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1616,7 +1617,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
MANDATORY, MANDATORY,
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
CFG_TCP_SEND_BUFFER_SIZE, CFG_TCP_SEND_BUFFER_SIZE,
...@@ -1722,7 +1723,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1722,7 +1723,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
MANDATORY, MANDATORY,
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
CFG_SHM_SIGNUM, CFG_SHM_SIGNUM,
...@@ -1944,7 +1945,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1944,7 +1945,7 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
ConfigInfo::CI_INT, ConfigInfo::CI_INT,
MANDATORY, MANDATORY,
"0", "0",
STR_VALUE(MAX_INT_RNIL) }, STR_VALUE(MAX_PORT_NO) },
{ {
CFG_SCI_HOST1_ID_0, CFG_SCI_HOST1_ID_0,
......
...@@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, ...@@ -121,7 +121,15 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
* batch. * batch.
*/ */
if (batch_size == 0)
{
batch_byte_size= max_batch_byte_size; batch_byte_size= max_batch_byte_size;
}
else
{
batch_byte_size= batch_size * tot_size;
}
if (batch_byte_size * parallelism > max_scan_batch_size) { if (batch_byte_size * parallelism > max_scan_batch_size) {
batch_byte_size= max_scan_batch_size / parallelism; batch_byte_size= max_scan_batch_size / parallelism;
} }
......
...@@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection) ...@@ -117,7 +117,8 @@ NdbScanOperation::init(const NdbTableImpl* tab, NdbTransaction* myConnection)
int int
NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
Uint32 scan_flags, Uint32 scan_flags,
Uint32 parallel) Uint32 parallel,
Uint32 batch)
{ {
m_ordered = m_descending = false; m_ordered = m_descending = false;
Uint32 fragCount = m_currentTable->m_fragmentCount; Uint32 fragCount = m_currentTable->m_fragmentCount;
...@@ -182,6 +183,9 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -182,6 +183,9 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
if (tupScan && rangeScan) if (tupScan && rangeScan)
tupScan = false; tupScan = false;
if (rangeScan && (scan_flags & SF_OrderBy))
parallel = fragCount;
theParallelism = parallel; theParallelism = parallel;
if(fix_receivers(parallel) == -1){ if(fix_receivers(parallel) == -1){
...@@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm, ...@@ -202,6 +206,7 @@ NdbScanOperation::readTuples(NdbScanOperation::LockMode lm,
req->tableSchemaVersion = m_accessTable->m_version; req->tableSchemaVersion = m_accessTable->m_version;
req->storedProcId = 0xFFFF; req->storedProcId = 0xFFFF;
req->buddyConPtr = theNdbCon->theBuddyConPtr; req->buddyConPtr = theNdbCon->theBuddyConPtr;
req->first_batch_size = batch; // Save user specified batch size
Uint32 reqInfo = 0; Uint32 reqInfo = 0;
ScanTabReq::setParallelism(reqInfo, parallel); ScanTabReq::setParallelism(reqInfo, parallel);
...@@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr, ...@@ -750,13 +755,14 @@ int NdbScanOperation::prepareSendScan(Uint32 aTC_ConnectPtr,
* The number of records sent by each LQH is calculated and the kernel * The number of records sent by each LQH is calculated and the kernel
* is informed of this number by updating the SCAN_TABREQ signal * is informed of this number by updating the SCAN_TABREQ signal
*/ */
Uint32 batch_size, batch_byte_size, first_batch_size; ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
Uint32 batch_size = req->first_batch_size; // User specified
Uint32 batch_byte_size, first_batch_size;
theReceiver.calculate_batch_size(key_size, theReceiver.calculate_batch_size(key_size,
theParallelism, theParallelism,
batch_size, batch_size,
batch_byte_size, batch_byte_size,
first_batch_size); first_batch_size);
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
ScanTabReq::setScanBatch(req->requestInfo, batch_size); ScanTabReq::setScanBatch(req->requestInfo, batch_size);
req->batch_byte_size= batch_byte_size; req->batch_byte_size= batch_byte_size;
req->first_batch_size= first_batch_size; req->first_batch_size= first_batch_size;
...@@ -1216,13 +1222,14 @@ NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){ ...@@ -1216,13 +1222,14 @@ NdbIndexScanOperation::insertBOUNDS(Uint32 * data, Uint32 sz){
int int
NdbIndexScanOperation::readTuples(LockMode lm, NdbIndexScanOperation::readTuples(LockMode lm,
Uint32 scan_flags, Uint32 scan_flags,
Uint32 parallel) Uint32 parallel,
Uint32 batch)
{ {
const bool order_by = scan_flags & SF_OrderBy; const bool order_by = scan_flags & SF_OrderBy;
const bool order_desc = scan_flags & SF_Descending; const bool order_desc = scan_flags & SF_Descending;
const bool read_range_no = scan_flags & SF_ReadRangeNo; const bool read_range_no = scan_flags & SF_ReadRangeNo;
int res = NdbScanOperation::readTuples(lm, scan_flags, 0); int res = NdbScanOperation::readTuples(lm, scan_flags, parallel, batch);
if(!res && read_range_no) if(!res && read_range_no)
{ {
m_read_range_no = 1; m_read_range_no = 1;
...@@ -1506,6 +1513,66 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ ...@@ -1506,6 +1513,66 @@ NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){
return -1; return -1;
} }
bool holdLock = false;
if (theSCAN_TABREQ)
{
ScanTabReq * req = CAST_PTR(ScanTabReq, theSCAN_TABREQ->getDataPtrSend());
holdLock = ScanTabReq::getHoldLockFlag(req->requestInfo);
}
/**
* When using locks, force close of scan directly
*/
if (holdLock && theError.code == 0 &&
(m_sent_receivers_count + m_conf_receivers_count + m_api_receivers_count))
{
TransporterFacade * tp = TransporterFacade::instance();
NdbApiSignal tSignal(theNdb->theMyRef);
tSignal.setSignal(GSN_SCAN_NEXTREQ);
Uint32* theData = tSignal.getDataPtrSend();
Uint64 transId = theNdbCon->theTransactionId;
theData[0] = theNdbCon->theTCConPtr;
theData[1] = 1;
theData[2] = transId;
theData[3] = (Uint32) (transId >> 32);
tSignal.setLength(4);
int ret = tp->sendSignal(&tSignal, nodeId);
if (ret)
{
setErrorCode(4008);
return -1;
}
checkForceSend(forceSend);
/**
* If no receiver is outstanding...
* set it to 1 as execCLOSE_SCAN_REP resets it
*/
m_sent_receivers_count = m_sent_receivers_count ? m_sent_receivers_count : 1;
while(theError.code == 0 && (m_sent_receivers_count + m_conf_receivers_count))
{
theNdb->theImpl->theWaiter.m_node = nodeId;
theNdb->theImpl->theWaiter.m_state = WAIT_SCAN;
int return_code = theNdb->receiveResponse(WAITFOR_SCAN_TIMEOUT);
switch(return_code){
case 0:
break;
case -1:
setErrorCode(4008);
case -2:
m_api_receivers_count = 0;
m_conf_receivers_count = 0;
m_sent_receivers_count = 0;
theNdbCon->theReleaseOnClose = true;
return -1;
}
}
return 0;
}
/** /**
* Wait for outstanding * Wait for outstanding
*/ */
......
...@@ -294,6 +294,7 @@ int runRestarts(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -294,6 +294,7 @@ int runRestarts(NDBT_Context* ctx, NDBT_Step* step){
} }
i++; i++;
} }
ctx->stopTest();
return result; return result;
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment