Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
7e1ea97a
Commit
7e1ea97a
authored
Jan 12, 2006
by
unknown
Browse files
Options
Browse Files
Download
Plain Diff
Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new
into mysql.com:/space/pekka/ndb/version/my51
parents
56842e5e
c126958f
Changes
9
Expand all
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
616 additions
and
150 deletions
+616
-150
storage/ndb/include/ndbapi/NdbDictionary.hpp
storage/ndb/include/ndbapi/NdbDictionary.hpp
+2
-1
storage/ndb/include/ndbapi/NdbEventOperation.hpp
storage/ndb/include/ndbapi/NdbEventOperation.hpp
+6
-0
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
+2
-2
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
+53
-33
storage/ndb/src/ndbapi/NdbEventOperation.cpp
storage/ndb/src/ndbapi/NdbEventOperation.cpp
+5
-0
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
+468
-94
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
+46
-6
storage/ndb/test/ndbapi/test_event.cpp
storage/ndb/test/ndbapi/test_event.cpp
+4
-0
storage/ndb/test/ndbapi/test_event_merge.cpp
storage/ndb/test/ndbapi/test_event_merge.cpp
+30
-14
No files found.
storage/ndb/include/ndbapi/NdbDictionary.hpp
View file @
7e1ea97a
...
...
@@ -1051,7 +1051,8 @@ public:
_TE_CREATE
=
6
,
_TE_GCP_COMPLETE
=
7
,
_TE_CLUSTER_FAILURE
=
8
,
_TE_STOP
=
9
_TE_STOP
=
9
,
_TE_NUL
=
10
// internal (INS o DEL within same GCI)
};
#endif
/**
...
...
storage/ndb/include/ndbapi/NdbEventOperation.hpp
View file @
7e1ea97a
...
...
@@ -93,6 +93,12 @@ public:
* Retrieve current state of the NdbEventOperation object
*/
State
getState
();
/**
* By default events on same NdbEventOperation within same GCI
* are merged into a single event. This can be changed with
* separateEvents(true).
*/
void
separateEvents
(
bool
flag
);
/**
* Activates the NdbEventOperation to start receiving events. The
...
...
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
View file @
7e1ea97a
TARGET
=
ndbapi_event
SRCS
=
ndbapi_event.cpp
OBJS
=
ndbapi_event.o
CXX
=
g++
CXX
=
g++
-g
CFLAGS
=
-c
-Wall
-fno-rtti
-fno-exceptions
CXXFLAGS
=
DEBUG
=
...
...
@@ -17,7 +17,7 @@ $(TARGET): $(OBJS)
$(CXX)
$(CXXFLAGS)
$(LFLAGS)
$(LIB_DIR)
$(OBJS)
-lndbclient
-lmysqlclient_r
-lmysys
-lmystrings
-lz
$(SYS_LIB)
-o
$(TARGET)
$(TARGET).o
:
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
$(SRCS)
$(CXX)
$(CFLAGS)
-I
$(INCLUDE_DIR)
-I
$(INCLUDE_DIR)
/ndbapi
-I
$(TOP_SRCDIR)
/include
$(SRCS)
clean
:
rm
-f
*
.o
$(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
View file @
7e1ea97a
...
...
@@ -58,24 +58,29 @@
/**
*
* Assume that there is a table
TAB
0 which is being updated by
* Assume that there is a table
t
0 which is being updated by
* another process (e.g. flexBench -l 0 -stdtables).
* We want to monitor what happens with columns
COL0, COL2, COL11
* We want to monitor what happens with columns
c0,c1,c2,c3.
*
* or together with the mysql client;
*
* shell> mysql -u root
* mysql> create database TEST_DB;
* mysql> use TEST_DB;
* mysql> create table TAB0 (COL0 int primary key, COL1 int, COL11 int) engine=ndb;
* mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
* primary key(c0, c2)) engine ndb charset latin1;
*
* In another window start ndbapi_event, wait until properly started
*
insert into TAB0 values (1,2,3);
insert into TAB0 values (2,2,3);
insert into TAB0 values (3,2,9);
update TAB0 set COL1=10 where COL0=1;
delete from TAB0 where COL0=1;
insert into t0 values (1, 2, 'a', 'b');
insert into t0 values (3, 4, 'c', 'd');
update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
update t0 set c3 = 'f'; -- use scan
update t0 set c3 = 'F'; -- use scan update to 'same'
update t0 set c2 = 'g' where c0 = 1; -- update pk part
update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
delete from t0;
*
* you should see the data popping up in the example window
*
...
...
@@ -92,9 +97,10 @@ int myCreateEvent(Ndb* myNdb,
const
char
**
eventColumnName
,
const
int
noEventColumnName
);
int
main
()
int
main
(
int
argc
,
char
**
argv
)
{
ndb_init
();
bool
sep
=
argc
>
1
&&
strcmp
(
argv
[
1
],
"-s"
)
==
0
;
Ndb_cluster_connection
*
cluster_connection
=
new
Ndb_cluster_connection
();
// Object representing the cluster
...
...
@@ -126,13 +132,15 @@ int main()
if
(
myNdb
->
init
()
==
-
1
)
APIERROR
(
myNdb
->
getNdbError
());
const
char
*
eventName
=
"CHNG_IN_
TAB
0"
;
const
char
*
eventTableName
=
"
TAB
0"
;
const
int
noEventColumnName
=
3
;
const
char
*
eventName
=
"CHNG_IN_
t
0"
;
const
char
*
eventTableName
=
"
t
0"
;
const
int
noEventColumnName
=
4
;
const
char
*
eventColumnName
[
noEventColumnName
]
=
{
"COL0"
,
"COL1"
,
"COL11"
};
{
"c0"
,
"c1"
,
"c2"
,
"c3"
};
// Create events
myCreateEvent
(
myNdb
,
...
...
@@ -142,13 +150,14 @@ int main()
noEventColumnName
);
int
j
=
0
;
while
(
j
<
5
)
{
while
(
j
<
99
)
{
// Start "transaction" for handling events
NdbEventOperation
*
op
;
printf
(
"create EventOperation
\n
"
);
if
((
op
=
myNdb
->
createEventOperation
(
eventName
))
==
NULL
)
APIERROR
(
myNdb
->
getNdbError
());
op
->
separateEvents
(
sep
);
printf
(
"get values
\n
"
);
NdbRecAttr
*
recAttr
[
noEventColumnName
];
...
...
@@ -175,32 +184,43 @@ int main()
i
++
;
switch
(
op
->
getEventType
())
{
case
NdbDictionary
:
:
Event
::
TE_INSERT
:
printf
(
"%u INSERT
:
"
,
i
);
printf
(
"%u INSERT"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_DELETE
:
printf
(
"%u DELETE
:
"
,
i
);
printf
(
"%u DELETE"
,
i
);
break
;
case
NdbDictionary
:
:
Event
::
TE_UPDATE
:
printf
(
"%u UPDATE
:
"
,
i
);
printf
(
"%u UPDATE"
,
i
);
break
;
default:
abort
();
// should not happen
}
for
(
int
i
=
1
;
i
<
noEventColumnName
;
i
++
)
{
printf
(
" gci=%d
\n
"
,
op
->
getGCI
());
printf
(
"post: "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttr
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" post[%u]="
,
i
);
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttr
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
if
(
recAttr
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttr
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttr
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
pre : "
);
for
(
int
i
=
0
;
i
<
noEventColumnName
;
i
++
)
{
if
(
recAttrPre
[
i
]
->
isNULL
()
>=
0
)
{
// we have a value
printf
(
" pre[%u]="
,
i
);
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
// we have a non-null value
printf
(
"%u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
// we have a null value
printf
(
"NULL"
);
}
if
(
recAttrPre
[
i
]
->
isNULL
()
==
0
)
{
// we have a non-null value
if
(
i
<
2
)
printf
(
"%-5u"
,
recAttrPre
[
i
]
->
u_32_value
());
else
printf
(
"%-5.4s"
,
recAttrPre
[
i
]
->
aRef
());
}
else
// we have a null value
printf
(
"%-5s"
,
"NULL"
);
}
else
printf
(
"%-5s"
,
"-"
);
}
printf
(
"
\n
"
);
}
...
...
storage/ndb/src/ndbapi/NdbEventOperation.cpp
View file @
7e1ea97a
...
...
@@ -38,6 +38,11 @@ NdbEventOperation::State NdbEventOperation::getState()
return
m_impl
.
getState
();
}
void
NdbEventOperation
::
separateEvents
(
bool
flag
)
{
m_impl
.
m_separateEvents
=
flag
;
}
NdbRecAttr
*
NdbEventOperation
::
getValue
(
const
char
*
colName
,
char
*
aValue
)
{
...
...
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
View file @
7e1ea97a
This diff is collapsed.
Click to expand it.
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
7e1ea97a
...
...
@@ -25,16 +25,19 @@
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
class
NdbEventOperationImpl
;
struct
EventBufData
{
union
{
SubTableData
*
sdata
;
char
*
memory
;
Uint32
*
memory
;
};
LinearSectionPtr
ptr
[
3
];
unsigned
sz
;
NdbEventOperationImpl
*
m_event_op
;
EventBufData
*
m_next
;
// Next wrt to global order
EventBufData
*
m_next_hash
;
// Next in per-GCI hash
Uint32
m_pkhash
;
// PK hash (without op) for fast compare
};
class
EventBufData_list
...
...
@@ -116,6 +119,34 @@ void EventBufData_list::append(const EventBufData_list &list)
m_sz
+=
list
.
m_sz
;
}
// GCI bucket has also a hash over data, with key event op, table PK.
// It can only be appended to and is invalid after remove_first().
class
EventBufData_hash
{
public:
struct
Pos
{
// search result
Uint32
index
;
// index into hash array
EventBufData
*
data
;
// non-zero if found
Uint32
pkhash
;
// PK hash
};
static
Uint32
getpkhash
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
]);
static
bool
getpkequal
(
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr1
[
3
],
LinearSectionPtr
ptr2
[
3
]);
void
search
(
Pos
&
hpos
,
NdbEventOperationImpl
*
op
,
LinearSectionPtr
ptr
[
3
]);
void
append
(
Pos
&
hpos
,
EventBufData
*
data
);
enum
{
GCI_EVENT_HASH_SIZE
=
101
};
EventBufData
*
m_hash
[
GCI_EVENT_HASH_SIZE
];
};
inline
void
EventBufData_hash
::
append
(
Pos
&
hpos
,
EventBufData
*
data
)
{
data
->
m_next_hash
=
m_hash
[
hpos
.
index
];
m_hash
[
hpos
.
index
]
=
data
;
}
struct
Gci_container
{
enum
State
...
...
@@ -127,6 +158,7 @@ struct Gci_container
Uint32
m_gcp_complete_rep_count
;
// Remaining SUB_GCP_COMPLETE_REP until done
Uint64
m_gci
;
// GCI
EventBufData_list
m_data
;
EventBufData_hash
m_data_hash
;
};
class
NdbEventOperationImpl
:
public
NdbEventOperation
{
...
...
@@ -174,6 +206,8 @@ public:
Uint32
m_eventId
;
Uint32
m_oid
;
bool
m_separateEvents
;
EventBufData
*
m_data_item
;
void
*
m_custom_data
;
...
...
@@ -212,7 +246,6 @@ public:
void
add_op
();
void
remove_op
();
void
init_gci_containers
();
Uint32
m_active_op_count
;
// accessed from the "receive thread"
int
insertDataL
(
NdbEventOperationImpl
*
op
,
...
...
@@ -233,10 +266,15 @@ public:
NdbEventOperationImpl
*
move_data
();
// used by both user thread and receive thread
int
copy_data_alloc
(
const
SubTableData
*
const
f_sdata
,
LinearSectionPtr
f_ptr
[
3
],
EventBufData
*
ev_buf
);
// routines to copy/merge events
EventBufData
*
alloc_data
();
int
alloc_mem
(
EventBufData
*
data
,
LinearSectionPtr
ptr
[
3
]);
int
copy_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
int
merge_data
(
const
SubTableData
*
const
sdata
,
LinearSectionPtr
ptr
[
3
],
EventBufData
*
data
);
void
free_list
(
EventBufData_list
&
list
);
...
...
@@ -290,6 +328,8 @@ private:
// dropped event operations that have not yet
// been deleted
NdbEventOperationImpl
*
m_dropped_ev_op
;
Uint32
m_active_op_count
;
};
inline
...
...
storage/ndb/test/ndbapi/test_event.cpp
View file @
7e1ea97a
...
...
@@ -169,6 +169,7 @@ eventOperation(Ndb* pNdb, const NdbDictionary::Table &tab, void* pstats, int rec
g_err
<<
function
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
function
<<
"get values
\n
"
;
NdbRecAttr
*
recAttr
[
1024
];
...
...
@@ -380,6 +381,7 @@ int runCreateDropEventOperation(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed
\n
"
;
return
NDBT_FAILED
;
}
pOp
->
separateEvents
(
true
);
g_info
<<
"dropping event operation"
<<
endl
;
int
res
=
pNdb
->
dropEventOperation
(
pOp
);
...
...
@@ -550,6 +552,7 @@ int runEventApplier(NDBT_Context* ctx, NDBT_Step* step)
g_err
<<
"Event operation creation failed on %s"
<<
buf
<<
endl
;
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
i
;
int
n_columns
=
table
->
getNoOfColumns
();
...
...
@@ -1195,6 +1198,7 @@ static int createEventOperations(Ndb * ndb)
{
DBUG_RETURN
(
NDBT_FAILED
);
}
pOp
->
separateEvents
(
true
);
int
n_columns
=
pTabs
[
i
]
->
getNoOfColumns
();
for
(
int
j
=
0
;
j
<
n_columns
;
j
++
)
...
...
storage/ndb/test/ndbapi/test_event_merge.cpp
View file @
7e1ea97a
...
...
@@ -473,9 +473,9 @@ struct Op { // single or composite
Kind
kind
;
Type
type
;
Op
*
next_op
;
// within one commit
Op
*
next_com
;
// next commit chain
or next event
Op
*
next_com
;
// next commit chain
Op
*
next_gci
;
// groups commit chains (unless --separate-events)
Op
*
next_ev
;
Op
*
next_ev
;
// next event
Op
*
next_free
;
// free list
bool
free
;
// on free list
uint
num_op
;
...
...
@@ -564,6 +564,8 @@ static NdbRecAttr* g_ev_ra[2][g_maxcol]; // 0-post 1-pre
static
NdbBlob
*
g_ev_bh
[
2
][
g_maxcol
];
// 0-post 1-pre
static
Op
*
g_rec_ev
;
static
uint
g_ev_pos
[
g_maxpk
];
static
uint
g_num_gci
=
0
;
static
uint
g_num_ev
=
0
;
static
Op
*
getop
(
Op
::
Kind
a_kind
)
...
...
@@ -651,6 +653,7 @@ resetmem()
}
}
assert
(
g_usedops
==
0
);
g_num_gci
=
g_num_ev
=
0
;
}
struct
Comp
{
...
...
@@ -877,9 +880,8 @@ createeventop()
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
(),
bsz
))
!=
0
);
#else
chkdb
((
g_evt_op
=
g_ndb
->
createEventOperation
(
g_evt
->
getName
()))
!=
0
);
#ifdef version51rbr
// available in gci merge changeset
g_evt_op
->
separateEvents
(
g_opts
.
separate_events
);
// not yet inherited
#endif
#endif
uint
i
;
for
(
i
=
0
;
i
<
ncol
();
i
++
)
{
...
...
@@ -1203,8 +1205,9 @@ makeops()
// copy to gci level
copyop
(
com_op
,
gci_op
);
tot_op
->
num_com
+=
1
;
g_num_gci
+=
1
;
}
ll1
(
"makeops: used ops = "
<<
g_usedops
);
ll1
(
"makeops: used ops = "
<<
g_usedops
<<
" com ops = "
<<
g_num_gci
);
}
static
int
...
...
@@ -1341,12 +1344,13 @@ mergeops()
gci_op2
=
gci_op2
->
next_gci
;
freeop
(
tmp_op
);
mergecnt
++
;
assert
(
g_num_gci
!=
0
);
g_num_gci
--
;
}
gci_op
=
gci_op
->
next_gci
=
gci_op2
;
}
}
ll1
(
"mergeops: used ops = "
<<
g_usedops
);
ll1
(
"mergeops: merged "
<<
mergecnt
<<
" gci entries"
);
ll1
(
"mergeops: used ops = "
<<
g_usedops
<<
" gci ops = "
<<
g_num_gci
);
return
0
;
}
...
...
@@ -1504,27 +1508,37 @@ matchevents()
static
int
matchops
()
{
ll1
(
"matchops"
);
uint
nomatch
=
0
;
Uint32
pk1
;
for
(
pk1
=
0
;
pk1
<
g_opts
.
maxpk
;
pk1
++
)
{
Op
*
tot_op
=
g_pk_op
[
pk1
];
if
(
tot_op
==
0
)
continue
;
Op
*
com_op
=
tot_op
->
next_com
;
while
(
com_op
!=
0
)
{
if
(
com_op
->
type
!=
Op
::
NUL
&&
!
com_op
->
match
)
{
Op
*
gci_op
=
tot_op
->
next_gci
;
while
(
gci_op
!=
0
)
{
if
(
gci_op
->
type
==
Op
::
NUL
)
{
ll2
(
"GCI: "
<<
*
gci_op
<<
" [skip NUL]"
);
}
else
if
(
gci_op
->
match
)
{
ll2
(
"GCI: "
<<
*
gci_op
<<
" [match OK]"
);
}
else
{
ll0
(
"GCI: "
<<
*
gci_op
);
Op
*
com_op
=
gci_op
->
next_com
;
assert
(
com_op
!=
0
);
ll0
(
"COM: "
<<
*
com_op
);
Op
*
op
=
com_op
->
next_op
;
assert
(
op
!=
0
);
while
(
op
!=
0
)
{
ll0
(
"
---
: "
<<
*
op
);
ll0
(
"
OP
: "
<<
*
op
);
op
=
op
->
next_op
;
}
ll0
(
"no matching event"
);
return
-
1
;
nomatch
++
;
}
com_op
=
com_op
->
next_com
;
gci_op
=
gci_op
->
next_gci
;
}
}
chkrc
(
nomatch
==
0
);
return
0
;
}
...
...
@@ -1619,9 +1633,10 @@ runevents()
Op
*
ev
=
getop
(
Op
::
EV
);
copyop
(
g_rec_ev
,
ev
);
last_ev
->
next_ev
=
ev
;
g_num_ev
++
;
}
}
ll1
(
"runevents: used ops = "
<<
g_usedops
);
ll1
(
"runevents: used ops = "
<<
g_usedops
<<
" events = "
<<
g_num_ev
);
return
0
;
}
...
...
@@ -1666,6 +1681,7 @@ runtest()
chkrc
(
mergeops
()
==
0
);
cmppostpre
();
chkrc
(
runevents
()
==
0
);
ll0
(
"counts: gci = "
<<
g_num_gci
<<
" ev = "
<<
g_num_ev
);
chkrc
(
matchevents
()
==
0
);
chkrc
(
matchops
()
==
0
);
chkrc
(
dropeventop
()
==
0
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment