Commit bff6df92 authored by unknown's avatar unknown

Merge perch.ndb.mysql.com:/home/jonas/src/51-work

into  perch.ndb.mysql.com:/home/jonas/src/mysql-5.1-new-ndb


storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp:
  Auto merged
parents 5389e3f9 3cd43eb4
...@@ -150,6 +150,7 @@ struct DropFileRef { ...@@ -150,6 +150,7 @@ struct DropFileRef {
enum ErrorCode { enum ErrorCode {
NoError = 0, NoError = 0,
Busy = 701, Busy = 701,
NotMaster = 702,
NoSuchFile = 766, NoSuchFile = 766,
DropUndoFileNotSupported = 769, DropUndoFileNotSupported = 769,
InvalidSchemaObjectVersion = 774 InvalidSchemaObjectVersion = 774
......
...@@ -13396,6 +13396,24 @@ Dbdict::execCREATE_FILE_REQ(Signal* signal){ ...@@ -13396,6 +13396,24 @@ Dbdict::execCREATE_FILE_REQ(Signal* signal){
Uint32 requestInfo = req->requestInfo; Uint32 requestInfo = req->requestInfo;
do { do {
if(getOwnNodeId() != c_masterNodeId){
jam();
ref->errorCode = CreateFileRef::NotMaster;
ref->status = 0;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
if (c_blockState != BS_IDLE){
jam();
ref->errorCode = CreateFileRef::Busy;
ref->status = 0;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
Ptr<SchemaTransaction> trans_ptr; Ptr<SchemaTransaction> trans_ptr;
if (! c_Trans.seize(trans_ptr)){ if (! c_Trans.seize(trans_ptr)){
ref->errorCode = CreateFileRef::Busy; ref->errorCode = CreateFileRef::Busy;
...@@ -13455,6 +13473,9 @@ Dbdict::execCREATE_FILE_REQ(Signal* signal){ ...@@ -13455,6 +13473,9 @@ Dbdict::execCREATE_FILE_REQ(Signal* signal){
tmp.init<CreateObjRef>(rg, GSN_CREATE_OBJ_REF, trans_key); tmp.init<CreateObjRef>(rg, GSN_CREATE_OBJ_REF, trans_key);
sendSignal(rg, GSN_CREATE_OBJ_REQ, signal, sendSignal(rg, GSN_CREATE_OBJ_REQ, signal,
CreateObjReq::SignalLength, JBB); CreateObjReq::SignalLength, JBB);
c_blockState = BS_CREATE_TAB;
return; return;
} while(0); } while(0);
...@@ -13480,15 +13501,6 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){ ...@@ -13480,15 +13501,6 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){
Uint32 type = req->objType; Uint32 type = req->objType;
do { do {
Ptr<SchemaTransaction> trans_ptr;
if (! c_Trans.seize(trans_ptr)){
ref->errorCode = CreateFilegroupRef::Busy;
ref->status = 0;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
if(getOwnNodeId() != c_masterNodeId){ if(getOwnNodeId() != c_masterNodeId){
jam(); jam();
ref->errorCode = CreateFilegroupRef::NotMaster; ref->errorCode = CreateFilegroupRef::NotMaster;
...@@ -13506,6 +13518,15 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){ ...@@ -13506,6 +13518,15 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){
ref->errorLine = __LINE__; ref->errorLine = __LINE__;
break; break;
} }
Ptr<SchemaTransaction> trans_ptr;
if (! c_Trans.seize(trans_ptr)){
ref->errorCode = CreateFilegroupRef::Busy;
ref->status = 0;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
const Uint32 trans_key = ++c_opRecordSequence; const Uint32 trans_key = ++c_opRecordSequence;
trans_ptr.p->key = trans_key; trans_ptr.p->key = trans_key;
...@@ -13554,6 +13575,9 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){ ...@@ -13554,6 +13575,9 @@ Dbdict::execCREATE_FILEGROUP_REQ(Signal* signal){
tmp.init<CreateObjRef>(rg, GSN_CREATE_OBJ_REF, trans_key); tmp.init<CreateObjRef>(rg, GSN_CREATE_OBJ_REF, trans_key);
sendSignal(rg, GSN_CREATE_OBJ_REQ, signal, sendSignal(rg, GSN_CREATE_OBJ_REQ, signal,
CreateObjReq::SignalLength, JBB); CreateObjReq::SignalLength, JBB);
c_blockState = BS_CREATE_TAB;
return; return;
} while(0); } while(0);
...@@ -13581,6 +13605,22 @@ Dbdict::execDROP_FILE_REQ(Signal* signal) ...@@ -13581,6 +13605,22 @@ Dbdict::execDROP_FILE_REQ(Signal* signal)
Uint32 version = req->file_version; Uint32 version = req->file_version;
do { do {
if(getOwnNodeId() != c_masterNodeId){
jam();
ref->errorCode = DropFileRef::NotMaster;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
if (c_blockState != BS_IDLE){
jam();
ref->errorCode = DropFileRef::Busy;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
Ptr<File> file_ptr; Ptr<File> file_ptr;
if (!c_file_hash.find(file_ptr, objId)) if (!c_file_hash.find(file_ptr, objId))
{ {
...@@ -13636,6 +13676,9 @@ Dbdict::execDROP_FILE_REQ(Signal* signal) ...@@ -13636,6 +13676,9 @@ Dbdict::execDROP_FILE_REQ(Signal* signal)
tmp.init<CreateObjRef>(rg, GSN_DROP_OBJ_REF, trans_key); tmp.init<CreateObjRef>(rg, GSN_DROP_OBJ_REF, trans_key);
sendSignal(rg, GSN_DROP_OBJ_REQ, signal, sendSignal(rg, GSN_DROP_OBJ_REQ, signal,
DropObjReq::SignalLength, JBB); DropObjReq::SignalLength, JBB);
c_blockState = BS_CREATE_TAB;
return; return;
} while(0); } while(0);
...@@ -13663,6 +13706,22 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal) ...@@ -13663,6 +13706,22 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal)
Uint32 version = req->filegroup_version; Uint32 version = req->filegroup_version;
do { do {
if(getOwnNodeId() != c_masterNodeId){
jam();
ref->errorCode = DropFilegroupRef::NotMaster;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
if (c_blockState != BS_IDLE){
jam();
ref->errorCode = DropFilegroupRef::Busy;
ref->errorKey = 0;
ref->errorLine = __LINE__;
break;
}
Ptr<Filegroup> filegroup_ptr; Ptr<Filegroup> filegroup_ptr;
if (!c_filegroup_hash.find(filegroup_ptr, objId)) if (!c_filegroup_hash.find(filegroup_ptr, objId))
{ {
...@@ -13718,6 +13777,9 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal) ...@@ -13718,6 +13777,9 @@ Dbdict::execDROP_FILEGROUP_REQ(Signal* signal)
tmp.init<CreateObjRef>(rg, GSN_DROP_OBJ_REF, trans_key); tmp.init<CreateObjRef>(rg, GSN_DROP_OBJ_REF, trans_key);
sendSignal(rg, GSN_DROP_OBJ_REQ, signal, sendSignal(rg, GSN_DROP_OBJ_REQ, signal,
DropObjReq::SignalLength, JBB); DropObjReq::SignalLength, JBB);
c_blockState = BS_CREATE_TAB;
return; return;
} while(0); } while(0);
...@@ -13892,6 +13954,7 @@ Dbdict::trans_commit_complete_done(Signal* signal, ...@@ -13892,6 +13954,7 @@ Dbdict::trans_commit_complete_done(Signal* signal,
//@todo check api failed //@todo check api failed
sendSignal(trans_ptr.p->m_senderRef, GSN_CREATE_FILEGROUP_CONF, signal, sendSignal(trans_ptr.p->m_senderRef, GSN_CREATE_FILEGROUP_CONF, signal,
CreateFilegroupConf::SignalLength, JBB); CreateFilegroupConf::SignalLength, JBB);
break; break;
} }
case GSN_CREATE_FILE_REQ:{ case GSN_CREATE_FILE_REQ:{
...@@ -13935,6 +13998,7 @@ Dbdict::trans_commit_complete_done(Signal* signal, ...@@ -13935,6 +13998,7 @@ Dbdict::trans_commit_complete_done(Signal* signal,
} }
c_Trans.release(trans_ptr); c_Trans.release(trans_ptr);
ndbrequire(c_blockState == BS_CREATE_TAB);
c_blockState = BS_IDLE; c_blockState = BS_IDLE;
return; return;
} }
...@@ -14047,6 +14111,7 @@ Dbdict::trans_abort_complete_done(Signal* signal, ...@@ -14047,6 +14111,7 @@ Dbdict::trans_abort_complete_done(Signal* signal,
} }
c_Trans.release(trans_ptr); c_Trans.release(trans_ptr);
ndbrequire(c_blockState == BS_CREATE_TAB);
c_blockState = BS_IDLE; c_blockState = BS_IDLE;
return; return;
} }
......
...@@ -170,7 +170,6 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal) ...@@ -170,7 +170,6 @@ void Dbtup::execTUP_ABORTREQ(Signal* signal)
/** /**
* Aborting last operation that performed ALLOC * Aborting last operation that performed ALLOC
*/ */
ndbout_c("clearing ALLOC");
tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC; tuple_ptr->m_header_bits &= ~(Uint32)Tuple_header::ALLOC;
tuple_ptr->m_header_bits |= Tuple_header::FREED; tuple_ptr->m_header_bits |= Tuple_header::FREED;
} }
......
...@@ -473,13 +473,16 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -473,13 +473,16 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec); ptrCheckGuard(regTabPtr, no_of_tablerec, tablerec);
PagePtr page; PagePtr page;
Tuple_header* tuple_ptr= 0; Tuple_header* tuple_ptr= (Tuple_header*)
get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
bool get_page = false;
if(regOperPtr.p->op_struct.m_load_diskpage_on_commit) if(regOperPtr.p->op_struct.m_load_diskpage_on_commit)
{ {
Page_cache_client::Request req;
ndbassert(regOperPtr.p->is_first_operation() && ndbassert(regOperPtr.p->is_first_operation() &&
regOperPtr.p->is_last_operation()); regOperPtr.p->is_last_operation());
Page_cache_client::Request req;
/** /**
* Check for page * Check for page
*/ */
...@@ -490,15 +493,33 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -490,15 +493,33 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
memcpy(&req.m_page, memcpy(&req.m_page,
tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key)); tmp->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
if (unlikely(regOperPtr.p->op_struct.op_type == ZDELETE &&
tmp->m_header_bits & Tuple_header::DISK_ALLOC))
{
jam();
/**
* Insert+Delete
*/
regOperPtr.p->op_struct.m_load_diskpage_on_commit = 0;
regOperPtr.p->op_struct.m_wait_log_buffer = 0;
disk_page_abort_prealloc(signal, regFragPtr.p,
&req.m_page, req.m_page.m_page_idx);
c_lgman->free_log_space(regFragPtr.p->m_logfile_group_id,
regOperPtr.p->m_undo_buffer_space);
ndbout_c("insert+delete");
goto skip_disk;
}
} }
else else
{ {
// initial delete // initial delete
ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE); ndbassert(regOperPtr.p->op_struct.op_type == ZDELETE);
tuple_ptr= (Tuple_header*)
get_ptr(&page, &regOperPtr.p->m_tuple_location, regTabPtr.p);
memcpy(&req.m_page, memcpy(&req.m_page,
tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key)); tuple_ptr->get_disk_ref_ptr(regTabPtr.p), sizeof(Local_key));
ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
} }
req.m_callback.m_callbackData= regOperPtr.i; req.m_callback.m_callbackData= regOperPtr.i;
req.m_callback.m_callbackFunction = req.m_callback.m_callbackFunction =
...@@ -522,6 +543,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -522,6 +543,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
ndbrequire("NOT YET IMPLEMENTED" == 0); ndbrequire("NOT YET IMPLEMENTED" == 0);
break; break;
} }
get_page = true;
disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr); disk_page_set_dirty(*(Ptr<Page>*)&m_pgman.m_ptr);
regOperPtr.p->m_commit_disk_callback_page= res; regOperPtr.p->m_commit_disk_callback_page= res;
regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0; regOperPtr.p->op_struct.m_load_diskpage_on_commit= 0;
...@@ -555,6 +577,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -555,6 +577,7 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
tuple_ptr = (Tuple_header*) tuple_ptr = (Tuple_header*)
get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p); get_ptr(&page, &regOperPtr.p->m_tuple_location,regTabPtr.p);
} }
skip_disk:
req_struct.m_tuple_ptr = tuple_ptr; req_struct.m_tuple_ptr = tuple_ptr;
if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED) if(get_tuple_state(regOperPtr.p) == TUPLE_PREPARED)
...@@ -599,6 +622,8 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal) ...@@ -599,6 +622,8 @@ void Dbtup::execTUP_COMMITREQ(Signal* signal)
else else
{ {
removeActiveOpList(regOperPtr.p, tuple_ptr); removeActiveOpList(regOperPtr.p, tuple_ptr);
if (get_page)
ndbassert(tuple_ptr->m_header_bits & Tuple_header::DISK_PART);
dealloc_tuple(signal, gci, page.p, tuple_ptr, dealloc_tuple(signal, gci, page.p, tuple_ptr,
regOperPtr.p, regFragPtr.p, regTabPtr.p); regOperPtr.p, regFragPtr.p, regTabPtr.p);
} }
......
...@@ -1053,6 +1053,7 @@ Dbtup::disk_page_abort_prealloc_callback(Signal* signal, ...@@ -1053,6 +1053,7 @@ Dbtup::disk_page_abort_prealloc_callback(Signal* signal,
Ptr<Fragrecord> fragPtr; Ptr<Fragrecord> fragPtr;
getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p); getFragmentrec(fragPtr, pagePtr.p->m_fragment_id, tabPtr.p);
disk_page_set_dirty(pagePtr);
disk_page_abort_prealloc_callback_1(signal, fragPtr.p, pagePtr, sz); disk_page_abort_prealloc_callback_1(signal, fragPtr.p, pagePtr, sz);
} }
...@@ -1074,6 +1075,13 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal, ...@@ -1074,6 +1075,13 @@ Dbtup::disk_page_abort_prealloc_callback_1(Signal* signal,
ddassert(alloc.calc_page_free_bits(free - used) == old_idx); ddassert(alloc.calc_page_free_bits(free - used) == old_idx);
Uint32 new_idx = alloc.calc_page_free_bits(free - used + sz); Uint32 new_idx = alloc.calc_page_free_bits(free - used + sz);
#ifdef VM_TRACE
Local_key key;
key.m_page_no = pagePtr.p->m_page_no;
key.m_file_no = pagePtr.p->m_file_no;
ndbout << "disk_page_abort_prealloc_callback_1" << key << endl;
#endif
Ptr<Extent_info> extentPtr; Ptr<Extent_info> extentPtr;
c_extent_pool.getPtr(extentPtr, ext); c_extent_pool.getPtr(extentPtr, ext);
if (old_idx != new_idx) if (old_idx != new_idx)
......
...@@ -1382,8 +1382,9 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1382,8 +1382,9 @@ int Dbtup::handleInsertReq(Signal* signal,
regOperPtr.p->userpointer, regOperPtr.p->userpointer,
&regOperPtr.p->m_tuple_location); &regOperPtr.p->m_tuple_location);
((Tuple_header*)ptr)->m_operation_ptr_i= regOperPtr.i; base = (Tuple_header*)ptr;
((Tuple_header*)ptr)->m_header_bits= Tuple_header::ALLOC | base->m_operation_ptr_i= regOperPtr.i;
base->m_header_bits= Tuple_header::ALLOC |
(varsize ? Tuple_header::CHAINED_ROW : 0); (varsize ? Tuple_header::CHAINED_ROW : 0);
regOperPtr.p->m_tuple_location.m_page_no = real_page_id; regOperPtr.p->m_tuple_location.m_page_no = real_page_id;
} }
...@@ -1407,6 +1408,8 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1407,6 +1408,8 @@ int Dbtup::handleInsertReq(Signal* signal,
} }
req_struct->m_use_rowid = false; req_struct->m_use_rowid = false;
base->m_header_bits &= ~(Uint32)Tuple_header::FREE; base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1);
} }
else else
{ {
...@@ -1415,6 +1418,8 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1415,6 +1418,8 @@ int Dbtup::handleInsertReq(Signal* signal,
{ {
ndbout_c("no mem insert but rowid (same)"); ndbout_c("no mem insert but rowid (same)");
base->m_header_bits &= ~(Uint32)Tuple_header::FREE; base->m_header_bits &= ~(Uint32)Tuple_header::FREE;
base->m_header_bits |= Tuple_header::ALLOC &
(regOperPtr.p->is_first_operation() ? ~0 : 1);
} }
else else
{ {
...@@ -1467,7 +1472,7 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1467,7 +1472,7 @@ int Dbtup::handleInsertReq(Signal* signal,
size_change_error: size_change_error:
jam(); jam();
terrorCode = ZMEM_NOMEM_ERROR; terrorCode = ZMEM_NOMEM_ERROR;
goto disk_prealloc_error; goto exit_error;
undo_buffer_error: undo_buffer_error:
jam(); jam();
...@@ -1501,9 +1506,13 @@ int Dbtup::handleInsertReq(Signal* signal, ...@@ -1501,9 +1506,13 @@ int Dbtup::handleInsertReq(Signal* signal,
regOperPtr.p->op_struct.in_active_list = false; regOperPtr.p->op_struct.in_active_list = false;
regOperPtr.p->m_tuple_location.setNull(); regOperPtr.p->m_tuple_location.setNull();
} }
disk_prealloc_error: exit_error:
tupkeyErrorLab(signal); tupkeyErrorLab(signal);
return -1; return -1;
disk_prealloc_error:
base->m_header_bits |= Tuple_header::FREED;
goto exit_error;
} }
/* ---------------------------------------------------------------- */ /* ---------------------------------------------------------------- */
......
...@@ -4410,7 +4410,7 @@ NdbDictInterface::create_file(const NdbFileImpl & file, ...@@ -4410,7 +4410,7 @@ NdbDictInterface::create_file(const NdbFileImpl & file,
ptr[0].p = (Uint32*)m_buffer.get_data(); ptr[0].p = (Uint32*)m_buffer.get_data();
ptr[0].sz = m_buffer.length() / 4; ptr[0].sz = m_buffer.length() / 4;
int err[] = { CreateFileRef::Busy, 0}; int err[] = { CreateFileRef::Busy, CreateFileRef::NotMaster, 0};
/* /*
Send signal without time-out since creating files can take a very long Send signal without time-out since creating files can take a very long
time if the file is very big. time if the file is very big.
...@@ -4454,7 +4454,7 @@ NdbDictInterface::drop_file(const NdbFileImpl & file){ ...@@ -4454,7 +4454,7 @@ NdbDictInterface::drop_file(const NdbFileImpl & file){
req->file_id = file.m_id; req->file_id = file.m_id;
req->file_version = file.m_version; req->file_version = file.m_version;
int err[] = { DropFileRef::Busy, 0}; int err[] = { DropFileRef::Busy, DropFileRef::NotMaster, 0};
DBUG_RETURN(dictSignal(&tSignal, 0, 0, DBUG_RETURN(dictSignal(&tSignal, 0, 0,
0, // master 0, // master
WAIT_CREATE_INDX_REQ, WAIT_CREATE_INDX_REQ,
......
...@@ -46,7 +46,7 @@ ...@@ -46,7 +46,7 @@
#include <EventLogger.hpp> #include <EventLogger.hpp>
extern EventLogger g_eventLogger; extern EventLogger g_eventLogger;
static Gci_container g_empty_gci_container; static Gci_container_pod g_empty_gci_container;
static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4; static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1; static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
...@@ -1223,12 +1223,22 @@ operator<<(NdbOut& out, const Gci_container& gci) ...@@ -1223,12 +1223,22 @@ operator<<(NdbOut& out, const Gci_container& gci)
return out; return out;
} }
static
NdbOut&
operator<<(NdbOut& out, const Gci_container_pod& gci)
{
Gci_container* ptr = (Gci_container*)&gci;
out << *ptr;
return out;
}
static static
Gci_container* Gci_container*
find_bucket_chained(Vector<Gci_container> * active, Uint64 gci) find_bucket_chained(Vector<Gci_container_pod> * active, Uint64 gci)
{ {
Uint32 pos = (gci & ACTIVE_GCI_MASK); Uint32 pos = (gci & ACTIVE_GCI_MASK);
Gci_container *bucket= active->getBase() + pos; Gci_container *bucket= ((Gci_container*)active->getBase()) + pos;
if(gci > bucket->m_gci) if(gci > bucket->m_gci)
{ {
...@@ -1237,8 +1247,9 @@ find_bucket_chained(Vector<Gci_container> * active, Uint64 gci) ...@@ -1237,8 +1247,9 @@ find_bucket_chained(Vector<Gci_container> * active, Uint64 gci)
do do
{ {
active->fill(move_pos, g_empty_gci_container); active->fill(move_pos, g_empty_gci_container);
bucket = active->getBase() + pos; // Needs to recomputed after fill // Needs to recomputed after fill
move = active->getBase() + move_pos; bucket = ((Gci_container*)active->getBase()) + pos;
move = ((Gci_container*)active->getBase()) + move_pos;
if(move->m_gcp_complete_rep_count == 0) if(move->m_gcp_complete_rep_count == 0)
{ {
memcpy(move, bucket, sizeof(Gci_container)); memcpy(move, bucket, sizeof(Gci_container));
...@@ -1269,10 +1280,10 @@ find_bucket_chained(Vector<Gci_container> * active, Uint64 gci) ...@@ -1269,10 +1280,10 @@ find_bucket_chained(Vector<Gci_container> * active, Uint64 gci)
inline inline
Gci_container* Gci_container*
find_bucket(Vector<Gci_container> * active, Uint64 gci) find_bucket(Vector<Gci_container_pod> * active, Uint64 gci)
{ {
Uint32 pos = (gci & ACTIVE_GCI_MASK); Uint32 pos = (gci & ACTIVE_GCI_MASK);
Gci_container *bucket= active->getBase() + pos; Gci_container *bucket= ((Gci_container*)active->getBase()) + pos;
if(likely(gci == bucket->m_gci)) if(likely(gci == bucket->m_gci))
return bucket; return bucket;
...@@ -1370,7 +1381,8 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) ...@@ -1370,7 +1381,8 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
{ {
/** out of order something */ /** out of order something */
ndbout_c("out of order bucket: %d gci: %lld m_latestGCI: %lld", ndbout_c("out of order bucket: %d gci: %lld m_latestGCI: %lld",
bucket-m_active_gci.getBase(), gci, m_latestGCI); bucket-(Gci_container*)m_active_gci.getBase(),
gci, m_latestGCI);
bucket->m_state = Gci_container::GC_COMPLETE; bucket->m_state = Gci_container::GC_COMPLETE;
bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused bucket->m_gcp_complete_rep_count = 1; // Prevent from being reused
m_latest_complete_GCI = gci; m_latest_complete_GCI = gci;
...@@ -1387,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis() ...@@ -1387,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis()
Uint64 stop_gci = m_latest_complete_GCI; Uint64 stop_gci = m_latest_complete_GCI;
const Uint32 size = m_active_gci.size(); const Uint32 size = m_active_gci.size();
Gci_container* array= m_active_gci.getBase(); Gci_container* array= (Gci_container*)m_active_gci.getBase();
ndbout_c("complete_outof_order_gcis"); ndbout_c("complete_outof_order_gcis");
for(Uint32 i = 0; i<size; i++) for(Uint32 i = 0; i<size; i++)
...@@ -1490,7 +1502,7 @@ NdbEventBuffer::completeClusterFailed() ...@@ -1490,7 +1502,7 @@ NdbEventBuffer::completeClusterFailed()
Uint32 sz= m_active_gci.size(); Uint32 sz= m_active_gci.size();
Uint64 gci= ~0; Uint64 gci= ~0;
Gci_container* bucket = 0; Gci_container* bucket = 0;
Gci_container* array = m_active_gci.getBase(); Gci_container* array = (Gci_container*)m_active_gci.getBase();
for(Uint32 i = 0; i<sz; i++) for(Uint32 i = 0; i<sz; i++)
{ {
if(array[i].m_gcp_complete_rep_count && array[i].m_gci < gci) if(array[i].m_gcp_complete_rep_count && array[i].m_gci < gci)
...@@ -2538,5 +2550,5 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ...@@ -2538,5 +2550,5 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr
DBUG_VOID_RETURN_EVENT; DBUG_VOID_RETURN_EVENT;
} }
template class Vector<Gci_container>; template class Vector<Gci_container_pod>;
template class Vector<NdbEventBuffer::EventBufData_chunk*>; template class Vector<NdbEventBuffer::EventBufData_chunk*>;
...@@ -272,6 +272,11 @@ struct Gci_container ...@@ -272,6 +272,11 @@ struct Gci_container
EventBufData_hash m_data_hash; EventBufData_hash m_data_hash;
}; };
struct Gci_container_pod
{
char data[sizeof(Gci_container)];
};
class NdbEventOperationImpl : public NdbEventOperation { class NdbEventOperationImpl : public NdbEventOperation {
public: public:
NdbEventOperationImpl(NdbEventOperation &f, NdbEventOperationImpl(NdbEventOperation &f,
...@@ -365,7 +370,7 @@ public: ...@@ -365,7 +370,7 @@ public:
~NdbEventBuffer(); ~NdbEventBuffer();
const Uint32 &m_system_nodes; const Uint32 &m_system_nodes;
Vector<Gci_container> m_active_gci; Vector<Gci_container_pod> m_active_gci;
NdbEventOperation *createEventOperation(const char* eventName, NdbEventOperation *createEventOperation(const char* eventName,
NdbError &); NdbError &);
NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt, NdbEventOperationImpl *createEventOperation(NdbEventImpl& evnt,
......
...@@ -1034,6 +1034,38 @@ runMassiveRollback2(NDBT_Context* ctx, NDBT_Step* step){ ...@@ -1034,6 +1034,38 @@ runMassiveRollback2(NDBT_Context* ctx, NDBT_Step* step){
return result; return result;
} }
int
runMassiveRollback3(NDBT_Context* ctx, NDBT_Step* step){
int result = NDBT_OK;
HugoOperations hugoOps(*ctx->getTab());
Ndb* pNdb = GETNDB(step);
const Uint32 BATCH = 10;
const Uint32 OPS_TOTAL = 20;
const Uint32 LOOPS = 100;
for(Uint32 loop = 0; loop<LOOPS; loop++)
{
CHECK(hugoOps.startTransaction(pNdb) == 0);
bool ok = true;
for (Uint32 i = 0; i<OPS_TOTAL; i+= BATCH)
{
CHECK(hugoOps.pkInsertRecord(pNdb, i, BATCH, 0) == 0);
if (hugoOps.execute_NoCommit(pNdb) != 0)
{
ok = false;
break;
}
}
hugoOps.execute_Rollback(pNdb);
CHECK(hugoOps.closeTransaction(pNdb) == 0);
}
hugoOps.closeTransaction(pNdb);
return result;
}
/** /**
* TUP errors * TUP errors
*/ */
...@@ -1360,6 +1392,13 @@ TESTCASE("MassiveRollback2", ...@@ -1360,6 +1392,13 @@ TESTCASE("MassiveRollback2",
INITIALIZER(runMassiveRollback2); INITIALIZER(runMassiveRollback2);
FINALIZER(runClearTable2); FINALIZER(runClearTable2);
} }
TESTCASE("MassiveRollback3",
"Test rollback of 4096 operations"){
INITIALIZER(runClearTable2);
STEP(runMassiveRollback3);
STEP(runMassiveRollback3);
FINALIZER(runClearTable2);
}
TESTCASE("MassiveTransaction", TESTCASE("MassiveTransaction",
"Test very large insert transaction"){ "Test very large insert transaction"){
INITIALIZER(runLoadTable2); INITIALIZER(runLoadTable2);
......
...@@ -207,6 +207,10 @@ max-time: 500 ...@@ -207,6 +207,10 @@ max-time: 500
cmd: testBasic cmd: testBasic
args: -n MassiveRollback2 T1 T6 T13 D1 D2 args: -n MassiveRollback2 T1 T6 T13 D1 D2
max-time: 500
cmd: testBasic
args: -n MassiveRollback3 T1 D1
max-time: 500 max-time: 500
cmd: testBasic cmd: testBasic
args: -n TupError args: -n TupError
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment