Commit c105f1ed authored by serg@serg.mylan's avatar serg@serg.mylan

Merge bk-internal.mysql.com:/home/bk/mysql-4.1/

into serg.mylan:/usr/home/serg/Abk/mysql-4.1
parents 4665cec0 29b5faf0
...@@ -1513,6 +1513,8 @@ fil_decr_pending_ibuf_merges( ...@@ -1513,6 +1513,8 @@ fil_decr_pending_ibuf_merges(
mutex_exit(&(system->mutex)); mutex_exit(&(system->mutex));
} }
/************************************************************
Creates the database directory for a table if it does not exist yet. */
static static
void void
fil_create_directory_for_tablename( fil_create_directory_for_tablename(
......
...@@ -1450,9 +1450,11 @@ then ...@@ -1450,9 +1450,11 @@ then
then then
echo "Starting ndbcluster" echo "Starting ndbcluster"
./ndb/ndbcluster --port-base=$NDBCLUSTER_PORT --small --diskless --initial --data-dir=$MYSQL_TEST_DIR/var || exit 1 ./ndb/ndbcluster --port-base=$NDBCLUSTER_PORT --small --diskless --initial --data-dir=$MYSQL_TEST_DIR/var || exit 1
export NDB_CONNECTSTRING="host=localhost:$NDBCLUSTER_PORT" NDB_CONNECTSTRING="host=localhost:$NDBCLUSTER_PORT"
export NDB_CONNECTSTRING
else else
export NDB_CONNECTSTRING="$USE_RUNNING_NDBCLUSTER" NDB_CONNECTSTRING="$USE_RUNNING_NDBCLUSTER"
export NDB_CONNECTSTRING
echo "Using ndbcluster at $NDB_CONNECTSTRING" echo "Using ndbcluster at $NDB_CONNECTSTRING"
fi fi
fi fi
......
...@@ -86,7 +86,6 @@ fs_name_1=$fs_ndb/node-1-fs ...@@ -86,7 +86,6 @@ fs_name_1=$fs_ndb/node-1-fs
fs_name_2=$fs_ndb/node-2-fs fs_name_2=$fs_ndb/node-2-fs
NDB_HOME= NDB_HOME=
export NDB_CONNECTSTRING
if [ ! -x $fsdir ]; then if [ ! -x $fsdir ]; then
echo "$fsdir missing" echo "$fsdir missing"
exit 1 exit 1
...@@ -102,7 +101,8 @@ fi ...@@ -102,7 +101,8 @@ fi
ndb_host="localhost" ndb_host="localhost"
ndb_mgmd_port=$port_base ndb_mgmd_port=$port_base
export NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port" NDB_CONNECTSTRING="host=$ndb_host:$ndb_mgmd_port"
export NDB_CONNECTSTRING
start_default_ndbcluster() { start_default_ndbcluster() {
......
...@@ -15,7 +15,7 @@ col2 varchar(30) not null, ...@@ -15,7 +15,7 @@ col2 varchar(30) not null,
col3 varchar (20) not null, col3 varchar (20) not null,
col4 varchar(4) not null, col4 varchar(4) not null,
col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null, col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
col6 int not null, to_be_deleted int); col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
insert into t1 values (2,4,3,5,"PENDING",1,7); insert into t1 values (2,4,3,5,"PENDING",1,7);
alter table t1 alter table t1
add column col4_5 varchar(20) not null after col4, add column col4_5 varchar(20) not null after col4,
......
...@@ -6,20 +6,20 @@ attr2 INT, ...@@ -6,20 +6,20 @@ attr2 INT,
attr3 VARCHAR(10) attr3 VARCHAR(10)
) ENGINE=ndbcluster; ) ENGINE=ndbcluster;
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413'); INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
SELECT pk1 FROM t1; SELECT pk1 FROM t1 ORDER BY pk1;
pk1 pk1
9410 9410
9411 9411
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
pk1 attr1 attr2 attr3 pk1 attr1 attr2 attr3
9410 9412 NULL 9412 9410 9412 NULL 9412
9411 9413 17 9413 9411 9413 17 9413
SELECT t1.* FROM t1; SELECT t1.* FROM t1 ORDER BY pk1;
pk1 attr1 attr2 attr3 pk1 attr1 attr2 attr3
9410 9412 NULL 9412 9410 9412 NULL 9412
9411 9413 17 9413 9411 9413 17 9413
UPDATE t1 SET attr1=1 WHERE pk1=9410; UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
pk1 attr1 attr2 attr3 pk1 attr1 attr2 attr3
9410 1 NULL 9412 9410 1 NULL 9412
9411 9413 17 9413 9411 9413 17 9413
...@@ -115,13 +115,17 @@ SELECT * FROM t1; ...@@ -115,13 +115,17 @@ SELECT * FROM t1;
id id2 id id2
1234 7890 1234 7890
DELETE FROM t1; DELETE FROM t1;
INSERT INTO t1 values(3456, 7890), (3456, 7890), (3456, 7890); INSERT INTO t1 values(3456, 7890), (3456, 7890), (3456, 7890), (3454, 7890);
SELECT * FROM t1; SELECT * FROM t1 ORDER BY id;
id id2 id id2
3454 7890
3456 7890 3456 7890
3456 7890 3456 7890
3456 7890 3456 7890
DELETE FROM t1 WHERE id = 3456; DELETE FROM t1 WHERE id = 3456;
SELECT * FROM t1 ORDER BY id;
id id2
3454 7890
DROP TABLE t1; DROP TABLE t1;
CREATE TABLE t1 ( CREATE TABLE t1 (
pk1 INT NOT NULL PRIMARY KEY, pk1 INT NOT NULL PRIMARY KEY,
......
...@@ -11,6 +11,11 @@ x y ...@@ -11,6 +11,11 @@ x y
2 two 2 two
start transaction; start transaction;
insert into t1 values (3,'three'); insert into t1 values (3,'three');
select * from t1 order by x;
x y
1 one
2 two
3 three
start transaction; start transaction;
select * from t1 order by x; select * from t1 order by x;
x y x y
......
...@@ -29,7 +29,7 @@ col2 varchar(30) not null, ...@@ -29,7 +29,7 @@ col2 varchar(30) not null,
col3 varchar (20) not null, col3 varchar (20) not null,
col4 varchar(4) not null, col4 varchar(4) not null,
col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null, col5 enum('PENDING', 'ACTIVE', 'DISABLED') not null,
col6 int not null, to_be_deleted int); col6 int not null, to_be_deleted int) ENGINE=ndbcluster;
insert into t1 values (2,4,3,5,"PENDING",1,7); insert into t1 values (2,4,3,5,"PENDING",1,7);
alter table t1 alter table t1
add column col4_5 varchar(20) not null after col4, add column col4_5 varchar(20) not null after col4,
......
...@@ -21,13 +21,13 @@ CREATE TABLE t1 ( ...@@ -21,13 +21,13 @@ CREATE TABLE t1 (
INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413'); INSERT INTO t1 VALUES (9410,9412, NULL, '9412'), (9411,9413, 17, '9413');
SELECT pk1 FROM t1; SELECT pk1 FROM t1 ORDER BY pk1;
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
SELECT t1.* FROM t1; SELECT t1.* FROM t1 ORDER BY pk1;
# Update on record by primary key # Update on record by primary key
UPDATE t1 SET attr1=1 WHERE pk1=9410; UPDATE t1 SET attr1=1 WHERE pk1=9410;
SELECT * FROM t1; SELECT * FROM t1 ORDER BY pk1;
# Update primary key # Update primary key
UPDATE t1 SET pk1=2 WHERE attr1=1; UPDATE t1 SET pk1=2 WHERE attr1=1;
...@@ -85,9 +85,10 @@ UPDATE t1 SET id=1234 WHERE id2=7890; ...@@ -85,9 +85,10 @@ UPDATE t1 SET id=1234 WHERE id2=7890;
SELECT * FROM t1; SELECT * FROM t1;
DELETE FROM t1; DELETE FROM t1;
INSERT INTO t1 values(3456, 7890), (3456, 7890), (3456, 7890); INSERT INTO t1 values(3456, 7890), (3456, 7890), (3456, 7890), (3454, 7890);
SELECT * FROM t1; SELECT * FROM t1 ORDER BY id;
DELETE FROM t1 WHERE id = 3456; DELETE FROM t1 WHERE id = 3456;
SELECT * FROM t1 ORDER BY id;
DROP TABLE t1; DROP TABLE t1;
......
...@@ -25,10 +25,13 @@ connection con2; ...@@ -25,10 +25,13 @@ connection con2;
select * from t1 order by x; select * from t1 order by x;
connection con1; connection con1;
start transaction; insert into t1 values (3,'three'); start transaction;
insert into t1 values (3,'three');
select * from t1 order by x;
connection con2; connection con2;
start transaction; select * from t1 order by x; start transaction;
select * from t1 order by x;
connection con1; connection con1;
commit; commit;
......
...@@ -23,6 +23,7 @@ ndbapi/NdbReceiver.hpp \ ...@@ -23,6 +23,7 @@ ndbapi/NdbReceiver.hpp \
ndbapi/NdbResultSet.hpp \ ndbapi/NdbResultSet.hpp \
ndbapi/NdbScanFilter.hpp \ ndbapi/NdbScanFilter.hpp \
ndbapi/NdbScanOperation.hpp \ ndbapi/NdbScanOperation.hpp \
ndbapi/NdbIndexScanOperation.hpp \
ndbapi/ndberror.h ndbapi/ndberror.h
mgmapiinclude_HEADERS = \ mgmapiinclude_HEADERS = \
......
...@@ -762,7 +762,7 @@ BitmaskPOD<size>::overlaps(BitmaskPOD<size> that) ...@@ -762,7 +762,7 @@ BitmaskPOD<size>::overlaps(BitmaskPOD<size> that)
template <unsigned size> template <unsigned size>
class Bitmask : public BitmaskPOD<size> { class Bitmask : public BitmaskPOD<size> {
public: public:
Bitmask() { clear();} Bitmask() { this->clear();}
}; };
#endif #endif
...@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: " ...@@ -218,6 +218,7 @@ ndbout << "Ptr: " << ptr.p->word32 << " \tIndex: " << tmp_string << " \tValue: "
#define ZREL_FRAG 6 #define ZREL_FRAG 6
#define ZREL_DIR 7 #define ZREL_DIR 7
#define ZREPORT_MEMORY_USAGE 8 #define ZREPORT_MEMORY_USAGE 8
#define ZLCP_OP_WRITE_RT_BREAK 9
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
/* ERROR CODES */ /* ERROR CODES */
...@@ -1190,6 +1191,7 @@ private: ...@@ -1190,6 +1191,7 @@ private:
void zpagesize_error(const char* where); void zpagesize_error(const char* where);
void reportMemoryUsage(Signal* signal, int gth); void reportMemoryUsage(Signal* signal, int gth);
void lcp_write_op_to_undolog(Signal* signal);
// Initialisation // Initialisation
......
...@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){ ...@@ -46,13 +46,17 @@ Dbacc::remainingUndoPages(){
ndbrequire(HeadPage>=TailPage); ndbrequire(HeadPage>=TailPage);
Uint32 UsedPages = HeadPage - TailPage; Uint32 UsedPages = HeadPage - TailPage;
Uint32 Remaining = cundopagesize - UsedPages; Int32 Remaining = cundopagesize - UsedPages;
// There can not be more than cundopagesize remaining // There can not be more than cundopagesize remaining
ndbrequire(Remaining<=cundopagesize); if (Remaining <= 0){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
return Remaining; return Remaining;
}//Dbacc::remainingUndoPages() }
void void
Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){ Dbacc::updateLastUndoPageIdWritten(Signal* signal, Uint32 aNewValue){
...@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal) ...@@ -193,6 +197,17 @@ void Dbacc::execCONTINUEB(Signal* signal)
return; return;
} }
case ZLCP_OP_WRITE_RT_BREAK:
{
operationRecPtr.i= signal->theData[1];
fragrecptr.i= signal->theData[2];
lcpConnectptr.i= signal->theData[3];
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
ptrCheckGuard(fragrecptr, cfragmentsize, fragmentrec);
ptrCheckGuard(lcpConnectptr, clcpConnectsize, lcpConnectrec);
lcp_write_op_to_undolog(signal);
return;
}
default: default:
ndbrequire(false); ndbrequire(false);
break; break;
...@@ -7697,32 +7712,70 @@ void Dbacc::execACC_LCPREQ(Signal* signal) ...@@ -7697,32 +7712,70 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex; fragrecptr.p->lcpMaxOverDirIndex = fragrecptr.p->lastOverIndex;
fragrecptr.p->createLcp = ZTRUE; fragrecptr.p->createLcp = ZTRUE;
operationRecPtr.i = fragrecptr.p->lockOwnersList; operationRecPtr.i = fragrecptr.p->lockOwnersList;
while (operationRecPtr.i != RNIL) { lcp_write_op_to_undolog(signal);
}
void
Dbacc::lcp_write_op_to_undolog(Signal* signal)
{
bool delay_continueb= false;
Uint32 i, j;
for (i= 0; i < 16; i++) {
jam(); jam();
ptrCheckGuard(operationRecPtr, coprecsize, operationrec); if (remainingUndoPages() <= ZMIN_UNDO_PAGES_AT_COMMIT) {
jam();
delay_continueb= true;
break;
}
for (j= 0; j < 32; j++) {
if (operationRecPtr.i == RNIL) {
jam();
break;
}
jam();
ptrCheckGuard(operationRecPtr, coprecsize, operationrec);
if ((operationRecPtr.p->operation == ZINSERT) || if ((operationRecPtr.p->operation == ZINSERT) ||
(operationRecPtr.p->elementIsDisappeared == ZTRUE)){ (operationRecPtr.p->elementIsDisappeared == ZTRUE)){
/******************************************************************* /*******************************************************************
* Only log inserts and elements that are marked as dissapeared. * Only log inserts and elements that are marked as dissapeared.
* All other operations update the element header and that is handled * All other operations update the element header and that is handled
* when pages are written to disk * when pages are written to disk
********************************************************************/ ********************************************************************/
undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1); undopageptr.i = (cundoposition>>ZUNDOPAGEINDEXBITS) & (cundopagesize-1);
ptrAss(undopageptr, undopage); ptrAss(undopageptr, undopage);
theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK; theadundoindex = cundoposition & ZUNDOPAGEINDEX_MASK;
tundoindex = theadundoindex + ZUNDOHEADSIZE; tundoindex = theadundoindex + ZUNDOHEADSIZE;
writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
/* IN OP REC, IS WRITTEN AT UNDO PAGES */
cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
/* UNDO PAGES,CURRENTLY 8, IS FILLED */
}//if
operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp; writeUndoOpInfo(signal);/* THE INFORMATION ABOUT ELEMENT HEADER, STORED*/
}//while /* IN OP REC, IS WRITTEN AT UNDO PAGES */
cundoElemIndex = 0;/* DEFAULT VALUE USED BY WRITE_UNDO_HEADER SUBROTINE */
writeUndoHeader(signal, RNIL, UndoHeader::ZOP_INFO); /* WRITE THE HEAD OF THE UNDO ELEMENT */
checkUndoPages(signal); /* SEND UNDO PAGE TO DISK WHEN A GROUP OF */
/* UNDO PAGES,CURRENTLY 8, IS FILLED */
}
operationRecPtr.i = operationRecPtr.p->nextLockOwnerOp;
}
if (operationRecPtr.i == RNIL) {
jam();
break;
}
}
if (operationRecPtr.i != RNIL) {
jam();
signal->theData[0]= ZLCP_OP_WRITE_RT_BREAK;
signal->theData[1]= operationRecPtr.i;
signal->theData[2]= fragrecptr.i;
signal->theData[3]= lcpConnectptr.i;
if (delay_continueb) {
jam();
sendSignalWithDelay(cownBlockref, GSN_CONTINUEB, signal, 10, 4);
} else {
jam();
sendSignal(cownBlockref, GSN_CONTINUEB, signal, 4, JBB);
}
return;
}
signal->theData[0] = fragrecptr.p->lcpLqhPtr; signal->theData[0] = fragrecptr.p->lcpLqhPtr;
sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED, sendSignal(lcpConnectptr.p->lcpUserblockref, GSN_ACC_LCPSTARTED,
...@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal) ...@@ -7735,8 +7788,7 @@ void Dbacc::execACC_LCPREQ(Signal* signal)
signal->theData[0] = lcpConnectptr.i; signal->theData[0] = lcpConnectptr.i;
signal->theData[1] = fragrecptr.i; signal->theData[1] = fragrecptr.i;
sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB); sendSignal(cownBlockref, GSN_ACC_SAVE_PAGES, signal, 2, JBB);
return; }
}//Dbacc::execACC_LCPREQ()
/* ******************--------------------------------------------------------------- */ /* ******************--------------------------------------------------------------- */
/* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */ /* ACC_SAVE_PAGES A GROUP OF PAGES IS ALLOCATED. THE PAGES AND OVERFLOW */
...@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal) ...@@ -8595,12 +8647,6 @@ void Dbacc::checkUndoPages(Signal* signal)
* RECORDS IN * RECORDS IN
*/ */
Uint16 nextUndoPageId = tundoPageId + 1; Uint16 nextUndoPageId = tundoPageId + 1;
if (nextUndoPageId > (clastUndoPageIdWritten + cundopagesize)){
// No more undolog, crash node
progError(__LINE__,
ERR_NO_MORE_UNDOLOG,
"There are more than 1Mbyte undolog writes outstanding");
}
updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS); updateUndoPositionPage(signal, nextUndoPageId << ZUNDOPAGEINDEXBITS);
if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) { if ((tundoPageId & (ZWRITE_UNDOPAGESIZE - 1)) == (ZWRITE_UNDOPAGESIZE - 1)) {
......
...@@ -998,7 +998,12 @@ public: ...@@ -998,7 +998,12 @@ public:
* It will receive max 16 tuples in each request * It will receive max 16 tuples in each request
*/ */
struct ScanFragRec { struct ScanFragRec {
ScanFragRec(){} ScanFragRec(){
stopFragTimer();
lqhBlockref = 0;
scanFragState = IDLE;
scanRec = RNIL;
}
/** /**
* ScanFragState * ScanFragState
* WAIT_GET_PRIMCONF : Waiting for DIGETPRIMCONF when starting a new * WAIT_GET_PRIMCONF : Waiting for DIGETPRIMCONF when starting a new
......
...@@ -187,7 +187,7 @@ NDB_MAIN(mgmsrv){ ...@@ -187,7 +187,7 @@ NDB_MAIN(mgmsrv){
"Please check if the port is already used,\n" "Please check if the port is already used,\n"
"(perhaps a mgmtsrvr is already running),\n" "(perhaps a mgmtsrvr is already running),\n"
"and if you are executing on the correct computer", "and if you are executing on the correct computer",
glob.interface_name, glob.port); (glob.interface_name ? glob.interface_name : "*"), glob.port);
goto error_end; goto error_end;
} }
free(glob.interface_name); free(glob.interface_name);
......
...@@ -1452,7 +1452,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb, ...@@ -1452,7 +1452,7 @@ NdbDictInterface::createOrAlterTable(Ndb & ndb,
alterTable(&tSignal, ptr) alterTable(&tSignal, ptr)
: createTable(&tSignal, ptr); : createTable(&tSignal, ptr);
if (haveAutoIncrement) { if (!alter && haveAutoIncrement) {
// if (!ndb.setAutoIncrementValue(impl.m_internalName.c_str(), autoIncrementValue)) { // if (!ndb.setAutoIncrementValue(impl.m_internalName.c_str(), autoIncrementValue)) {
if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), autoIncrementValue)) { if (!ndb.setAutoIncrementValue(impl.m_externalName.c_str(), autoIncrementValue)) {
m_error.code = 4336; m_error.code = 4336;
......
...@@ -589,13 +589,14 @@ Ndb::releaseSignal(NdbApiSignal* aSignal) ...@@ -589,13 +589,14 @@ Ndb::releaseSignal(NdbApiSignal* aSignal)
#if defined VM_TRACE #if defined VM_TRACE
// Check that signal is not null // Check that signal is not null
assert(aSignal != NULL); assert(aSignal != NULL);
#if 0
// Check that signal is not already in list // Check that signal is not already in list
NdbApiSignal* tmp = theSignalIdleList; NdbApiSignal* tmp = theSignalIdleList;
while (tmp != NULL){ while (tmp != NULL){
assert(tmp != aSignal); assert(tmp != aSignal);
tmp = tmp->next(); tmp = tmp->next();
} }
#endif
#endif #endif
creleaseSignals++; creleaseSignals++;
aSignal->next(theSignalIdleList); aSignal->next(theSignalIdleList);
......
...@@ -34,7 +34,8 @@ public: ...@@ -34,7 +34,8 @@ public:
int records, int records,
int batch = 512, int batch = 512,
bool allowConstraintViolation = true, bool allowConstraintViolation = true,
int doSleep = 0); int doSleep = 0,
bool oneTrans = false);
int scanReadRecords(Ndb*, int scanReadRecords(Ndb*,
int records, int records,
int abort = 0, int abort = 0,
......
...@@ -29,9 +29,18 @@ ...@@ -29,9 +29,18 @@
* delete should be visible to same transaction * delete should be visible to same transaction
* *
*/ */
int runLoadTable2(NDBT_Context* ctx, NDBT_Step* step)
{
int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records, 512, false, 0, true) != 0){
return NDBT_FAILED;
}
return NDBT_OK;
}
int runLoadTable(NDBT_Context* ctx, NDBT_Step* step){ int runLoadTable(NDBT_Context* ctx, NDBT_Step* step)
{
int records = ctx->getNumRecords(); int records = ctx->getNumRecords();
HugoTransactions hugoTrans(*ctx->getTab()); HugoTransactions hugoTrans(*ctx->getTab());
if (hugoTrans.loadTable(GETNDB(step), records) != 0){ if (hugoTrans.loadTable(GETNDB(step), records) != 0){
...@@ -1255,6 +1264,11 @@ TESTCASE("MassiveRollback2", ...@@ -1255,6 +1264,11 @@ TESTCASE("MassiveRollback2",
INITIALIZER(runMassiveRollback2); INITIALIZER(runMassiveRollback2);
FINALIZER(runClearTable2); FINALIZER(runClearTable2);
} }
TESTCASE("MassiveTransaction",
"Test very large insert transaction"){
INITIALIZER(runLoadTable2);
FINALIZER(runClearTable2);
}
NDBT_TESTSUITE_END(testBasic); NDBT_TESTSUITE_END(testBasic);
int main(int argc, const char** argv){ int main(int argc, const char** argv){
......
...@@ -693,12 +693,14 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -693,12 +693,14 @@ HugoTransactions::loadTable(Ndb* pNdb,
int records, int records,
int batch, int batch,
bool allowConstraintViolation, bool allowConstraintViolation,
int doSleep){ int doSleep,
bool oneTrans){
int check; int check;
int retryAttempt = 0; int retryAttempt = 0;
int retryMax = 5; int retryMax = 5;
NdbConnection *pTrans; NdbConnection *pTrans;
NdbOperation *pOp; NdbOperation *pOp;
bool first_batch = true;
const int org = batch; const int org = batch;
const int cols = tab.getNoOfColumns(); const int cols = tab.getNoOfColumns();
...@@ -707,7 +709,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -707,7 +709,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
batch = (batch * 256); // -> 512 -> 65536k per commit batch = (batch * 256); // -> 512 -> 65536k per commit
batch = batch/bytes; // batch = batch/bytes; //
batch = batch == 0 ? 1 : batch; batch = batch == 0 ? 1 : batch;
if(batch != org){ if(batch != org){
g_info << "batch = " << org << " rowsize = " << bytes g_info << "batch = " << org << " rowsize = " << bytes
<< " -> rows/commit = " << batch << endl; << " -> rows/commit = " << batch << endl;
...@@ -715,7 +717,7 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -715,7 +717,7 @@ HugoTransactions::loadTable(Ndb* pNdb,
g_info << "|- Inserting records..." << endl; g_info << "|- Inserting records..." << endl;
for (int c=0 ; c<records ; ){ for (int c=0 ; c<records ; ){
bool closeTrans;
if (retryAttempt >= retryMax){ if (retryAttempt >= retryMax){
g_info << "Record " << c << " could not be inserted, has retried " g_info << "Record " << c << " could not be inserted, has retried "
<< retryAttempt << " times " << endl; << retryAttempt << " times " << endl;
...@@ -726,19 +728,22 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -726,19 +728,22 @@ HugoTransactions::loadTable(Ndb* pNdb,
if (doSleep > 0) if (doSleep > 0)
NdbSleep_MilliSleep(doSleep); NdbSleep_MilliSleep(doSleep);
pTrans = pNdb->startTransaction(); if (first_batch || !oneTrans) {
first_batch = false;
if (pTrans == NULL) { pTrans = pNdb->startTransaction();
const NdbError err = pNdb->getNdbError();
if (pTrans == NULL) {
const NdbError err = pNdb->getNdbError();
if (err.status == NdbError::TemporaryError){ if (err.status == NdbError::TemporaryError){
ERR(err); ERR(err);
NdbSleep_MilliSleep(50); NdbSleep_MilliSleep(50);
retryAttempt++; retryAttempt++;
continue; continue;
}
ERR(err);
return NDBT_FAILED;
} }
ERR(err);
return NDBT_FAILED;
} }
for(int b = 0; b < batch && c+b<records; b++){ for(int b = 0; b < batch && c+b<records; b++){
...@@ -768,7 +773,13 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -768,7 +773,13 @@ HugoTransactions::loadTable(Ndb* pNdb,
} }
// Execute the transaction and insert the record // Execute the transaction and insert the record
check = pTrans->execute( Commit ); if (!oneTrans || (c + batch) >= records) {
closeTrans = true;
check = pTrans->execute( Commit );
} else {
closeTrans = false;
check = pTrans->execute( NoCommit );
}
if(check == -1 ) { if(check == -1 ) {
const NdbError err = pTrans->getNdbError(); const NdbError err = pTrans->getNdbError();
pNdb->closeTransaction(pTrans); pNdb->closeTransaction(pTrans);
...@@ -811,8 +822,10 @@ HugoTransactions::loadTable(Ndb* pNdb, ...@@ -811,8 +822,10 @@ HugoTransactions::loadTable(Ndb* pNdb,
break; break;
} }
} }
else{ else{
pNdb->closeTransaction(pTrans); if (closeTrans) {
pNdb->closeTransaction(pTrans);
}
} }
// Step to next record // Step to next record
......
...@@ -3642,11 +3642,19 @@ ha_innobase::create( ...@@ -3642,11 +3642,19 @@ ha_innobase::create(
} }
if (current_thd->query != NULL) { if (current_thd->query != NULL) {
error = row_table_add_foreign_constraints(trx,
current_thd->query, norm_name);
error = convert_error_code_to_mysql(error, NULL); LEX_STRING q;
if (thd->convert_string(&q, system_charset_info,
current_thd->query,
current_thd->query_length,
current_thd->charset())) {
error = HA_ERR_OUT_OF_MEM;
} else {
error = row_table_add_foreign_constraints(trx,
q.str, norm_name);
error = convert_error_code_to_mysql(error, NULL);
}
if (error) { if (error) {
innobase_commit_low(trx); innobase_commit_low(trx);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment