Commit 995ac877 authored by mronstrom@mysql.com's avatar mronstrom@mysql.com

Added configuration parameters for batch size, batch byte size

and max scan batch size
Removed some debug printouts
parent 607f7a6e
...@@ -64,24 +64,30 @@ ...@@ -64,24 +64,30 @@
#define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1) #define MIN_ATTRBUF ((MAX_ATTRIBUTES_IN_TABLE/24) + 1)
/* /*
* Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The * Max Number of Records to fetch per SCAN_NEXTREQ in a scan in LQH. The
* API can order a multiple of this number of records at a time since * API can order a multiple of this number of records at a time since
* fragments can be scanned in parallel. * fragments can be scanned in parallel.
*/ */
#define MAX_PARALLEL_OP_PER_SCAN 992 #define MAX_PARALLEL_OP_PER_SCAN 992
/* /*
* The default batch size. Configurable parameter.
*/
#define DEF_BATCH_SIZE 64
/*
* When calculating the number of records sent from LQH in each batch * When calculating the number of records sent from LQH in each batch
* one uses SCAN_BATCH_SIZE divided by the expected size of signals * one uses SCAN_BATCH_SIZE divided by the expected size of signals
* per row. This gives the batch size used for the scan. The NDB API * per row. This gives the batch size used for the scan. The NDB API
* will receive one batch from each node at a time so there has to be * will receive one batch from each node at a time so there has to be
* some care taken also so that the NDB API is not overloaded with * some care taken also so that the NDB API is not overloaded with
* signals. * signals.
* This parameter is configurable, this is the default value.
*/ */
#define SCAN_BATCH_SIZE 32768 #define SCAN_BATCH_SIZE 32768
/* /*
* To protect the NDB API from overload we also define a maximum total * To protect the NDB API from overload we also define a maximum total
* batch size from all nodes. This parameter should most likely be * batch size from all nodes. This parameter should most likely be
* configurable, or dependent on sendBufferSize. * configurable, or dependent on sendBufferSize.
* This parameter is configurable, this is the default value.
*/ */
#define MAX_SCAN_BATCH_SIZE 262144 #define MAX_SCAN_BATCH_SIZE 262144
/* /*
......
...@@ -121,6 +121,14 @@ ...@@ -121,6 +121,14 @@
#define CFG_REP_HEARTBEAT_INTERVAL 700 #define CFG_REP_HEARTBEAT_INTERVAL 700
/**
* API Config variables
*
*/
#define CFG_MAX_SCAN_BATCH_SIZE 800
#define CFG_BATCH_BYTE_SIZE 801
#define CFG_BATCH_SIZE 802
/** /**
* Internal * Internal
*/ */
......
...@@ -1166,6 +1166,42 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = { ...@@ -1166,6 +1166,42 @@ const ConfigInfo::ParamInfo ConfigInfo::m_ParamInfo[] = {
0, 0,
MAX_INT_RNIL }, MAX_INT_RNIL },
{
CFG_MAX_SCAN_BATCH_SIZE,
"MaxScanBatchSize",
"API",
"The maximum collective batch size for one scan",
ConfigInfo::USED,
false,
ConfigInfo::INT,
MAX_SCAN_BATCH_SIZE,
32768,
(1024*1024*16) },
{
CFG_BATCH_BYTE_SIZE,
"BatchByteSize",
"API",
"The default batch size in bytes",
ConfigInfo::USED,
false,
ConfigInfo::INT,
SCAN_BATCH_SIZE,
1024,
(1024*1024) },
{
CFG_BATCH_SIZE,
"BatchSize",
"API",
"The default batch size in number of records",
ConfigInfo::USED,
false,
ConfigInfo::INT,
DEF_BATCH_SIZE,
1,
MAX_PARALLEL_OP_PER_SCAN },
/**************************************************************************** /****************************************************************************
* MGM * MGM
***************************************************************************/ ***************************************************************************/
......
...@@ -8386,9 +8386,6 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq) ...@@ -8386,9 +8386,6 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanNumber = ~0; scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr; scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
ndbout << "batch_size = " << scanptr.p->batch_size;
ndbout << " first_batch_size = " << scanptr.p->scanConcurrentOperations;
ndbout << endl;
if ((scanptr.p->scanConcurrentOperations == 0) || if ((scanptr.p->scanConcurrentOperations == 0) ||
(scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) { (scanptr.p->scanConcurrentOperations > scanptr.p->batch_size)) {
jam(); jam();
......
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include <NdbRecAttr.hpp> #include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp> #include <AttributeHeader.hpp>
#include <NdbConnection.hpp> #include <NdbConnection.hpp>
#include <TransporterFacade.hpp>
NdbReceiver::NdbReceiver(Ndb *aNdb) : NdbReceiver::NdbReceiver(Ndb *aNdb) :
theMagicNumber(0), theMagicNumber(0),
...@@ -96,6 +97,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, ...@@ -96,6 +97,10 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
Uint32& batch_byte_size, Uint32& batch_byte_size,
Uint32& first_batch_size) Uint32& first_batch_size)
{ {
TransporterFacade *tp= TransporterFacade::instance();
Uint32 max_scan_batch_size= tp->get_scan_batch_size();
Uint32 max_batch_byte_size= tp->get_batch_byte_size();
Uint32 max_batch_size= tp->get_batch_size();
Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead Uint32 tot_size= (key_size ? (key_size + 32) : 0); //key + signal overhead
NdbRecAttr *rec_attr= theFirstRecAttr; NdbRecAttr *rec_attr= theFirstRecAttr;
while (rec_attr != NULL) { while (rec_attr != NULL) {
...@@ -112,19 +117,19 @@ NdbReceiver::calculate_batch_size(Uint32 key_size, ...@@ -112,19 +117,19 @@ NdbReceiver::calculate_batch_size(Uint32 key_size,
* no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per * no more than MAX_SCAN_BATCH_SIZE is sent from all nodes in total per
* batch. * batch.
*/ */
batch_byte_size= SCAN_BATCH_SIZE; batch_byte_size= max_batch_byte_size;
if (SCAN_BATCH_SIZE * parallelism > MAX_SCAN_BATCH_SIZE) { if (batch_byte_size * parallelism > max_scan_batch_size) {
batch_byte_size= MAX_SCAN_BATCH_SIZE / parallelism; batch_byte_size= max_scan_batch_size / parallelism;
} }
batch_size= batch_byte_size / tot_size; batch_size= batch_byte_size / tot_size;
#ifdef VM_TRACE
ndbout << "batch_byte_size = " << batch_byte_size << " batch_size = ";
ndbout << batch_size << "tot_size = " << tot_size << endl;
#endif
if (batch_size == 0) { if (batch_size == 0) {
batch_size= 1; batch_size= 1;
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) { } else {
batch_size= MAX_PARALLEL_OP_PER_SCAN; if (batch_size > max_batch_size) {
batch_size= max_batch_size;
} else if (batch_size > MAX_PARALLEL_OP_PER_SCAN) {
batch_size= MAX_PARALLEL_OP_PER_SCAN;
}
} }
first_batch_size= batch_size; first_batch_size= batch_size;
return; return;
......
...@@ -550,6 +550,9 @@ TransporterFacade::TransporterFacade() : ...@@ -550,6 +550,9 @@ TransporterFacade::TransporterFacade() :
theArbitMgr = NULL; theArbitMgr = NULL;
theStartNodeId = 1; theStartNodeId = 1;
m_open_count = 0; m_open_count = 0;
m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
m_batch_byte_size= SCAN_BATCH_SIZE;
m_batch_size= DEF_BATCH_SIZE;
} }
bool bool
...@@ -593,7 +596,18 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props) ...@@ -593,7 +596,18 @@ TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
iter.get(CFG_NODE_ARBIT_DELAY, &delay); iter.get(CFG_NODE_ARBIT_DELAY, &delay);
theArbitMgr->setDelay(delay); theArbitMgr->setDelay(delay);
} }
Uint32 scan_batch_size= 0;
if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
m_scan_batch_size= scan_batch_size;
}
Uint32 batch_byte_size= 0;
if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
m_batch_byte_size= batch_byte_size;
}
Uint32 batch_size= 0;
if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
m_batch_size= batch_size;
}
#if 0 #if 0
} }
#endif #endif
......
...@@ -113,6 +113,11 @@ public: ...@@ -113,6 +113,11 @@ public:
// Close this block number // Close this block number
int close_local(BlockNumber blockNumber); int close_local(BlockNumber blockNumber);
// Scan batch configuration parameters
Uint32 get_scan_batch_size();
Uint32 get_batch_byte_size();
Uint32 get_batch_size();
private: private:
/** /**
* Send a signal unconditional of node status (used by ClusterMgr) * Send a signal unconditional of node status (used by ClusterMgr)
...@@ -146,6 +151,11 @@ private: ...@@ -146,6 +151,11 @@ private:
void calculateSendLimit(); void calculateSendLimit();
// Scan batch configuration parameters
Uint32 m_scan_batch_size;
Uint32 m_batch_byte_size;
Uint32 m_batch_size;
// Declarations for the receive and send thread // Declarations for the receive and send thread
int theStopReceive; int theStopReceive;
...@@ -325,4 +335,24 @@ TransporterFacade::getNodeSequence(NodeId n) const { ...@@ -325,4 +335,24 @@ TransporterFacade::getNodeSequence(NodeId n) const {
return theClusterMgr->getNodeInfo(n).m_info.m_connectCount; return theClusterMgr->getNodeInfo(n).m_info.m_connectCount;
} }
inline
Uint32
TransporterFacade::get_scan_batch_size() {
return m_scan_batch_size;
}
inline
Uint32
TransporterFacade::get_batch_byte_size() {
return m_batch_byte_size;
}
inline
Uint32
TransporterFacade::get_batch_size() {
return m_batch_size;
}
#endif // TransporterFacade_H #endif // TransporterFacade_H
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment