Commit 26d18f6a authored by unknown's avatar unknown

ndb: store event operations per gci

parent cabebacd
...@@ -1117,7 +1117,7 @@ NdbEventBuffer::nextEvent() ...@@ -1117,7 +1117,7 @@ NdbEventBuffer::nextEvent()
m_available_data.remove_first(); m_available_data.remove_first();
// add it to used list // add it to used list
m_used_data.append(data); m_used_data.append_used_data(data);
#ifdef VM_TRACE #ifdef VM_TRACE
op->m_data_done_count++; op->m_data_done_count++;
...@@ -1144,6 +1144,10 @@ NdbEventBuffer::nextEvent() ...@@ -1144,6 +1144,10 @@ NdbEventBuffer::nextEvent()
(void)tBlob->atNextEvent(); (void)tBlob->atNextEvent();
tBlob = tBlob->theNext; tBlob = tBlob->theNext;
} }
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
while (gci_ops && op->getGCI() > gci_ops->m_gci)
gci_ops = m_available_data.next_gci_ops();
assert(gci_ops && (op->getGCI() == gci_ops->m_gci));
DBUG_RETURN_EVENT(op->m_facade); DBUG_RETURN_EVENT(op->m_facade);
} }
// the next event belonged to an event op that is no // the next event belonged to an event op that is no
...@@ -1158,15 +1162,21 @@ NdbEventBuffer::nextEvent() ...@@ -1158,15 +1162,21 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE #ifdef VM_TRACE
m_latest_command= m_latest_command_save; m_latest_command= m_latest_command_save;
#endif #endif
// free all "per gci unique" collected operations
EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
while (gci_ops)
gci_ops = m_available_data.next_gci_ops();
DBUG_RETURN_EVENT(0); DBUG_RETURN_EVENT(0);
} }
NdbEventOperationImpl* NdbEventOperationImpl*
NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types) NdbEventBuffer::getGCIEventOperations(Uint32* iter, Uint32* event_types)
{ {
if (*iter < m_available_data.m_gci_op_count) EventBufData_list::Gci_ops *gci_ops = m_available_data.first_gci_ops();
if (*iter < gci_ops->m_gci_op_count)
{ {
EventBufData_list::Gci_op g = m_available_data.m_gci_op_list[(*iter)++]; EventBufData_list::Gci_op g = gci_ops->m_gci_op_list[(*iter)++];
if (event_types != NULL) if (event_types != NULL)
*event_types = g.event_types; *event_types = g.event_types;
return g.op; return g.op;
...@@ -1318,7 +1328,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep) ...@@ -1318,7 +1328,7 @@ NdbEventBuffer::execSUB_GCP_COMPLETE_REP(const SubGcpCompleteRep * const rep)
#ifdef VM_TRACE #ifdef VM_TRACE
assert(bucket->m_data.m_count); assert(bucket->m_data.m_count);
#endif #endif
m_complete_data.m_data.append(bucket->m_data); m_complete_data.m_data.append_list(&bucket->m_data, gci);
} }
reportStatus(); reportStatus();
bzero(bucket, sizeof(Gci_container)); bzero(bucket, sizeof(Gci_container));
...@@ -1389,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis() ...@@ -1389,7 +1399,7 @@ NdbEventBuffer::complete_outof_order_gcis()
#ifdef VM_TRACE #ifdef VM_TRACE
assert(bucket->m_data.m_count); assert(bucket->m_data.m_count);
#endif #endif
m_complete_data.m_data.append(bucket->m_data); m_complete_data.m_data.append_list(&bucket->m_data, start_gci);
#ifdef VM_TRACE #ifdef VM_TRACE
ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count, ndbout_c(" moved %lld rows -> %lld", bucket->m_data.m_count,
m_complete_data.m_data.m_count); m_complete_data.m_data.m_count);
...@@ -1599,7 +1609,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1599,7 +1609,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
data->m_event_op = op; data->m_event_op = op;
if (! is_blob_event || ! is_data_event) if (! is_blob_event || ! is_data_event)
{ {
bucket->m_data.append(data); bucket->m_data.append_data(data);
} }
else else
{ {
...@@ -1615,7 +1625,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op, ...@@ -1615,7 +1625,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
if (ret != 0) // main event was created if (ret != 0) // main event was created
{ {
main_data->m_event_op = op->theMainOp; main_data->m_event_op = op->theMainOp;
bucket->m_data.append(main_data); bucket->m_data.append_data(main_data);
if (use_hash) if (use_hash)
{ {
main_data->m_pkhash = main_hpos.pkhash; main_data->m_pkhash = main_hpos.pkhash;
...@@ -2097,7 +2107,7 @@ NdbEventBuffer::move_data() ...@@ -2097,7 +2107,7 @@ NdbEventBuffer::move_data()
if (!m_complete_data.m_data.is_empty()) if (!m_complete_data.m_data.is_empty())
{ {
// move this list to last in m_available_data // move this list to last in m_available_data
m_available_data.append(m_complete_data.m_data); m_available_data.append_list(&m_complete_data.m_data, 0);
bzero(&m_complete_data, sizeof(m_complete_data)); bzero(&m_complete_data, sizeof(m_complete_data));
} }
...@@ -2160,6 +2170,19 @@ NdbEventBuffer::free_list(EventBufData_list &list) ...@@ -2160,6 +2170,19 @@ NdbEventBuffer::free_list(EventBufData_list &list)
list.m_count = list.m_sz = 0; list.m_count = list.m_sz = 0;
} }
void EventBufData_list::append_list(EventBufData_list *list, Uint64 gci)
{
move_gci_ops(list, gci);
if (m_tail)
m_tail->m_next= list->m_head;
else
m_head= list->m_head;
m_tail= list->m_tail;
m_count+= list->m_count;
m_sz+= list->m_sz;
}
void void
EventBufData_list::add_gci_op(Gci_op g) EventBufData_list::add_gci_op(Gci_op g)
{ {
...@@ -2188,6 +2211,44 @@ EventBufData_list::add_gci_op(Gci_op g) ...@@ -2188,6 +2211,44 @@ EventBufData_list::add_gci_op(Gci_op g)
} }
} }
void
EventBufData_list::move_gci_ops(EventBufData_list *list, Uint64 gci)
{
assert(!m_is_not_multi_list);
if (!list->m_is_not_multi_list)
{
assert(gci == 0);
if (m_gci_ops_list_tail)
m_gci_ops_list_tail->m_next = list->m_gci_ops_list;
else
{
m_gci_ops_list = list->m_gci_ops_list;
}
m_gci_ops_list_tail = list->m_gci_ops_list_tail;
goto end;
}
{
Gci_ops *new_gci_ops = new Gci_ops;
if (m_gci_ops_list_tail)
m_gci_ops_list_tail->m_next = new_gci_ops;
else
{
assert(m_gci_ops_list == 0);
m_gci_ops_list = new_gci_ops;
}
m_gci_ops_list_tail = new_gci_ops;
new_gci_ops->m_gci_op_list = list->m_gci_op_list;
new_gci_ops->m_gci_op_count = list->m_gci_op_count;
new_gci_ops->m_gci = gci;
new_gci_ops->m_next = 0;
}
end:
list->m_gci_op_list = 0;
list->m_gci_ops_list_tail = 0;
list->m_gci_op_alloc = 0;
}
NdbEventOperation* NdbEventOperation*
NdbEventBuffer::createEventOperation(const char* eventName, NdbEventBuffer::createEventOperation(const char* eventName,
NdbError &theError) NdbError &theError)
......
...@@ -68,8 +68,12 @@ public: ...@@ -68,8 +68,12 @@ public:
~EventBufData_list(); ~EventBufData_list();
void remove_first(); void remove_first();
void append(EventBufData *data); // append data and insert data into Gci_op list with add_gci_op
void append(const EventBufData_list &list); void append_data(EventBufData *data);
// append data and insert data but ignore Gci_op list
void append_used_data(EventBufData *data);
// append list to another, will call move_gci_ops
void append_list(EventBufData_list *list, Uint64 gci);
int is_empty(); int is_empty();
...@@ -77,13 +81,60 @@ public: ...@@ -77,13 +81,60 @@ public:
unsigned m_count; unsigned m_count;
unsigned m_sz; unsigned m_sz;
// distinct ops per gci (assume no hash needed) /*
struct Gci_op { NdbEventOperationImpl* op; Uint32 event_types; }; distinct ops per gci (assume no hash needed)
Gci_op* m_gci_op_list;
Uint32 m_gci_op_count; list may be in 2 versions
Uint32 m_gci_op_alloc;
1. single list with on gci only
- one linear array
Gci_op *m_gci_op_list;
Uint32 m_gci_op_count;
Uint32 m_gci_op_alloc != 0;
2. multi list with several gcis
- linked list of gci's
- one linear array per gci
Gci_ops *m_gci_ops_list;
Gci_ops *m_gci_ops_list_tail;
Uint32 m_is_not_multi_list == 0;
*/
struct Gci_op // 1 + 2
{
NdbEventOperationImpl* op;
Uint32 event_types;
};
struct Gci_ops // 2
{
Uint64 m_gci;
Gci_op *m_gci_op_list;
Gci_ops *m_next;
Uint32 m_gci_op_count;
};
union
{
Gci_op *m_gci_op_list; // 1
Gci_ops *m_gci_ops_list; // 2
};
union
{
Uint32 m_gci_op_count; // 1
Gci_ops *m_gci_ops_list_tail;// 2
};
union
{
Uint32 m_gci_op_alloc; // 1
Uint32 m_is_not_multi_list; // 2
};
Gci_ops *first_gci_ops();
Gci_ops *next_gci_ops();
private: private:
// case 1 above; add Gci_op to single list
void add_gci_op(Gci_op g); void add_gci_op(Gci_op g);
// case 2 above; move single list or multi list from
// one list to another
void move_gci_ops(EventBufData_list *list, Uint64 gci);
}; };
inline inline
...@@ -92,7 +143,7 @@ EventBufData_list::EventBufData_list() ...@@ -92,7 +143,7 @@ EventBufData_list::EventBufData_list()
m_count(0), m_count(0),
m_sz(0), m_sz(0),
m_gci_op_list(NULL), m_gci_op_list(NULL),
m_gci_op_count(0), m_gci_ops_list_tail(0),
m_gci_op_alloc(0) m_gci_op_alloc(0)
{ {
} }
...@@ -100,7 +151,14 @@ EventBufData_list::EventBufData_list() ...@@ -100,7 +151,14 @@ EventBufData_list::EventBufData_list()
inline inline
EventBufData_list::~EventBufData_list() EventBufData_list::~EventBufData_list()
{ {
delete [] m_gci_op_list; if (m_is_not_multi_list)
delete [] m_gci_op_list;
else
{
Gci_ops *op = first_gci_ops();
while (op)
op = next_gci_ops();
}
} }
inline inline
...@@ -120,11 +178,8 @@ void EventBufData_list::remove_first() ...@@ -120,11 +178,8 @@ void EventBufData_list::remove_first()
} }
inline inline
void EventBufData_list::append(EventBufData *data) void EventBufData_list::append_used_data(EventBufData *data)
{ {
Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
add_gci_op(g);
data->m_next= 0; data->m_next= 0;
if (m_tail) if (m_tail)
m_tail->m_next= data; m_tail->m_next= data;
...@@ -143,19 +198,33 @@ void EventBufData_list::append(EventBufData *data) ...@@ -143,19 +198,33 @@ void EventBufData_list::append(EventBufData *data)
} }
inline inline
void EventBufData_list::append(const EventBufData_list &list) void EventBufData_list::append_data(EventBufData *data)
{ {
Uint32 i; Gci_op g = { data->m_event_op, 1 << (Uint32)data->sdata->operation };
for (i = 0; i < list.m_gci_op_count; i++) add_gci_op(g);
add_gci_op(list.m_gci_op_list[i]);
if (m_tail) append_used_data(data);
m_tail->m_next= list.m_head; }
else
m_head= list.m_head; inline EventBufData_list::Gci_ops *
m_tail= list.m_tail; EventBufData_list::first_gci_ops()
m_count+= list.m_count; {
m_sz+= list.m_sz; assert(!m_is_not_multi_list);
return m_gci_ops_list;
}
inline EventBufData_list::Gci_ops *
EventBufData_list::next_gci_ops()
{
assert(!m_is_not_multi_list);
Gci_ops *first = m_gci_ops_list;
m_gci_ops_list = first->m_next;
if (first->m_gci_op_list)
delete [] first->m_gci_op_list;
delete first;
if (m_gci_ops_list == 0)
m_gci_ops_list_tail = 0;
return m_gci_ops_list;
} }
// GCI bucket has also a hash over data, with key event op, table PK. // GCI bucket has also a hash over data, with key event op, table PK.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment