Commit 93ceddfe authored Jan 25, 2006 by pekka@mysql.com
    Merge pnousiainen@bk-internal.mysql.com:/home/bk/mysql-5.1-new
    into mysql.com:/space/pekka/ndb/version/my51

Parents: 8147bbea 98e59a44
Showing 17 changed files with 1345 additions and 300 deletions (+1345 -300)
mysql-test/t/disabled.def                                   +4    -0
sql/ha_ndbcluster.cc                                        +67   -36
sql/ha_ndbcluster.h                                         +8    -0
sql/ha_ndbcluster_binlog.cc                                 +113  -29
storage/ndb/include/ndbapi/NdbBlob.hpp                      +37   -1
storage/ndb/include/ndbapi/NdbDictionary.hpp                +21   -1
storage/ndb/include/ndbapi/NdbEventOperation.hpp            +8    -0
storage/ndb/ndbapi-examples/ndbapi_event/Makefile           +3    -3
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp   +117  -52
storage/ndb/src/ndbapi/NdbBlob.cpp                          +231  -40
storage/ndb/src/ndbapi/NdbDictionary.cpp                    +11   -0
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp                +121  -11
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp                +7    -1
storage/ndb/src/ndbapi/NdbEventOperation.cpp                +12   -0
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp            +451  -50
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp            +34   -2
storage/ndb/test/ndbapi/test_event_merge.cpp                +100  -74
mysql-test/t/disabled.def
@@ -29,3 +29,7 @@ ndb_autodiscover : Needs to be fixed w.r.t binlog
 ndb_autodiscover2        : Needs to be fixed w.r.t binlog
 system_mysql_db          : Needs fixing
 system_mysql_db_fix      : Needs fixing
 #ndb_alter_table_row     : sometimes wrong error 1015!=1046
 ndb_gis                  : garbled msgs from corrupt THD * + partitioning problem
 # vim: set filetype=conf:
sql/ha_ndbcluster.cc
@@ -35,6 +35,11 @@
 #include "ha_ndbcluster_binlog.h"

+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 // options from from mysqld.cc
 extern my_bool opt_ndb_optimized_node_selection;
 extern const char *opt_ndbcluster_connectstring;
@@ -791,10 +796,20 @@ int g_get_ndb_blobs_value(NdbBlob *ndb_blob, void *arg)
   if (ndb_blob->blobsNextBlob() != NULL)
     DBUG_RETURN(0);
   ha_ndbcluster *ha= (ha_ndbcluster *)arg;
-  DBUG_RETURN(ha->get_ndb_blobs_value(ndb_blob));
+  int ret= get_ndb_blobs_value(ha->table, ha->m_value,
+                               ha->m_blobs_buffer, ha->m_blobs_buffer_size,
+                               0);
+  DBUG_RETURN(ret);
 }

-int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
+/*
+  This routine is shared by injector.  There is no common blobs buffer
+  so the buffer and length are passed by reference.  Injector also
+  passes a record pointer diff.
+ */
+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff)
 {
   DBUG_ENTER("get_ndb_blobs_value");
@@ -803,44 +818,51 @@ int ha_ndbcluster::get_ndb_blobs_value(NdbBlob *last_ndb_blob)
   for (int loop= 0; loop <= 1; loop++)
   {
     uint32 offset= 0;
-    for (uint i= 0; i < table_share->fields; i++)
+    for (uint i= 0; i < table->s->fields; i++)
     {
       Field *field= table->field[i];
-      NdbValue value= m_value[i];
+      NdbValue value= value_array[i];
       if (value.ptr != NULL && (field->flags & BLOB_FLAG))
       {
         Field_blob *field_blob= (Field_blob *)field;
         NdbBlob *ndb_blob= value.blob;
-        Uint64 blob_len= 0;
-        if (ndb_blob->getLength(blob_len) != 0)
-          DBUG_RETURN(-1);
-        // Align to Uint64
-        uint32 blob_size= blob_len;
-        if (blob_size % 8 != 0)
-          blob_size+= 8 - blob_size % 8;
-        if (loop == 1)
-        {
-          char *buf= m_blobs_buffer + offset;
-          uint32 len= 0xffffffff;  // Max uint32
-          DBUG_PRINT("value", ("read blob ptr=%lx len=%u", buf, (uint)blob_len));
-          if (ndb_blob->readData(buf, len) != 0)
-            DBUG_RETURN(-1);
-          DBUG_ASSERT(len == blob_len);
-          field_blob->set_ptr(len, buf);
-        }
-        offset+= blob_size;
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull == 0) { // XXX -1 should be allowed only for events
+          Uint64 blob_len= 0;
+          if (ndb_blob->getLength(blob_len) != 0)
+            DBUG_RETURN(-1);
+          // Align to Uint64
+          uint32 blob_size= blob_len;
+          if (blob_size % 8 != 0)
+            blob_size+= 8 - blob_size % 8;
+          if (loop == 1)
+          {
+            char *buf= buffer + offset;
+            uint32 len= 0xffffffff;  // Max uint32
+            DBUG_PRINT("info", ("read blob ptr=%p len=%u", buf, (uint)blob_len));
+            if (ndb_blob->readData(buf, len) != 0)
+              DBUG_RETURN(-1);
+            DBUG_ASSERT(len == blob_len);
+            // Ugly hack assumes only ptr needs to be changed
+            field_blob->ptr+= ptrdiff;
+            field_blob->set_ptr(len, buf);
+            field_blob->ptr-= ptrdiff;
+          }
+          offset+= blob_size;
+        }
       }
     }
-    if (loop == 0 && offset > m_blobs_buffer_size)
+    if (loop == 0 && offset > buffer_size)
     {
-      my_free(m_blobs_buffer, MYF(MY_ALLOW_ZERO_PTR));
-      m_blobs_buffer_size= 0;
-      DBUG_PRINT("value", ("allocate blobs buffer size %u", offset));
-      m_blobs_buffer= my_malloc(offset, MYF(MY_WME));
-      if (m_blobs_buffer == NULL)
+      my_free(buffer, MYF(MY_ALLOW_ZERO_PTR));
+      buffer_size= 0;
+      DBUG_PRINT("info", ("allocate blobs buffer size %u", offset));
+      buffer= my_malloc(offset, MYF(MY_WME));
+      if (buffer == NULL)
         DBUG_RETURN(-1);
-      m_blobs_buffer_size= offset;
+      buffer_size= offset;
    }
  }
  DBUG_RETURN(0);
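The record-offset handling above ("ugly hack") can be illustrated with a small standalone sketch; it is not part of this commit. The injector passes ptrdiff = record[1] - record[0] so that the blob pointer is stored relative to a different record buffer, and the field pointer is shifted temporarily and then restored. FakeBlobField below is a hypothetical stand-in for MySQL's Field_blob, used only to keep the sketch self-contained and runnable.

// Illustrative sketch only (not from the patch): the pointer-diff trick.
#include <cstddef>
#include <cstdio>

struct FakeBlobField {
  unsigned char* ptr;                       // points into the current record buffer
  void set_ptr(unsigned len, char* data) {  // pretends to store length + data pointer at 'ptr'
    std::printf("store len=%u data=%p at field ptr=%p\n",
                len, (void*)data, (void*)ptr);
  }
};

int main() {
  unsigned char record0[64], record1[64];
  FakeBlobField f;
  f.ptr = record0 + 8;                      // field lives at offset 8 of record[0]

  // The caller wants the blob stored relative to record[1], so it passes
  // ptrdiff = record[1] - record[0]; the field pointer is shifted and restored.
  std::ptrdiff_t ptrdiff = record1 - record0;
  char blob_data[] = "hello";

  f.ptr += ptrdiff;                         // now points into record[1]
  f.set_ptr((unsigned)sizeof blob_data, blob_data);
  f.ptr -= ptrdiff;                         // restore
  return 0;
}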
@@ -2713,14 +2735,22 @@ void ndb_unpack_record(TABLE *table, NdbValue *value,
       else
       {
         NdbBlob *ndb_blob= (*value).blob;
-        bool isNull= TRUE;
-#ifndef DBUG_OFF
-        int ret=
-#endif
-          ndb_blob->getNull(isNull);
-        DBUG_ASSERT(ret == 0);
-        if (isNull)
-          field->set_null(row_offset);
+        int isNull;
+        ndb_blob->getDefined(isNull);
+        if (isNull != 0) {
+          uint col_no = ndb_blob->getColumn()->getColumnNo();
+          if (isNull == 1) {
+            DBUG_PRINT("info",("[%u] NULL", col_no))
+            field->set_null(row_offset);
+          } else {
+            DBUG_PRINT("info",("[%u] UNDEFINED", col_no));
+            bitmap_clear_bit(defined, col_no);
+          }
+        }
       }
     }
   }
@@ -4713,6 +4743,7 @@ int ha_ndbcluster::alter_table_name(const char *to)
   NDBDICT *dict= ndb->getDictionary();
   const NDBTAB *orig_tab= (const NDBTAB *) m_table;
   DBUG_ENTER("alter_table_name");
+  DBUG_PRINT("info", ("from: %s  to: %s", orig_tab->getName(), to));

   NdbDictionary::Table new_tab= *orig_tab;
   new_tab.setName(to);
sql/ha_ndbcluster.h
@@ -25,6 +25,9 @@
 #pragma interface                       /* gcc class implementation */
 #endif

+/* Blob tables and events are internal to NDB and must never be accessed */
+#define IS_NDB_BLOB_PREFIX(A) is_prefix(A, "NDB$BLOB")
+
 #include <NdbApi.hpp>
 #include <ndbapi_limits.h>
@@ -78,6 +81,10 @@ typedef struct ndb_index_data {
 typedef union { const NdbRecAttr *rec; NdbBlob *blob; void *ptr; } NdbValue;

+int get_ndb_blobs_value(TABLE* table, NdbValue* value_array,
+                        byte*& buffer, uint& buffer_size,
+                        my_ptrdiff_t ptrdiff);
+
 typedef enum {
   NSS_INITIAL= 0,
   NSS_DROPPED,
@@ -114,6 +121,7 @@ typedef struct st_ndbcluster_share {
 #ifdef HAVE_NDB_BINLOG
 /* NDB_SHARE.flags */
 #define NSF_HIDDEN_PK 1 /* table has hidden primary key */
+#define NSF_BLOB_FLAG 2 /* table has blob attributes */
 #define NSF_NO_BINLOG 4 /* table should not be binlogged */
 #endif
sql/ha_ndbcluster_binlog.cc
@@ -23,6 +23,11 @@
 #include "slave.h"
 #include "ha_ndbcluster_binlog.h"

+#ifdef ndb_dynamite
+#undef assert
+#define assert(x) do { if(x) break; ::printf("%s %d: assert failed: %s\n", __FILE__, __LINE__, #x); ::fflush(stdout); ::signal(SIGABRT,SIG_DFL); ::abort(); ::kill(::getpid(),6); ::kill(::getpid(),9); } while (0)
+#endif
+
 /*
   defines for cluster replication table names
 */
@@ -237,6 +242,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
     DBUG_ASSERT(_table != 0);
     if (_table->s->primary_key == MAX_KEY)
       share->flags|= NSF_HIDDEN_PK;
+    if (_table->s->blob_fields != 0)
+      share->flags|= NSF_BLOB_FLAG;
     return;
   }
   while (1)
@@ -316,6 +323,8 @@ void ndbcluster_binlog_init_share(NDB_SHARE *share, TABLE *_table)
   }
   if (table->s->primary_key == MAX_KEY)
     share->flags|= NSF_HIDDEN_PK;
+  if (table->s->blob_fields != 0)
+    share->flags|= NSF_BLOB_FLAG;
   break;
 }
@@ -1622,6 +1631,7 @@ int ndbcluster_create_binlog_setup(Ndb *ndb, const char *key,
                                    NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_binlog_setup");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(table_name));

   pthread_mutex_lock(&ndbcluster_mutex);
@@ -1713,6 +1723,10 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
                         const char *event_name, NDB_SHARE *share)
 {
   DBUG_ENTER("ndbcluster_create_event");
+  DBUG_PRINT("info", ("table=%s version=%d event=%s share=%s",
+                      ndbtab->getName(), ndbtab->getObjectVersion(),
+                      event_name, share ? share->key : "(nil)"));
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
   if (!share)
   {
     DBUG_PRINT("info", ("share == NULL"));
@@ -1730,7 +1744,14 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
   my_event.addTableEvent(NDBEVENT::TE_ALL);
   if (share->flags & NSF_HIDDEN_PK)
   {
-    /* No primary key, susbscribe for all attributes */
+    if (share->flags & NSF_BLOB_FLAG)
+    {
+      sql_print_error("NDB Binlog: logging of table %s "
+                      "with no PK and blob attributes is not supported",
+                      share->key);
+      DBUG_RETURN(-1);
+    }
+    /* No primary key, subscribe for all attributes */
     my_event.setReport(NDBEVENT::ER_ALL);
     DBUG_PRINT("info", ("subscription all"));
   }
@@ -1749,6 +1770,8 @@ ndbcluster_create_event(Ndb *ndb, const NDBTAB *ndbtab,
       DBUG_PRINT("info", ("subscription all and subscribe"));
     }
   }
+  if (share->flags & NSF_BLOB_FLAG)
+    my_event.mergeEvents(true);

   /* add all columns to the event */
   int n_cols= ndbtab->getNoOfColumns();
@@ -1837,6 +1860,7 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
   */
   DBUG_ENTER("ndbcluster_create_event_ops");
+  DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(ndbtab->getName()));
   DBUG_ASSERT(share != 0);
@@ -1857,22 +1881,6 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
   }

   TABLE *table= share->table;
-  if (table)
-  {
-    /*
-      Logging of blob tables is not yet implemented, it would require:
-      1. setup of events also on the blob attribute tables
-      2. collect the pieces of the blob into one from an epoch to
-         provide a full blob to binlog
-    */
-    if (table->s->blob_fields)
-    {
-      sql_print_error("NDB Binlog: logging of blob table %s "
-                      "is not supported", share->key);
-      share->flags|= NSF_NO_BINLOG;
-      DBUG_RETURN(0);
-    }
-  }

   int do_schema_share= 0, do_apply_status_share= 0;
   int retries= 100;
@@ -1910,37 +1918,64 @@ ndbcluster_create_event_ops(NDB_SHARE *share, const NDBTAB *ndbtab,
       DBUG_RETURN(-1);
     }

+    if (share->flags & NSF_BLOB_FLAG)
+      op->mergeEvents(true); // currently not inherited from event
+
+    if (share->flags & NSF_BLOB_FLAG)
+    {
+      /*
+       * Given servers S1 S2, following results in out-of-date
+       * event->m_tableImpl and column->m_blobTable.
+       *
+       * S1: create table t1(a int primary key);
+       * S2: drop table t1;
+       * S1: create table t2(a int primary key, b blob);
+       * S1: alter table t2 add x int;
+       * S1: alter table t2 drop x;
+       *
+       * TODO fix at right place before we get here
+       */
+      ndb->getDictionary()->fix_blob_events(ndbtab, event_name);
+    }
+
     int n_columns= ndbtab->getNoOfColumns();
-    int n_fields= table ? table->s->fields : 0;
+    int n_fields= table ? table->s->fields : 0; // XXX ???
     for (int j= 0; j < n_columns; j++)
     {
       const char *col_name= ndbtab->getColumn(j)->getName();
-      NdbRecAttr *attr0, *attr1;
+      NdbValue attr0, attr1;
       if (j < n_fields)
       {
         Field *f= share->table->field[j];
         if (is_ndb_compatible_type(f))
         {
           DBUG_PRINT("info", ("%s compatible", col_name));
-          attr0= op->getValue(col_name, f->ptr);
-          attr1= op->getPreValue(col_name,
-                                 (f->ptr - share->table->record[0]) +
-                                 share->table->record[1]);
+          attr0.rec= op->getValue(col_name, f->ptr);
+          attr1.rec= op->getPreValue(col_name,
+                                     (f->ptr - share->table->record[0]) +
+                                     share->table->record[1]);
         }
-        else
+        else if (! (f->flags & BLOB_FLAG))
         {
           DBUG_PRINT("info", ("%s non compatible", col_name));
-          attr0= op->getValue(col_name);
-          attr1= op->getPreValue(col_name);
+          attr0.rec= op->getValue(col_name);
+          attr1.rec= op->getPreValue(col_name);
         }
+        else
+        {
+          DBUG_PRINT("info", ("%s blob", col_name));
+          attr0.blob= op->getBlobHandle(col_name);
+          attr1.blob= op->getPreBlobHandle(col_name);
+        }
       }
       else
       {
         DBUG_PRINT("info", ("%s hidden key", col_name));
-        attr0= op->getValue(col_name);
-        attr1= op->getPreValue(col_name);
+        attr0.rec= op->getValue(col_name);
+        attr1.rec= op->getPreValue(col_name);
       }
-      share->ndb_value[0][j].rec= attr0;
-      share->ndb_value[1][j].rec= attr1;
+      share->ndb_value[0][j].ptr= attr0.ptr;
+      share->ndb_value[1][j].ptr= attr1.ptr;
     }
     op->setCustomData((void *) share); // set before execute
     share->op= op; // assign op in NDB_SHARE
@@ -2229,12 +2264,27 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     (saves moving data about many times)
   */

+  /*
+    for now malloc/free blobs buffer each time
+    TODO if possible share single permanent buffer with handlers
+   */
+  byte* blobs_buffer[2] = { 0, 0 };
+  uint blobs_buffer_size[2] = { 0, 0 };
+
   switch(pOp->getEventType())
   {
   case NDBEVENT::TE_INSERT:
     row.n_inserts++;
     DBUG_PRINT("info", ("INSERT INTO %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
       trans.write_row(::server_id,
                       injector::transaction::table(table, true),
                       &b, n_fields, table->record[0]);
@@ -2261,6 +2311,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
        key
      */
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= table->record[n] - table->record[0];
+        int ret= get_ndb_blobs_value(table, share->ndb_value[n],
+                                     blobs_buffer[n], blobs_buffer_size[n],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[n], &b, table->record[n]);
       print_records(table, table->record[n]);
       trans.delete_row(::server_id,
                        injector::transaction::table(table, true),
@@ -2271,13 +2329,21 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     row.n_updates++;
     DBUG_PRINT("info", ("UPDATE %s", share->key));
     {
+      if (share->flags & NSF_BLOB_FLAG)
+      {
+        my_ptrdiff_t ptrdiff= 0;
+        int ret= get_ndb_blobs_value(table, share->ndb_value[0],
+                                     blobs_buffer[0], blobs_buffer_size[0],
+                                     ptrdiff);
+        DBUG_ASSERT(ret == 0);
+      }
       ndb_unpack_record(table, share->ndb_value[0], &b, table->record[0]);
       print_records(table, table->record[0]);
       if (table->s->primary_key != MAX_KEY)
       {
         /*
-          since table has a primary key, we can to a write
+          since table has a primary key, we can do a write
           using only after values
         */
         trans.write_row(::server_id,
                         injector::transaction::table(table, true),
@@ -2289,6 +2355,14 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
           mysql server cannot handle the ndb hidden key and
           therefore needs the before image as well
         */
+        if (share->flags & NSF_BLOB_FLAG)
+        {
+          my_ptrdiff_t ptrdiff= table->record[1] - table->record[0];
+          int ret= get_ndb_blobs_value(table, share->ndb_value[1],
+                                       blobs_buffer[1], blobs_buffer_size[1],
+                                       ptrdiff);
+          DBUG_ASSERT(ret == 0);
+        }
         ndb_unpack_record(table, share->ndb_value[1], &b, table->record[1]);
         print_records(table, table->record[1]);
         trans.update_row(::server_id,
@@ -2305,6 +2379,12 @@ ndb_binlog_thread_handle_data_event(Ndb *ndb, NdbEventOperation *pOp,
     break;
   }

+  if (share->flags & NSF_BLOB_FLAG)
+  {
+    my_free(blobs_buffer[0], MYF(MY_ALLOW_ZERO_PTR));
+    my_free(blobs_buffer[1], MYF(MY_ALLOW_ZERO_PTR));
+  }
+
   return 0;
 }
@@ -2544,6 +2624,9 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
       Binlog_index_row row;
       while (pOp != NULL)
       {
+        // sometimes get TE_ALTER with invalid table
+        DBUG_ASSERT(pOp->getEventType() == NdbDictionary::Event::TE_ALTER ||
+                    ! IS_NDB_BLOB_PREFIX(pOp->getTable()->getName()));
         ndb->setReportThreshEventGCISlip(ndb_report_thresh_binlog_epoch_slip);
         ndb->setReportThreshEventFreeMem(ndb_report_thresh_binlog_mem_usage);
@@ -2684,6 +2767,7 @@ pthread_handler_t ndb_binlog_thread_func(void *arg)
     DBUG_PRINT("info",("removing all event operations"));
     while ((op= ndb->getEventOperation()))
     {
+      DBUG_ASSERT(! IS_NDB_BLOB_PREFIX(op->getTable()->getName()));
       DBUG_PRINT("info",("removing event operation on %s",
                          op->getEvent()->getName()));
       NDB_SHARE *share= (NDB_SHARE*) op->getCustomData();
storage/ndb/include/ndbapi/NdbBlob.hpp
@@ -28,6 +28,7 @@ class NdbOperation;
 class NdbRecAttr;
 class NdbTableImpl;
 class NdbColumnImpl;
+class NdbEventOperationImpl;

 /**
  * @class NdbBlob
@@ -71,6 +72,10 @@ class NdbColumnImpl;
  * writes.  It avoids execute penalty if nothing is pending.  It is not
  * needed after execute (obviously) or after next scan result.
  *
+ * NdbBlob also supports reading post or pre blob data from events.  The
+ * handle can be read after next event on main table has been retrieved.
+ * The data is available immediately.  See NdbEventOperation.
+ *
  * NdbBlob methods return -1 on error and 0 on success, and use output
  * parameters when necessary.
  *
@@ -145,6 +150,12 @@ public:
    * then the callback is invoked.
    */
   int setActiveHook(ActiveHook* activeHook, void* arg);
+  /**
+   * Check if blob value is defined (NULL or not).  Used as first call
+   * on event based blob.  The argument is set to -1 for not defined.
+   * Unlike getNull() this does not cause error on the handle.
+   */
+  int getDefined(int& isNull);
   /**
    * Check if blob is null.
    */
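Not part of the patch: a hedged sketch of how event-based blob reading described in the comments above might look to an API user. It assumes bh was obtained from NdbEventOperation::getBlobHandle() before execute(), that nextEvent() has just delivered the owning operation, and it uses only calls shown in this diff (getDefined, getLength, readData). Error handling is reduced to early returns.

// Illustrative sketch, not from the commit.
#include <NdbApi.hpp>
#include <cstdlib>

int read_event_blob(NdbBlob* bh, char*& out, Uint64& len)
{
  int isNull;
  if (bh->getDefined(isNull) != 0)      // unlike getNull(), does not set an error
    return -1;
  if (isNull != 0) {                    // 1 = NULL value, -1 = not defined in this event
    out = NULL;
    len = 0;
    return 0;
  }
  if (bh->getLength(len) != 0)
    return -1;
  out = (char*)malloc((size_t)len);
  if (out == NULL)
    return -1;
  Uint32 n = (Uint32)len;               // readData() treats n as in/out
  if (bh->readData(out, n) != 0)
    return -1;
  return 0;
}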
@@ -191,6 +202,11 @@ public:
    * Get blob parts table name.  Useful only to test programs.
    */
   static int getBlobTableName(char* btname, Ndb* anNdb, const char* tableName, const char* columnName);
+  /**
+   * Get blob event name.  The blob event is created if the main event
+   * monitors the blob column.  The name includes main event name.
+   */
+  static int getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName);
   /**
    * Return error object.  The error may be blob specific (below) or may
    * be copied from a failed implicit operation.
@@ -217,17 +233,29 @@ private:
   friend class NdbScanOperation;
   friend class NdbDictionaryImpl;
   friend class NdbResultSet; // atNextResult
+  friend class NdbEventBuffer;
+  friend class NdbEventOperationImpl;
 #endif
   // state
   State theState;
   void setState(State newState);
+  // quick and dirty support for events (consider subclassing)
+  int theEventBlobVersion; // -1=normal blob 0=post event 1=pre event
   // define blob table
   static void getBlobTableName(char* btname, const NdbTableImpl* t, const NdbColumnImpl* c);
   static void getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnImpl* c);
+  static void getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c);
+  static void getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c);
   // ndb api stuff
   Ndb* theNdb;
   NdbTransaction* theNdbCon;
   NdbOperation* theNdbOp;
+  NdbEventOperationImpl* theEventOp;
+  NdbEventOperationImpl* theBlobEventOp;
+  NdbRecAttr* theBlobEventPkRecAttr;
+  NdbRecAttr* theBlobEventDistRecAttr;
+  NdbRecAttr* theBlobEventPartRecAttr;
+  NdbRecAttr* theBlobEventDataRecAttr;
   const NdbTableImpl* theTable;
   const NdbTableImpl* theAccessTable;
   const NdbTableImpl* theBlobTable;
@@ -263,6 +291,8 @@ private:
   Buf theHeadInlineBuf;
   Buf theHeadInlineCopyBuf;     // for writeTuple
   Buf thePartBuf;
+  Buf theBlobEventDataBuf;
+  Uint32 thePartNumber;         // for event
   Head* theHead;
   char* theInlineData;
   NdbRecAttr* theHeadInlineRecAttr;
@@ -306,6 +336,8 @@ private:
   int readDataPrivate(char* buf, Uint32& bytes);
   int writeDataPrivate(const char* buf, Uint32 bytes);
   int readParts(char* buf, Uint32 part, Uint32 count);
+  int readTableParts(char* buf, Uint32 part, Uint32 count);
+  int readEventParts(char* buf, Uint32 part, Uint32 count);
   int insertParts(const char* buf, Uint32 part, Uint32 count);
   int updateParts(const char* buf, Uint32 part, Uint32 count);
   int deleteParts(Uint32 part, Uint32 count);
@@ -317,19 +349,23 @@ private:
   int invokeActiveHook();
   // blob handle maintenance
   int atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl* aColumn);
+  int atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version);
+  int prepareColumn();
   int preExecute(NdbTransaction::ExecType anExecType, bool& batch);
   int postExecute(NdbTransaction::ExecType anExecType);
   int preCommit();
   int atNextResult();
+  int atNextEvent();
   // errors
   void setErrorCode(int anErrorCode, bool invalidFlag = true);
   void setErrorCode(NdbOperation* anOp, bool invalidFlag = true);
   void setErrorCode(NdbTransaction* aCon, bool invalidFlag = true);
+  void setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag = true);
 #ifdef VM_TRACE
   int getOperationType() const;
   friend class NdbOut& operator<<(NdbOut&, const NdbBlob&);
 #endif
   // list stuff
   void next(NdbBlob* obj) { theNext= obj;}
   NdbBlob* next() { return theNext;}
   friend struct Ndb_free_list_t<NdbBlob>;
storage/ndb/include/ndbapi/NdbDictionary.hpp
@@ -883,6 +883,7 @@ public:
 private:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
   friend class NdbDictionaryImpl;
+  friend class NdbTableImpl;
 #endif
   class NdbTableImpl & m_impl;
@@ -1124,7 +1125,7 @@ public:
     _TE_NODE_FAILURE=10,
     _TE_SUBSCRIBE=11,
     _TE_UNSUBSCRIBE=12,
-    _TE_NUL=13 // internal (INS o DEL within same GCI)
+    _TE_NUL=13 // internal (e.g. INS o DEL within same GCI)
   };
 #endif
   /**
@@ -1261,6 +1262,24 @@ public:
    */
   int getNoOfEventColumns() const;

+  /**
+   * The merge events flag is false by default.  Setting it true
+   * implies that events are merged in following ways:
+   *
+   * - for given NdbEventOperation associated with this event,
+   *   events on same PK within same GCI are merged into single event
+   *
+   * - a blob table event is created for each blob attribute
+   *   and blob events are handled as part of main table events
+   *
+   * - blob post/pre data from the blob part events can be read
+   *   via NdbBlob methods as a single value
+   *
+   * NOTE: Currently this flag is not inherited by NdbEventOperation
+   * and must be set on NdbEventOperation explicitly.
+   */
+  void mergeEvents(bool flag);
+
   /**
    * Get object status
    */
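Not part of the patch: a sketch of the double opt-in the note above describes, assuming the 5.1-era NDB API used throughout this diff. Because the merge flag is not inherited, it is set both on the Event at creation time and on the NdbEventOperation. The event name "MY_EVENT" and column name "b" are placeholders; error handling is reduced to early returns.

// Illustrative sketch, not from the commit.
#include <NdbApi.hpp>

int create_merged_event(Ndb* ndb, const NdbDictionary::Table* tab)
{
  NdbDictionary::Event ev("MY_EVENT", *tab);
  ev.addTableEvent(NdbDictionary::Event::TE_ALL);
  ev.addEventColumn("b");                  // a blob column also gets a blob table event
  ev.mergeEvents(true);                    // merge ops on same PK within a GCI
  if (ndb->getDictionary()->createEvent(ev) != 0)
    return -1;

  NdbEventOperation* op = ndb->createEventOperation("MY_EVENT");
  if (op == NULL)
    return -1;
  op->mergeEvents(true);                   // must be repeated here (not inherited)
  return op->execute();
}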
@@ -1746,6 +1765,7 @@ public:
 #ifndef DOXYGEN_SHOULD_SKIP_INTERNAL
     const Table * getTable(const char * name, void **data) const;
     void set_local_table_data_size(unsigned sz);
+    void fix_blob_events(const Table* table, const char* ev_name);
 #endif
   };
 };
storage/ndb/include/ndbapi/NdbEventOperation.hpp
@@ -150,6 +150,14 @@ public:
    */
   NdbRecAttr *getPreValue(const char *anAttrName, char *aValue = 0);

+  /**
+   * These methods replace getValue/getPreValue for blobs.  Each
+   * method creates a blob handle NdbBlob.  The handle supports only
+   * read operations.  See NdbBlob.
+   */
+  NdbBlob* getBlobHandle(const char *anAttrName);
+  NdbBlob* getPreBlobHandle(const char *anAttrName);
+
   int isOverrun() const;

   /**
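Not part of the patch: a small sketch of obtaining the post- and pre-image handles declared above. The column name "c4" is a placeholder; both calls must happen before NdbEventOperation::execute(), and the returned handles are read-only.

// Illustrative sketch, not from the commit.
#include <NdbApi.hpp>

int setup_blob_handles(NdbEventOperation* op, NdbBlob*& post, NdbBlob*& pre)
{
  post = op->getBlobHandle("c4");     // after-image of the blob column
  pre  = op->getPreBlobHandle("c4");  // before-image of the blob column
  if (post == NULL || pre == NULL)
    return -1;                        // e.g. called after execute(), or no such column
  return 0;
}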
storage/ndb/ndbapi-examples/ndbapi_event/Makefile
@@ -4,7 +4,7 @@ OBJS = ndbapi_event.o
 CXX = g++ -g
 CFLAGS = -c -Wall -fno-rtti -fno-exceptions
 CXXFLAGS =
-DEBUG =
+DEBUG = # -DVM_TRACE
 LFLAGS = -Wall
 TOP_SRCDIR = ../../../..
 INCLUDE_DIR = $(TOP_SRCDIR)/storage/ndb/include
@@ -16,8 +16,8 @@ SYS_LIB =
 $(TARGET): $(OBJS)
 	$(CXX) $(CXXFLAGS) $(LFLAGS) $(LIB_DIR) $(OBJS) -lndbclient -lmysqlclient_r -lmysys -lmystrings -lz $(SYS_LIB) -o $(TARGET)

-$(TARGET).o: $(SRCS)
-	$(CXX) $(CFLAGS) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)
+$(TARGET).o: $(SRCS) Makefile
+	$(CXX) $(CFLAGS) $(DEBUG) -I$(INCLUDE_DIR) -I$(INCLUDE_DIR)/ndbapi -I$(TOP_SRCDIR)/include $(SRCS)

 clean:
 	rm -f *.o $(TARGET)
storage/ndb/ndbapi-examples/ndbapi_event/ndbapi_event.cpp
@@ -54,26 +54,32 @@
 #include <stdio.h>
 #include <iostream>
 #include <unistd.h>
+#ifdef VM_TRACE
+#include <my_global.h>
+#endif
+#ifndef assert
+#include <assert.h>
+#endif

 /**
  *
- * Assume that there is a table t0 which is being updated by
+ * Assume that there is a table which is being updated by
  * another process (e.g. flexBench -l 0 -stdtables).
- * We want to monitor what happens with columns c0,c1,c2,c3.
+ * We want to monitor what happens with column values.
  *
- * or together with the mysql client;
+ * Or using the mysql client:
  *
  * shell> mysql -u root
  * mysql> create database TEST_DB;
  * mysql> use TEST_DB;
- * mysql> create table t0 (c0 int, c1 int, c2 char(4), c3 char(4),
+ * mysql> create table t0
+ *        (c0 int, c1 int, c2 char(4), c3 char(4), c4 text,
  *        primary key(c0, c2)) engine ndb charset latin1;
  *
  * In another window start ndbapi_event, wait until properly started
  *
-   insert into t0 values (1, 2, 'a', 'b');
-   insert into t0 values (3, 4, 'c', 'd');
+   insert into t0 values (1, 2, 'a', 'b', null);
+   insert into t0 values (3, 4, 'c', 'd', null);
   update t0 set c3 = 'e' where c0 = 1 and c2 = 'a'; -- use pk
   update t0 set c3 = 'f'; -- use scan
   update t0 set c3 = 'F'; -- use scan update to 'same'
@@ -81,7 +87,18 @@
   update t0 set c2 = 'G' where c0 = 1; -- update pk part to 'same'
   update t0 set c0 = 5, c2 = 'H' where c0 = 3; -- update full PK
   delete from t0;
+
+   insert ...; update ...; -- see events w/ same pk merged (if -m option)
+   delete ...; insert ...; -- there are 5 combinations ID IU DI UD UU
+   update ...; update ...;
+
+   -- text requires -m flag
+   set @a = repeat('a',256); -- inline size
+   set @b = repeat('b',2000); -- part size
+   set @c = repeat('c',2000*30); -- 30 parts
+
+   -- update the text field using combinations of @a, @b, @c ...
+
  * you should see the data popping up in the example window
  *
  */
@@ -95,12 +112,18 @@ int myCreateEvent(Ndb* myNdb,
                   const char *eventName,
                   const char *eventTableName,
                   const char **eventColumnName,
-                  const int noEventColumnName);
+                  const int noEventColumnName,
+                  bool merge_events);

 int main(int argc, char** argv)
 {
   ndb_init();
-  bool merge_events = argc > 1 && strcmp(argv[1], "-m") == 0;
+  bool merge_events = argc > 1 && strchr(argv[1], 'm') != 0;
+#ifdef VM_TRACE
+  bool dbug = argc > 1 && strchr(argv[1], 'd') != 0;
+  if (dbug) DBUG_PUSH("d:t:");
+  if (dbug) putenv("API_SIGNAL_LOG=-");
+#endif

   Ndb_cluster_connection *cluster_connection=
     new Ndb_cluster_connection(); // Object representing the cluster
@@ -134,12 +157,13 @@ int main(int argc, char** argv)
   const char *eventName= "CHNG_IN_t0";
   const char *eventTableName= "t0";
-  const int noEventColumnName = 4;
+  const int noEventColumnName = 5;
   const char *eventColumnName[noEventColumnName] =
     {"c0",
      "c1",
      "c2",
-     "c3"
+     "c3",
+     "c4"
     };

   // Create events
@@ -147,9 +171,14 @@ int main(int argc, char** argv)
                 eventName,
                 eventTableName,
                 eventColumnName,
-                noEventColumnName);
+                noEventColumnName,
+                merge_events);
+
+  // Normal values and blobs are unfortunately handled differently..
+  typedef union { NdbRecAttr* ra; NdbBlob* bh; } RA_BH;

-  int j= 0;
+  int i, j, k, l;
+  j = 0;
   while (j < 99) {

     // Start "transaction" for handling events
@@ -160,12 +189,17 @@ int main(int argc, char** argv)
     op->mergeEvents(merge_events);

     printf("get values\n");
-    NdbRecAttr* recAttr[noEventColumnName];
-    NdbRecAttr* recAttrPre[noEventColumnName];
+    RA_BH recAttr[noEventColumnName];
+    RA_BH recAttrPre[noEventColumnName];
     // primary keys should always be a part of the result
-    for (int i = 0; i < noEventColumnName; i++) {
-      recAttr[i]    = op->getValue(eventColumnName[i]);
-      recAttrPre[i] = op->getPreValue(eventColumnName[i]);
+    for (i = 0; i < noEventColumnName; i++) {
+      if (i < 4) {
+        recAttr[i].ra    = op->getValue(eventColumnName[i]);
+        recAttrPre[i].ra = op->getPreValue(eventColumnName[i]);
+      } else if (merge_events) {
+        recAttr[i].bh    = op->getBlobHandle(eventColumnName[i]);
+        recAttrPre[i].bh = op->getPreBlobHandle(eventColumnName[i]);
+      }
     }

     // set up the callbacks
@@ -174,13 +208,16 @@ int main(int argc, char** argv)
     if (op->execute())
       APIERROR(op->getNdbError());

-    int i= 0;
-    while (i < 40) {
+    NdbEventOperation* the_op = op;
+
+    i= 0;
+    while (i < 40) {
       // printf("now waiting for event...\n");
-      int r= myNdb->pollEvents(1000); // wait for event or 1000 ms
+      int r = myNdb->pollEvents(1000); // wait for event or 1000 ms
       if (r > 0) {
         // printf("got data! %d\n", r);
         while ((op= myNdb->nextEvent())) {
+          assert(the_op == op);
           i++;
           switch (op->getEventType()) {
           case NdbDictionary::Event::TE_INSERT:
@@ -195,40 +232,66 @@ int main(int argc, char** argv)
           default:
             abort(); // should not happen
           }
-          printf(" gci=%d\n", op->getGCI());
-          printf("post: ");
-          for (int i = 0; i < noEventColumnName; i++) {
-            if (recAttr[i]->isNULL() >= 0) { // we have a value
-              if (recAttr[i]->isNULL() == 0) { // we have a non-null value
-                if (i < 2)
-                  printf("%-5u", recAttr[i]->u_32_value());
-                else
-                  printf("%-5.4s", recAttr[i]->aRef());
-              } else                           // we have a null value
-                printf("%-5s", "NULL");
-            } else
-              printf("%-5s", "-");
-          }
-          printf("\npre : ");
-          for (int i = 0; i < noEventColumnName; i++) {
-            if (recAttrPre[i]->isNULL() >= 0) { // we have a value
-              if (recAttrPre[i]->isNULL() == 0) { // we have a non-null value
-                if (i < 2)
-                  printf("%-5u", recAttrPre[i]->u_32_value());
-                else
-                  printf("%-5.4s", recAttrPre[i]->aRef());
-              } else                              // we have a null value
-                printf("%-5s", "NULL");
-            } else
-              printf("%-5s", "-");
-          }
-          printf("\n");
+          printf(" gci=%d\n", (int)op->getGCI());
+          for (k = 0; k <= 1; k++) {
+            printf(k == 0 ? "post: " : "pre : ");
+            for (l = 0; l < noEventColumnName; l++) {
+              if (l < 4) {
+                NdbRecAttr* ra = k == 0 ? recAttr[l].ra : recAttrPre[l].ra;
+                if (ra->isNULL() >= 0) { // we have a value
+                  if (ra->isNULL() == 0) { // we have a non-null value
+                    if (l < 2)
+                      printf("%-5u", ra->u_32_value());
+                    else
+                      printf("%-5.4s", ra->aRef());
+                  } else
+                    printf("%-5s", "NULL");
+                } else
+                  printf("%-5s", "-"); // no value
+              } else if (merge_events) {
+                int isNull;
+                NdbBlob* bh = k == 0 ? recAttr[l].bh : recAttrPre[l].bh;
+                bh->getDefined(isNull);
+                if (isNull >= 0) { // we have a value
+                  if (! isNull) { // we have a non-null value
+                    Uint64 length = 0;
+                    bh->getLength(length);
+                    // read into buffer
+                    unsigned char* buf = new unsigned char [length];
+                    memset(buf, 'X', length);
+                    Uint32 n = length;
+                    bh->readData(buf, n); // n is in/out
+                    assert(n == length);
+                    // pretty-print
+                    bool first = true;
+                    Uint32 i = 0;
+                    while (i < n) {
+                      unsigned char c = buf[i++];
+                      Uint32 m = 1;
+                      while (i < n && buf[i] == c)
+                        i++, m++;
+                      if (! first)
+                        printf("+");
+                      printf("%u%c", m, c);
+                      first = false;
+                    }
+                    printf("[%u]", n);
+                    delete [] buf;
+                  } else
+                    printf("%-5s", "NULL");
+                } else
+                  printf("%-5s", "-"); // no value
+              }
+            }
+            printf("\n");
+          }
         }
       } else
         ;//printf("timed out\n");
     }
     // don't want to listen to events anymore
-    if (myNdb->dropEventOperation(op)) APIERROR(myNdb->getNdbError());
+    if (myNdb->dropEventOperation(the_op)) APIERROR(myNdb->getNdbError());
+    the_op = 0;

     j++;
   }
@@ -250,7 +313,8 @@ int myCreateEvent(Ndb* myNdb,
                   const char *eventName,
                   const char *eventTableName,
                   const char **eventColumnNames,
-                  const int noEventColumnNames)
+                  const int noEventColumnNames,
+                  bool merge_events)
 {
   NdbDictionary::Dictionary *myDict= myNdb->getDictionary();
   if (!myDict) APIERROR(myNdb->getNdbError());
@@ -265,6 +329,7 @@ int myCreateEvent(Ndb* myNdb,
   //  myEvent.addTableEvent(NdbDictionary::Event::TE_DELETE);

   myEvent.addEventColumns(noEventColumnNames, eventColumnNames);
+  myEvent.mergeEvents(merge_events);

   // Add event to database
   if (myDict->createEvent(myEvent) == 0)
storage/ndb/src/ndbapi/NdbBlob.cpp
@@ -23,6 +23,7 @@
 #include <NdbBlob.hpp>
 #include "NdbBlobImpl.hpp"
 #include <NdbScanOperation.hpp>
+#include <NdbEventOperationImpl.hpp>

 /*
  * Reading index table directly (as a table) is faster but there are
@@ -147,6 +148,61 @@ NdbBlob::getBlobTable(NdbTableImpl& bt, const NdbTableImpl* t, const NdbColumnIm
   DBUG_VOID_RETURN;
 }

+int
+NdbBlob::getBlobEventName(char* bename, Ndb* anNdb, const char* eventName, const char* columnName)
+{
+  NdbEventImpl* e = anNdb->theDictionary->m_impl.getEvent(eventName);
+  if (e == NULL)
+    return -1;
+  NdbColumnImpl* c = e->m_tableImpl->getColumn(columnName);
+  if (c == NULL)
+    return -1;
+  getBlobEventName(bename, e, c);
+  return 0;
+}
+
+void
+NdbBlob::getBlobEventName(char* bename, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  // XXX events should have object id
+  snprintf(bename, MAX_TAB_NAME_SIZE, "NDB$BLOBEVENT_%s_%d", e->m_name.c_str(), (int)c->m_column_no);
+}
+
+void
+NdbBlob::getBlobEvent(NdbEventImpl& be, const NdbEventImpl* e, const NdbColumnImpl* c)
+{
+  DBUG_ENTER("NdbBlob::getBlobEvent");
+  // blob table
+  assert(c->m_blobTable != NULL);
+  const NdbTableImpl& bt = *c->m_blobTable;
+  // blob event name
+  char bename[NdbBlobImpl::BlobTableNameSize];
+  getBlobEventName(bename, e, c);
+  be.setName(bename);
+  be.setTable(bt);
+  // simple assigments
+  be.mi_type = e->mi_type;
+  be.m_dur = e->m_dur;
+  be.m_mergeEvents = e->m_mergeEvents;
+  // report unchanged data
+  // not really needed now since UPD is DEL o INS and we subscribe to all
+  be.setReport(NdbDictionary::Event::ER_ALL);
+  // columns PK - DIST - PART - DATA
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)0);
+    be.addColumn(*bc); }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)1);
+    be.addColumn(*bc); }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)2);
+    be.addColumn(*bc); }
+  { const NdbColumnImpl* bc = bt.getColumn((Uint32)3);
+    be.addColumn(*bc); }
+  DBUG_VOID_RETURN;
+}
+
 // initialization
 NdbBlob::NdbBlob(Ndb*)
@@ -158,9 +214,16 @@ void
 NdbBlob::init()
 {
   theState= Idle;
+  theEventBlobVersion= -1;
   theNdb= NULL;
   theNdbCon= NULL;
   theNdbOp= NULL;
+  theEventOp= NULL;
+  theBlobEventOp= NULL;
+  theBlobEventPkRecAttr= NULL;
+  theBlobEventDistRecAttr= NULL;
+  theBlobEventPartRecAttr= NULL;
+  theBlobEventDataRecAttr= NULL;
   theTable= NULL;
   theAccessTable= NULL;
   theBlobTable= NULL;
@@ -439,7 +502,7 @@ NdbBlob::getHeadFromRecAttr()
   DBUG_ENTER("NdbBlob::getHeadFromRecAttr");
   assert(theHeadInlineRecAttr != NULL);
   theNullFlag= theHeadInlineRecAttr->isNULL();
-  assert(theNullFlag != -1);
+  assert(theEventBlobVersion >= 0 || theNullFlag != -1);
   theLength= ! theNullFlag ? theHead->length : 0;
   DBUG_VOID_RETURN;
 }
@@ -543,6 +606,18 @@ NdbBlob::setActiveHook(ActiveHook activeHook, void* arg)
 // misc operations

+int
+NdbBlob::getDefined(int& isNull)
+{
+  DBUG_ENTER("NdbBlob::getDefined");
+  if (theState == Prepared && theSetFlag) {
+    isNull = (theSetBuf == NULL);
+    DBUG_RETURN(0);
+  }
+  isNull = theNullFlag;
+  DBUG_RETURN(0);
+}
+
 int
 NdbBlob::getNull(bool& isNull)
 {
@@ -887,6 +962,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
 {
   DBUG_ENTER("NdbBlob::readParts");
   DBUG_PRINT("info", ("part=%u count=%u", part, count));
+  int ret;
+  if (theEventBlobVersion == -1)
+    ret = readTableParts(buf, part, count);
+  else
+    ret = readEventParts(buf, part, count);
+  DBUG_RETURN(ret);
+}
+
+int
+NdbBlob::readTableParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readTableParts");
   Uint32 n= 0;
   while (n < count) {
     NdbOperation* tOp= theNdbCon->getNdbOperation(theBlobTable);
@@ -906,6 +993,18 @@ NdbBlob::readParts(char* buf, Uint32 part, Uint32 count)
   DBUG_RETURN(0);
 }

+int
+NdbBlob::readEventParts(char* buf, Uint32 part, Uint32 count)
+{
+  DBUG_ENTER("NdbBlob::readEventParts");
+  int ret= theEventOp->readBlobParts(buf, this, part, count);
+  if (ret != 0) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
 int
 NdbBlob::insertParts(const char* buf, Uint32 part, Uint32 count)
 {
@@ -1094,48 +1193,12 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
   theTable= anOp->m_currentTable;
   theAccessTable= anOp->m_accessTable;
   theColumn= aColumn;
-  NdbDictionary::Column::Type partType= NdbDictionary::Column::Undefined;
-  switch (theColumn->getType()) {
-  case NdbDictionary::Column::Blob:
-    partType= NdbDictionary::Column::Binary;
-    theFillChar= 0x0;
-    break;
-  case NdbDictionary::Column::Text:
-    partType= NdbDictionary::Column::Char;
-    theFillChar= 0x20;
-    break;
-  default:
-    setErrorCode(NdbBlobImpl::ErrUsage);
-    DBUG_RETURN(-1);
-  }
-  // sizes
-  theInlineSize= theColumn->getInlineSize();
-  thePartSize= theColumn->getPartSize();
-  theStripeSize= theColumn->getStripeSize();
-  // sanity check
-  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
-  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
-  if (thePartSize > 0) {
-    const NdbDictionary::Table* bt= NULL;
-    const NdbDictionary::Column* bc= NULL;
-    if (theStripeSize == 0 ||
-        (bt= theColumn->getBlobTable()) == NULL ||
-        (bc= bt->getColumn("DATA")) == NULL ||
-        bc->getType() != partType ||
-        bc->getLength() != (int)thePartSize) {
-      setErrorCode(NdbBlobImpl::ErrTable);
-      DBUG_RETURN(-1);
-    }
-    theBlobTable= &NdbTableImpl::getImpl(*bt);
-  }
-  // buffers
-  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  // prepare blob column and table
+  if (prepareColumn() == -1)
+    DBUG_RETURN(-1);
+  // extra buffers
   theAccessKeyBuf.alloc(theAccessTable->m_keyLenInWords << 2);
-  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
   theHeadInlineCopyBuf.alloc(sizeof(Head) + theInlineSize);
-  thePartBuf.alloc(thePartSize);
-  theHead= (Head*)theHeadInlineBuf.data;
-  theInlineData= theHeadInlineBuf.data + sizeof(Head);
   // handle different operation types
   bool supportedOp= false;
   if (isKeyOp()) {
@@ -1189,6 +1252,103 @@ NdbBlob::atPrepare(NdbTransaction* aCon, NdbOperation* anOp, const NdbColumnImpl
   DBUG_RETURN(0);
 }

+int
+NdbBlob::atPrepare(NdbEventOperationImpl* anOp, NdbEventOperationImpl* aBlobOp, const NdbColumnImpl* aColumn, int version)
+{
+  DBUG_ENTER("NdbBlob::atPrepare [event]");
+  DBUG_PRINT("info", ("this=%p op=%p", this, anOp));
+  assert(theState == Idle);
+  assert(version == 0 || version == 1);
+  theEventBlobVersion = version;
+  // ndb api stuff
+  theNdb= anOp->m_ndb;
+  theEventOp= anOp;
+  theBlobEventOp= aBlobOp;
+  theTable= anOp->m_eventImpl->m_tableImpl;
+  theColumn= aColumn;
+  // prepare blob column and table
+  if (prepareColumn() == -1)
+    DBUG_RETURN(-1);
+  // tinyblob sanity
+  assert((theBlobEventOp == NULL) == (theBlobTable == NULL));
+  // extra buffers
+  theBlobEventDataBuf.alloc(thePartSize);
+  // prepare receive of head+inline
+  theHeadInlineRecAttr = theEventOp->getValue(aColumn, theHeadInlineBuf.data, version);
+  if (theHeadInlineRecAttr == NULL) {
+    setErrorCode(theEventOp);
+    DBUG_RETURN(-1);
+  }
+  // prepare receive of blob part
+  if (theBlobEventOp != NULL) {
+    if ((theBlobEventPkRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)0),
+                                  theKeyBuf.data, version)) == NULL ||
+        (theBlobEventDistRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)1),
+                                  (char*)0, version)) == NULL ||
+        (theBlobEventPartRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)2),
+                                  (char*)&thePartNumber, version)) == NULL ||
+        (theBlobEventDataRecAttr =
+         theBlobEventOp->getValue(theBlobTable->getColumn((Uint32)3),
+                                  theBlobEventDataBuf.data, version)) == NULL) {
+      setErrorCode(theBlobEventOp);
+      DBUG_RETURN(-1);
+    }
+  }
+  setState(Prepared);
+  DBUG_RETURN(0);
+}
+
+int
+NdbBlob::prepareColumn()
+{
+  DBUG_ENTER("prepareColumn");
+  NdbDictionary::Column::Type partType= NdbDictionary::Column::Undefined;
+  switch (theColumn->getType()) {
+  case NdbDictionary::Column::Blob:
+    partType= NdbDictionary::Column::Binary;
+    theFillChar= 0x0;
+    break;
+  case NdbDictionary::Column::Text:
+    partType= NdbDictionary::Column::Char;
+    theFillChar= 0x20;
+    break;
+  default:
+    setErrorCode(NdbBlobImpl::ErrUsage);
+    DBUG_RETURN(-1);
+  }
+  // sizes
+  theInlineSize= theColumn->getInlineSize();
+  thePartSize= theColumn->getPartSize();
+  theStripeSize= theColumn->getStripeSize();
+  // sanity check
+  assert((NDB_BLOB_HEAD_SIZE << 2) == sizeof(Head));
+  assert(theColumn->m_attrSize * theColumn->m_arraySize == sizeof(Head) + theInlineSize);
+  if (thePartSize > 0) {
+    const NdbTableImpl* bt= NULL;
+    const NdbColumnImpl* bc= NULL;
+    if (theStripeSize == 0 ||
+        (bt= theColumn->m_blobTable) == NULL ||
+        (bc= bt->getColumn("DATA")) == NULL ||
+        bc->getType() != partType ||
+        bc->getLength() != (int)thePartSize) {
+      setErrorCode(NdbBlobImpl::ErrTable);
+      DBUG_RETURN(-1);
+    }
+    // blob table
+    theBlobTable= &NdbTableImpl::getImpl(*bt);
+  }
+  // these buffers are always used
+  theKeyBuf.alloc(theTable->m_keyLenInWords << 2);
+  theHeadInlineBuf.alloc(sizeof(Head) + theInlineSize);
+  theHead= (Head*)theHeadInlineBuf.data;
+  theInlineData= theHeadInlineBuf.data + sizeof(Head);
+  thePartBuf.alloc(thePartSize);
+  DBUG_RETURN(0);
+}
+
 /*
  * Before execute of prepared operation.  May add new operations before
  * this one.  May ask that this operation and all before it (a "batch")
@@ -1537,6 +1697,26 @@ NdbBlob::atNextResult()
   DBUG_RETURN(0);
 }

+/*
+ * After next event on main table.
+ */
+int
+NdbBlob::atNextEvent()
+{
+  DBUG_ENTER("NdbBlob::atNextEvent");
+  DBUG_PRINT("info", ("this=%p op=%p blob op=%p version=%d",
+                      this, theEventOp, theBlobEventOp, theEventBlobVersion));
+  if (theState == Invalid)
+    DBUG_RETURN(-1);
+  assert(theEventBlobVersion >= 0);
+  getHeadFromRecAttr();
+  if (theNullFlag == -1) // value not defined
+    DBUG_RETURN(0);
+  if (setPos(0) == -1)
+    DBUG_RETURN(-1);
+  setState(Active);
+  DBUG_RETURN(0);
+}
+
 // misc
 const NdbDictionary::Column*
@@ -1589,6 +1769,17 @@ NdbBlob::setErrorCode(NdbTransaction* aCon, bool invalidFlag)
   setErrorCode(code, invalidFlag);
 }

+void
+NdbBlob::setErrorCode(NdbEventOperationImpl* anOp, bool invalidFlag)
+{
+  int code= 0;
+  if ((code= anOp->m_error.code) != 0)
+    ;
+  else
+    code= NdbBlobImpl::ErrUnknown;
+  setErrorCode(code, invalidFlag);
+}
+
 // info about all blobs in this operation
 NdbBlob*
storage/ndb/src/ndbapi/NdbDictionary.cpp
@@ -901,6 +901,11 @@ int NdbDictionary::Event::getNoOfEventColumns() const
   return m_impl.getNoOfEventColumns();
 }

+void
+NdbDictionary::Event::mergeEvents(bool flag)
+{
+  m_impl.m_mergeEvents = flag;
+}
+
 NdbDictionary::Object::Status
 NdbDictionary::Event::getObjectStatus() const
 {
@@ -1473,6 +1478,12 @@ NdbDictionary::Dictionary::getNdbError() const {
   return m_impl.getNdbError();
 }

+void
+NdbDictionary::Dictionary::fix_blob_events(const Table* table, const char* ev_name)
+{
+  m_impl.fix_blob_events(table, ev_name);
+}
+
 // printers
 NdbOut&
storage/ndb/src/ndbapi/NdbDictionaryImpl.cpp
@@ -1072,6 +1072,7 @@ void NdbEventImpl::init()
   m_tableId= RNIL;
   mi_type= 0;
   m_dur= NdbDictionary::Event::ED_UNDEFINED;
+  m_mergeEvents = false;
   m_tableImpl= NULL;
   m_rep= NdbDictionary::Event::ER_UPDATED;
 }
@@ -2036,7 +2037,7 @@ int
 NdbDictionaryImpl::addBlobTables(NdbTableImpl &t)
 {
   unsigned n= t.m_noOfBlobs;
-  DBUG_ENTER("NdbDictioanryImpl::addBlobTables");
+  DBUG_ENTER("NdbDictionaryImpl::addBlobTables");
   // optimized for blob column being the last one
   // and not looking for more than one if not neccessary
   for (unsigned i = t.m_columns.size(); i > 0 && n > 0;) {
@@ -3151,7 +3152,37 @@ NdbDictionaryImpl::createEvent(NdbEventImpl & evnt)
 #endif
   // NdbDictInterface m_receiver;
-  DBUG_RETURN(m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */));
+  if (m_receiver.createEvent(m_ndb, evnt, 0 /* getFlag unset */) != 0)
+    DBUG_RETURN(-1);
+  // Create blob events
+  if (evnt.m_mergeEvents && createBlobEvents(evnt) != 0) {
+    int save_code = m_error.code;
+    (void)dropEvent(evnt.m_name.c_str());
+    m_error.code = save_code;
+    DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
+int
+NdbDictionaryImpl::createBlobEvents(NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::createBlobEvents");
+  NdbTableImpl& t = *evnt.m_tableImpl;
+  Uint32 n = t.m_noOfBlobs;
+  Uint32 i;
+  for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+    NdbColumnImpl & c = *evnt.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    n--;
+    NdbEventImpl blob_evnt;
+    NdbBlob::getBlobEvent(blob_evnt, &evnt, &c);
+    if (createEvent(blob_evnt) != 0)
+      DBUG_RETURN(-1);
+  }
+  DBUG_RETURN(0);
+}
+
 int
@@ -3367,12 +3398,14 @@ NdbDictionaryImpl::getEvent(const char * eventName)
     if (ev->m_tableId == info->m_table_impl->m_id &&
         ev->m_tableVersion == info->m_table_impl->m_version)
       break;
+    DBUG_PRINT("error",("%s: retry=%d: "
+                        "table version mismatch, event: [%u,%u] table: [%u,%u]",
+                        ev->getTableName(), retry,
+                        ev->m_tableId, ev->m_tableVersion,
+                        info->m_table_impl->m_id, info->m_table_impl->m_version));
     if (retry)
     {
       m_error.code= 241;
-      DBUG_PRINT("error",("%s: table version mismatch, event: [%u,%u] table: [%u,%u]",
-                          ev->getTableName(), ev->m_tableId, ev->m_tableVersion,
-                          info->m_table_impl->m_id, info->m_table_impl->m_version));
       delete ev;
       DBUG_RETURN(NULL);
     }
@@ -3400,6 +3433,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
   if (attributeList_sz > table.getNoOfColumns())
   {
     m_error.code = 241;
+    DBUG_PRINT("error",("Invalid version, too many columns"));
     delete ev;
     DBUG_RETURN(NULL);
@@ -3409,6 +3443,7 @@ NdbDictionaryImpl::getEvent(const char * eventName)
   for (unsigned id= 0; ev->m_columns.size() < attributeList_sz; id++) {
     if ( id >= table.getNoOfColumns())
     {
       m_error.code = 241;
+      DBUG_PRINT("error",("Invalid version, column %d out of range", id));
       delete ev;
       DBUG_RETURN(NULL);
@@ -3566,13 +3601,64 @@ NdbDictInterface::execSUB_START_REF(NdbApiSignal * signal,
 int
 NdbDictionaryImpl::dropEvent(const char * eventName)
 {
-  NdbEventImpl *ev=  new NdbEventImpl();
-  ev->setName(eventName);
-  int ret= m_receiver.dropEvent(*ev);
-  delete ev;
-
-  //  printf("__________________RET %u\n", ret);
-  return ret;
+  DBUG_ENTER("NdbDictionaryImpl::dropEvent");
+  DBUG_PRINT("info", ("name=%s", eventName));
+
+  NdbEventImpl *evnt = getEvent(eventName); // allocated
+  if (evnt == NULL) {
+    if (m_error.code != 723 && // no such table
+        m_error.code != 241)   // invalid table
+      DBUG_RETURN(-1);
+    DBUG_PRINT("info", ("no table err=%d, drop by name alone", m_error.code));
+    evnt = new NdbEventImpl();
+    evnt->setName(eventName);
+  }
+  int ret = dropEvent(*evnt);
+  delete evnt;
+  DBUG_RETURN(ret);
+}
+
+int
+NdbDictionaryImpl::dropEvent(const NdbEventImpl& evnt)
+{
+  if (dropBlobEvents(evnt) != 0)
+    return -1;
+  if (m_receiver.dropEvent(evnt) != 0)
+    return -1;
+  return 0;
+}
+
+int
+NdbDictionaryImpl::dropBlobEvents(const NdbEventImpl& evnt)
+{
+  DBUG_ENTER("NdbDictionaryImpl::dropBlobEvents");
+  if (evnt.m_tableImpl != 0) {
+    const NdbTableImpl& t = *evnt.m_tableImpl;
+    Uint32 n = t.m_noOfBlobs;
+    Uint32 i;
+    for (i = 0; i < evnt.m_columns.size() && n > 0; i++) {
+      const NdbColumnImpl& c = *evnt.m_columns[i];
+      if (! c.getBlobType() || c.getPartSize() == 0)
+        continue;
+      n--;
+      char bename[MAX_TAB_NAME_SIZE];
+      NdbBlob::getBlobEventName(bename, &evnt, &c);
+      (void)dropEvent(bename);
+    }
+  } else {
+    // loop over MAX_ATTRIBUTES_IN_TABLE ...
+    Uint32 i;
+    for (i = 0; i < MAX_ATTRIBUTES_IN_TABLE; i++) {
+      char bename[MAX_TAB_NAME_SIZE];
+      // XXX should get name from NdbBlob
+      sprintf(bename, "NDB$BLOBEVENT_%s_%u", evnt.getName(), i);
+      NdbEventImpl* bevnt = new NdbEventImpl();
+      bevnt->setName(bename);
+      (void)m_receiver.dropEvent(*bevnt);
+      delete bevnt;
+    }
+  }
+  DBUG_RETURN(0);
 }

 int
@@ -4557,6 +4643,30 @@ NdbDictInterface::parseFileInfo(NdbFileImpl &dst,
   return 0;
 }

+// XXX temp
+void
+NdbDictionaryImpl::fix_blob_events(const NdbDictionary::Table* table, const char* ev_name)
+{
+  const NdbTableImpl& t = table->m_impl;
+  const NdbEventImpl* ev = getEvent(ev_name);
+  assert(ev != NULL && ev->m_tableImpl == &t);
+  Uint32 i;
+  for (i = 0; i < t.m_columns.size(); i++) {
+    assert(t.m_columns[i] != NULL);
+    const NdbColumnImpl& c = *t.m_columns[i];
+    if (! c.getBlobType() || c.getPartSize() == 0)
+      continue;
+    char bename[200];
+    NdbBlob::getBlobEventName(bename, ev, &c);
+    // following fixes dict cache blob table
+    NdbEventImpl* bev = getEvent(bename);
+    if (c.m_blobTable != bev->m_tableImpl) {
+      // XXX const violation
+      ((NdbColumnImpl*)&c)->m_blobTable = bev->m_tableImpl;
+    }
+  }
+}
+
 template class Vector<int>;
 template class Vector<Uint16>;
 template class Vector<Uint32>;
storage/ndb/src/ndbapi/NdbDictionaryImpl.hpp
@@ -277,7 +277,6 @@ public:
 NdbDictionary::Event::EventDurability getDurability() const;
 void setReport(NdbDictionary::Event::EventReport r);
 NdbDictionary::Event::EventReport getReport() const;
 void addEventColumn(const NdbColumnImpl &c);
 int getNoOfEventColumns() const;

 void print() {
@@ -295,6 +294,7 @@ public:
   Uint32 mi_type;
   NdbDictionary::Event::EventDurability m_dur;
   NdbDictionary::Event::EventReport m_rep;
+  bool m_mergeEvents;
   NdbTableImpl *m_tableImpl;
   BaseString m_tableName;
@@ -547,7 +547,10 @@ public:
                   NdbTableImpl* table);
   int createEvent(NdbEventImpl &);
+  int createBlobEvents(NdbEventImpl &);
   int dropEvent(const char * eventName);
+  int dropEvent(const NdbEventImpl &);
+  int dropBlobEvents(const NdbEventImpl &);
   int executeSubscribeEvent(NdbEventOperationImpl &);
   int stopSubscribeEvent(NdbEventOperationImpl &);
@@ -589,6 +592,9 @@ public:
   NdbDictInterface m_receiver;
   Ndb & m_ndb;
+
+  // XXX temp
+  void fix_blob_events(const NdbDictionary::Table* table, const char* ev_name);
 private:
   NdbIndexImpl * getIndexImpl(const char * name, const BaseString& internalName);
storage/ndb/src/ndbapi/NdbEventOperation.cpp
@@ -55,6 +55,18 @@ NdbEventOperation::getPreValue(const char *colName, char *aValue)
   return m_impl.getValue(colName, aValue, 1);
 }

+NdbBlob *
+NdbEventOperation::getBlobHandle(const char *colName)
+{
+  return m_impl.getBlobHandle(colName, 0);
+}
+
+NdbBlob *
+NdbEventOperation::getPreBlobHandle(const char *colName)
+{
+  return m_impl.getBlobHandle(colName, 1);
+}
+
 int NdbEventOperation::execute()
 {
storage/ndb/src/ndbapi/NdbEventOperationImpl.cpp
@@ -38,6 +38,7 @@
 #include "DictCache.hpp"
 #include <portlib/NdbMem.h>
 #include <NdbRecAttr.hpp>
+#include <NdbBlob.hpp>
 #include <NdbEventOperation.hpp>
 #include "NdbEventOperationImpl.hpp"
@@ -48,6 +49,20 @@ static Gci_container g_empty_gci_container;
 static const Uint32 ACTIVE_GCI_DIRECTORY_SIZE = 4;
 static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;

+#ifdef VM_TRACE
+static void
+print_std(const SubTableData * sdata, LinearSectionPtr ptr[3])
+{
+  printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
+  for (int i = 0; i <= 2; i++) {
+    printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
+    for (int j = 0; j < ptr[i].sz; j++)
+      printf("%08x ", ptr[i].p[j]);
+    printf("\n");
+  }
+}
+#endif
+
 /*
  * Class NdbEventOperationImpl
 *
@@ -60,7 +75,7 @@ static const Uint32 ACTIVE_GCI_MASK = ACTIVE_GCI_DIRECTORY_SIZE - 1;
 #define DBUG_RETURN_EVENT(A) DBUG_RETURN(A)
 #define DBUG_VOID_RETURN_EVENT DBUG_VOID_RETURN
 #define DBUG_PRINT_EVENT(A,B) DBUG_PRINT(A,B)
-#define DBUG_DUMP_EVENT(A,B,C) DBUG_SUMP(A,B,C)
+#define DBUG_DUMP_EVENT(A,B,C) DBUG_DUMP(A,B,C)
 #else
 #define DBUG_ENTER_EVENT(A)
 #define DBUG_RETURN_EVENT(A) return(A)
@@ -92,6 +107,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
   theCurrentDataAttrs[0] = NULL;
   theFirstDataAttrs[1] = NULL;
   theCurrentDataAttrs[1] = NULL;
+
+  theBlobList = NULL;
+  theBlobOpList = NULL;
+  theMainOp = NULL;
+
   m_data_item= NULL;
   m_eventImpl = NULL;
@@ -117,7 +137,11 @@ NdbEventOperationImpl::NdbEventOperationImpl(NdbEventOperation &N,
   m_state= EO_CREATED;

-  m_mergeEvents = false;
+#ifdef ndb_event_stores_merge_events_flag
+  m_mergeEvents = m_eventImpl->m_mergeEvents;
+#else
+  m_mergeEvents = false;
+#endif

   m_has_error= 0;
@@ -254,10 +278,191 @@ NdbEventOperationImpl::getValue(const NdbColumnImpl *tAttrInfo, char *aValue, in
  DBUG_RETURN(tAttr);
}

NdbBlob*
NdbEventOperationImpl::getBlobHandle(const char *colName, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle (colName)");

  assert(m_mergeEvents);

  if (m_state != EO_CREATED) {
    ndbout_c("NdbEventOperationImpl::getBlobHandle may only be called between "
             "instantiation and execute()");
    DBUG_RETURN(NULL);
  }

  NdbColumnImpl *tAttrInfo = m_eventImpl->m_tableImpl->getColumn(colName);

  if (tAttrInfo == NULL) {
    ndbout_c("NdbEventOperationImpl::getBlobHandle attribute %s not found",
             colName);
    DBUG_RETURN(NULL);
  }

  NdbBlob* bh = getBlobHandle(tAttrInfo, n);
  DBUG_RETURN(bh);
}

NdbBlob*
NdbEventOperationImpl::getBlobHandle(const NdbColumnImpl *tAttrInfo, int n)
{
  DBUG_ENTER("NdbEventOperationImpl::getBlobHandle");
  DBUG_PRINT("info", ("attr=%s post/pre=%d", tAttrInfo->m_name.c_str(), n));

  // as in NdbOperation, create only one instance
  NdbBlob* tBlob = theBlobList;
  NdbBlob* tLastBlob = NULL;
  while (tBlob != NULL) {
    if (tBlob->theColumn == tAttrInfo && tBlob->theEventBlobVersion == n)
      DBUG_RETURN(tBlob);
    tLastBlob = tBlob;
    tBlob = tBlob->theNext;
  }

  NdbEventOperationImpl* tBlobOp = NULL;

  const bool is_tinyblob = (tAttrInfo->getPartSize() == 0);
  assert(is_tinyblob == (tAttrInfo->m_blobTable == NULL));

  if (! is_tinyblob) {
    // blob event name
    char bename[MAX_TAB_NAME_SIZE];
    NdbBlob::getBlobEventName(bename, m_eventImpl, tAttrInfo);

    // find blob event op if any (it serves both post and pre handles)
    tBlobOp = theBlobOpList;
    NdbEventOperationImpl* tLastBlopOp = NULL;
    while (tBlobOp != NULL) {
      if (strcmp(tBlobOp->m_eventImpl->m_name.c_str(), bename) == 0) {
        assert(tBlobOp->m_eventImpl->m_tableImpl == tAttrInfo->m_blobTable);
        break;
      }
      tLastBlopOp = tBlobOp;
      tBlobOp = tBlobOp->m_next;
    }

    DBUG_PRINT("info", ("%s op %s", tBlobOp ? " reuse" : " create", bename));

    // create blob event op if not found
    if (tBlobOp == NULL) {
      // to hide blob op it is linked under main op, not under m_ndb
      NdbEventOperation* tmp =
        m_ndb->theEventBuffer->createEventOperation(bename, m_error);
      if (tmp == NULL)
        DBUG_RETURN(NULL);
      tBlobOp = &tmp->m_impl;

      // pointer to main table op
      tBlobOp->theMainOp = this;
      tBlobOp->m_mergeEvents = m_mergeEvents;

      // add to list end
      if (tLastBlopOp == NULL)
        theBlobOpList = tBlobOp;
      else
        tLastBlopOp->m_next = tBlobOp;
      tBlobOp->m_next = NULL;
    }
  }

  tBlob = m_ndb->getNdbBlob();
  if (tBlob == NULL)
    DBUG_RETURN(NULL);

  // calls getValue on inline and blob part
  if (tBlob->atPrepare(this, tBlobOp, tAttrInfo, n) == -1) {
    m_ndb->releaseNdbBlob(tBlob);
    DBUG_RETURN(NULL);
  }

  // add to list end
  if (tLastBlob == NULL)
    theBlobList = tBlob;
  else
    tLastBlob->theNext = tBlob;
  tBlob->theNext = NULL;

  DBUG_RETURN(tBlob);
}

int
NdbEventOperationImpl::readBlobParts(char* buf, NdbBlob* blob,
                                     Uint32 part, Uint32 count)
{
  DBUG_ENTER_EVENT("NdbEventOperationImpl::readBlobParts");
  DBUG_PRINT_EVENT("info", ("part=%u count=%u post/pre=%d",
                            part, count, blob->theEventBlobVersion));

  NdbEventOperationImpl* blob_op = blob->theBlobEventOp;

  EventBufData* main_data = m_data_item;
  DBUG_PRINT_EVENT("info", ("main_data=%p", main_data));
  assert(main_data != NULL);

  // search for blob parts list head
  EventBufData* head;
  assert(m_data_item != NULL);
  head = m_data_item->m_next_blob;
  while (head != NULL) {
    if (head->m_event_op == blob_op) {
      DBUG_PRINT_EVENT("info", ("found blob parts head %p", head));
      break;
    }
    head = head->m_next_blob;
  }

  Uint32 nparts = 0;
  EventBufData* data = head;
  // XXX optimize using part no ordering
  while (data != NULL) {
    /*
     * Hack part no directly out of buffer since it is not returned
     * in pre data (PK buglet). For part data use receive_event().
     * This means extra copy.
     */
    blob_op->m_data_item = data;
    int r = blob_op->receive_event();
    assert(r > 0);
    Uint32 no = data->get_blob_part_no();
    Uint32 sz = blob->thePartSize;
    const char* src = blob->theBlobEventDataBuf.data;
    DBUG_PRINT_EVENT("info", ("part_data=%p part no=%u part sz=%u",
                              data, no, sz));
    if (part <= no && no < part + count) {
      DBUG_PRINT_EVENT("info", ("part within read range"));
      memcpy(buf + (no - part) * sz, src, sz);
      nparts++;
    } else {
      DBUG_PRINT_EVENT("info", ("part outside read range"));
    }
    data = data->m_next;
  }
  assert(nparts == count);

  DBUG_RETURN_EVENT(0);
}

int
NdbEventOperationImpl::execute()
{
  DBUG_ENTER("NdbEventOperationImpl::execute");
  m_ndb->theEventBuffer->add_drop_lock();
  int r = execute_nolock();
  m_ndb->theEventBuffer->add_drop_unlock();
  DBUG_RETURN(r);
}

int
NdbEventOperationImpl::execute_nolock()
{
  DBUG_ENTER("NdbEventOperationImpl::execute_nolock");
  DBUG_PRINT("info", ("this=%p type=%s", this, !theMainOp ? "main" : "blob"));

  NdbDictionary::Dictionary *myDict = m_ndb->getDictionary();
  if (!myDict) {
    m_error.code = m_ndb->getNdbError().code;
...
...
@@ -266,18 +471,26 @@ NdbEventOperationImpl::execute()
  if (theFirstPkAttrs[0] == NULL &&
      theFirstDataAttrs[0] == NULL) { // defaults to get all
  }

  m_ndb->theEventBuffer->add_drop_lock();
  m_magic_number = NDB_EVENT_OP_MAGIC_NUMBER;
  m_state = EO_EXECUTING;
  mi_type = m_eventImpl->mi_type;
  m_ndb->theEventBuffer->add_op();
  int r = NdbDictionaryImpl::getImpl(*myDict).executeSubscribeEvent(*this);
  if (r == 0) {
    m_ndb->theEventBuffer->add_drop_unlock();
    DBUG_RETURN(0);
    if (theMainOp == NULL) {
      DBUG_PRINT("info", ("execute blob ops"));
      NdbEventOperationImpl* blob_op = theBlobOpList;
      while (blob_op != NULL) {
        r = blob_op->execute_nolock();
        if (r != 0)
          break;
        blob_op = blob_op->m_next;
      }
    }
    if (r == 0)
      DBUG_RETURN(0);
  }
  //Error
  m_state = EO_ERROR;
...
...
@@ -285,7 +498,6 @@ NdbEventOperationImpl::execute()
  m_magic_number = 0;
  m_error.code = myDict->getNdbError().code;
  m_ndb->theEventBuffer->remove_op();
  m_ndb->theEventBuffer->add_drop_unlock();
  DBUG_RETURN(r);
}
...
...
@@ -709,21 +921,6 @@ NdbEventBuffer::pollEvents(int aMillisecondNumber, Uint64 *latestGCI)
  return ret;
}

#ifdef VM_TRACE
static void
print_std(const char* tag, const SubTableData * sdata, LinearSectionPtr ptr[3])
{
  printf("%s\n", tag);
  printf("addr=%p gci=%d op=%d\n", (void*)sdata, sdata->gci, sdata->operation);
  for (int i = 0; i <= 2; i++) {
    printf("sec=%d addr=%p sz=%d\n", i, (void*)ptr[i].p, ptr[i].sz);
    for (int j = 0; j < ptr[i].sz; j++)
      printf("%08x ", ptr[i].p[j]);
    printf("\n");
  }
}
#endif

NdbEventOperation *
NdbEventBuffer::nextEvent()
{
...
...
@@ -751,6 +948,10 @@ NdbEventBuffer::nextEvent()
  while ((data = m_available_data.m_head)) {
    NdbEventOperationImpl* op = data->m_event_op;
    DBUG_PRINT_EVENT("info", ("available data=%p op=%p", data, op));

    // blob table ops must not be seen at this level
    assert(op->theMainOp == NULL);

    // set NdbEventOperation data
    op->m_data_item = data;
...
...
@@ -767,7 +968,10 @@ NdbEventBuffer::nextEvent()
    // NUL event is not returned
    if (data->sdata->operation == NdbDictionary::Event::_TE_NUL) {
      DBUG_PRINT_EVENT("info", ("skip _TE_NUL"));
      continue;
    }

    int r = op->receive_event();
    if (r > 0)
...
...
@@ -777,6 +981,12 @@ NdbEventBuffer::nextEvent()
#ifdef VM_TRACE
      m_latest_command = m_latest_command_save;
#endif
      NdbBlob* tBlob = op->theBlobList;
      while (tBlob != NULL) {
        (void)tBlob->atNextEvent();
        tBlob = tBlob->theNext;
      }
      DBUG_RETURN_EVENT(op->m_facade);
    }
    // the next event belonged to an event op that is no
...
...
@@ -1161,7 +1371,7 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
  DBUG_ENTER_EVENT("NdbEventBuffer::insertDataL");
  Uint64 gci = sdata->gci;

  if ( likely((Uint32)op->mi_type & 1 << (Uint32)sdata->operation) )
  if ( likely((Uint32)op->mi_type & (1 << (Uint32)sdata->operation)) )
  {
    Gci_container* bucket = find_bucket(&m_active_gci, gci);
...
...
@@ -1179,9 +1389,17 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
      DBUG_RETURN_EVENT(0);
    }

    bool use_hash =  op->m_mergeEvents &&
    const bool is_blob_event = (op->theMainOp != NULL);
    const bool is_data_event =
      sdata->operation < NdbDictionary::Event::_TE_FIRST_NON_DATA_EVENT;
    const bool use_hash =  op->m_mergeEvents && is_data_event;

    if (! is_data_event && is_blob_event) {
      // currently subscribed to but not used
      DBUG_PRINT_EVENT("info", ("ignore non-data event on blob table"));
      DBUG_RETURN_EVENT(0);
    }

    // find position in bucket hash table
    EventBufData* data = 0;
...
...
@@ -1201,16 +1419,43 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
        op->m_has_error = 2;
        DBUG_RETURN_EVENT(-1);
      }
      if (unlikely(copy_data(sdata, ptr, data))) {
        op->m_has_error = 3;
        DBUG_RETURN_EVENT(-1);
      }
      // add it to list and hash table
      bucket->m_data.append(data);
      data->m_event_op = op;
      if (! is_blob_event || ! is_data_event) {
        bucket->m_data.append(data);
      } else {
        // find or create main event for this blob event
        EventBufData_hash::Pos main_hpos;
        int ret = get_main_data(bucket, main_hpos, data);
        if (ret == -1) {
          op->m_has_error = 4;
          DBUG_RETURN_EVENT(-1);
        }
        EventBufData* main_data = main_hpos.data;
        if (ret != 0) // main event was created
        {
          main_data->m_event_op = op->theMainOp;
          bucket->m_data.append(main_data);
          if (use_hash) {
            main_data->m_pkhash = main_hpos.pkhash;
            bucket->m_data_hash.append(main_hpos, main_data);
          }
        }
        // link blob event under main event
        add_blob_data(main_data, data);
      }
      if (use_hash) {
        data->m_pkhash = hpos.pkhash;
        bucket->m_data_hash.append(hpos, data);
      }
#ifdef VM_TRACE
...
...
@@ -1226,18 +1471,12 @@ NdbEventBuffer::insertDataL(NdbEventOperationImpl *op,
        DBUG_RETURN_EVENT(-1);
      }
    }
    data->m_event_op = op;
    if (use_hash) {
      data->m_pkhash = hpos.pkhash;
    }
    DBUG_RETURN_EVENT(0);
  }

#ifdef VM_TRACE
  if ((Uint32)op->m_eventImpl->mi_type & 1 << (Uint32)sdata->operation)
  if ((Uint32)op->m_eventImpl->mi_type & (1 << (Uint32)sdata->operation))
  {
    // XXX never reached
    DBUG_PRINT_EVENT("info",("Data arrived before ready eventId", op->m_eventId));
    DBUG_RETURN_EVENT(0);
  }
...
...
@@ -1300,6 +1539,8 @@ NdbEventBuffer::alloc_data()
int
NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
{
  DBUG_ENTER("NdbEventBuffer::alloc_mem");
  DBUG_PRINT("info", ("ptr sz %u + %u + %u", ptr[0].sz, ptr[1].sz, ptr[2].sz));
  const Uint32 min_alloc_size = 128;

  Uint32 sz4 = (sizeof(SubTableData) + 3) >> 2;
...
...
@@ -1317,7 +1558,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
    data->memory = (Uint32*)NdbMem_Allocate(alloc_size);
    if (data->memory == 0)
      return -1;
      DBUG_RETURN(-1);
    data->sz = alloc_size;
    m_total_alloc += data->sz;
  }
...
...
@@ -1332,7 +1573,7 @@ NdbEventBuffer::alloc_mem(EventBufData* data, LinearSectionPtr ptr[3])
    memptr += ptr[i].sz;
  }

  return 0;
  DBUG_RETURN(0);
}

int
...
...
@@ -1404,13 +1645,10 @@ copy_attr(AttributeHeader ah,
  {
    Uint32 k;
    for (k = 0; k < n; k++)
      p1[j1++] = p2[j2++];
  } else {
    j1 += n;
    j2 += n;
      p1[j1 + k] = p2[j2 + k];
  }
  j1 += n;
  j2 += n;
}

int
...
...
@@ -1443,8 +1681,8 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
  data->sz = 0;

  // compose ptr1 o ptr2 = ptr
  LinearSectionPtr (&ptr1) [3] = olddata.ptr;
  LinearSectionPtr (&ptr) [3] = data->ptr;
  LinearSectionPtr (&ptr1)[3] = olddata.ptr;
  LinearSectionPtr (&ptr)[3] = data->ptr;

  // loop twice where first loop only sets sizes
  int loop;
...
...
@@ -1458,7 +1696,7 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
      data->sdata->operation = tp->t3;
    }

    ptr[0].sz = ptr[1].sz = ptr[3].sz = 0;
    ptr[0].sz = ptr[1].sz = ptr[2].sz = 0;

    // copy pk from new version
    {
...
...
@@ -1572,6 +1810,113 @@ NdbEventBuffer::merge_data(const SubTableData * const sdata,
  DBUG_RETURN_EVENT(0);
}

/*
 * Given blob part event, find main table event on inline part. It
 * should exist (force in TUP) but may arrive later. If so, create
 * NUL event on main table. The real event replaces it later.
 */

// write attribute headers for concatened PK
static void
split_concatenated_pk(const NdbTableImpl* t, Uint32* ah_buffer,
                      const Uint32* pk_buffer, Uint32 pk_sz)
{
  Uint32 sz = 0; // words parsed so far
  Uint32 n;  // pk attr count
  Uint32 i;
  for (i = n = 0; i < t->m_columns.size() && n < t->m_noOfKeys; i++)
  {
    const NdbColumnImpl* c = t->getColumn(i);
    assert(c != NULL);
    if (! c->m_pk)
      continue;

    assert(sz < pk_sz);
    Uint32 bytesize = c->m_attrSize * c->m_arraySize;
    Uint32 lb, len;
    bool ok = NdbSqlUtil::get_var_length(c->m_type, &pk_buffer[sz], bytesize,
                                         lb, len);
    assert(ok);

    AttributeHeader ah(i, lb + len);
    ah_buffer[n++] = ah.m_value;
    sz += ah.getDataSize();
  }
  assert(n == t->m_noOfKeys && sz == pk_sz);
}

int
NdbEventBuffer::get_main_data(Gci_container* bucket,
                              EventBufData_hash::Pos& hpos,
                              EventBufData* blob_data)
{
  DBUG_ENTER_EVENT("NdbEventBuffer::get_main_data");

  NdbEventOperationImpl* main_op = blob_data->m_event_op->theMainOp;
  assert(main_op != NULL);
  const NdbTableImpl* mainTable = main_op->m_eventImpl->m_tableImpl;

  // create LinearSectionPtr for main table key
  LinearSectionPtr ptr[3];
  Uint32 ah_buffer[NDB_MAX_NO_OF_ATTRIBUTES_IN_KEY];
  ptr[0].sz = mainTable->m_noOfKeys;
  ptr[0].p = ah_buffer;
  ptr[1].sz = AttributeHeader(blob_data->ptr[0].p[0]).getDataSize();
  ptr[1].p = blob_data->ptr[1].p;
  ptr[2].sz = 0;
  ptr[2].p = 0;
  split_concatenated_pk(mainTable, ptr[0].p, ptr[1].p, ptr[1].sz);

  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);

  // search for main event buffer
  bucket->m_data_hash.search(hpos, main_op, ptr);
  if (hpos.data != NULL)
    DBUG_RETURN_EVENT(0);

  // not found, create a place-holder
  EventBufData* main_data = alloc_data();
  if (main_data == NULL)
    DBUG_RETURN_EVENT(-1);
  SubTableData sdata = *blob_data->sdata;
  sdata.tableId = main_op->m_eventImpl->m_tableImpl->m_id;
  sdata.operation = NdbDictionary::Event::_TE_NUL;
  if (copy_data(&sdata, ptr, main_data) != 0)
    DBUG_RETURN_EVENT(-1);
  hpos.data = main_data;

  DBUG_RETURN_EVENT(1);
}

void
NdbEventBuffer::add_blob_data(EventBufData* main_data,
                              EventBufData* blob_data)
{
  DBUG_ENTER_EVENT("NdbEventBuffer::add_blob_data");
  DBUG_PRINT_EVENT("info", ("main_data=%p blob_data=%p", main_data, blob_data));
  EventBufData* head;
  head = main_data->m_next_blob;
  while (head != NULL)
  {
    if (head->m_event_op == blob_data->m_event_op)
      break;
    head = head->m_next_blob;
  }
  if (head == NULL)
  {
    head = blob_data;
    head->m_next_blob = main_data->m_next_blob;
    main_data->m_next_blob = head;
  }
  else
  {
    blob_data->m_next = head->m_next;
    head->m_next = blob_data;
  }
  DBUG_VOID_RETURN_EVENT;
}

NdbEventOperationImpl *
NdbEventBuffer::move_data()
...
...
@@ -1613,6 +1958,31 @@ NdbEventBuffer::free_list(EventBufData_list &list)
#endif
  m_free_data_sz += list.m_sz;

  // free blobs XXX unacceptable performance, fix later
  {
    EventBufData* data = list.m_head;
    while (1) {
      while (data->m_next_blob != NULL) {
        EventBufData* blob_head = data->m_next_blob;
        data->m_next_blob = blob_head->m_next_blob;
        blob_head->m_next_blob = NULL;
        while (blob_head != NULL) {
          EventBufData* blob_part = blob_head;
          blob_head = blob_head->m_next;
          blob_part->m_next = m_free_data;
          m_free_data = blob_part;
#ifdef VM_TRACE
          m_free_data_count++;
#endif
          m_free_data_sz += blob_part->sz;
        }
      }
      if (data == list.m_tail)
        break;
      data = data->m_next;
    }
  }

  // list returned to m_free_data
  new (&list) EventBufData_list;
}
...
...
@@ -1648,6 +2018,17 @@ NdbEventBuffer::dropEventOperation(NdbEventOperation* tOp)
  if (m_dropped_ev_op)
    m_dropped_ev_op->m_prev = op;
  m_dropped_ev_op = op;

  // stop blob event ops
  if (op->theMainOp == NULL) {
    NdbEventOperationImpl* tBlobOp = op->theBlobOpList;
    while (tBlobOp != NULL) {
      tBlobOp->stop();
      tBlobOp = tBlobOp->m_next;
    }
  }

  // ToDo, take care of these to be deleted at the
  // appropriate time, after we are sure that there
...
...
@@ -1717,6 +2098,10 @@ NdbEventBuffer::reportStatus()
Uint32
EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{
  DBUG_ENTER_EVENT("EventBufData_hash::getpkhash");
  DBUG_DUMP_EVENT("ah", (char*)ptr[0].p, ptr[0].sz << 2);
  DBUG_DUMP_EVENT("pk", (char*)ptr[1].p, ptr[1].sz << 2);

  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;

  // in all cases ptr[0] = pk ah.. ptr[1] = pk ad..
...
...
@@ -1747,13 +2132,19 @@ EventBufData_hash::getpkhash(NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
    (*cs->coll->hash_sort)(cs, dptr + lb, len, &nr1, &nr2);
    dptr += ((bytesize + 3) / 4) * 4;
  }
  return nr1;
  DBUG_PRINT_EVENT("info", ("hash result=%08x", nr1));
  DBUG_RETURN_EVENT(nr1);
}

// this is seldom invoked
bool
EventBufData_hash::getpkequal(NdbEventOperationImpl* op,
                              LinearSectionPtr ptr1[3], LinearSectionPtr ptr2[3])
{
  DBUG_ENTER_EVENT("EventBufData_hash::getpkequal");
  DBUG_DUMP_EVENT("ah1", (char*)ptr1[0].p, ptr1[0].sz << 2);
  DBUG_DUMP_EVENT("pk1", (char*)ptr1[1].p, ptr1[1].sz << 2);
  DBUG_DUMP_EVENT("ah2", (char*)ptr2[0].p, ptr2[0].sz << 2);
  DBUG_DUMP_EVENT("pk2", (char*)ptr2[1].p, ptr2[1].sz << 2);

  const NdbTableImpl* tab = op->m_eventImpl->m_tableImpl;

  Uint32 nkey = tab->m_noOfKeys;
...
...
@@ -1763,6 +2154,8 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
  const uchar* dptr1 = (uchar*)ptr1[1].p;
  const uchar* dptr2 = (uchar*)ptr2[1].p;

  bool equal = true;

  while (nkey-- != 0)
  {
    AttributeHeader ah1(*hptr1++);
...
...
@@ -1787,16 +2180,22 @@ EventBufData_hash::getpkequal(NdbEventOperationImpl* op, LinearSectionPtr ptr1[3
    CHARSET_INFO* cs = col->m_cs ? col->m_cs : &my_charset_bin;
    int res = (cs->coll->strnncollsp)(cs, dptr1 + lb1, len1, dptr2 + lb2, len2, false);
    if (res != 0)
      return false;
    {
      equal = false;
      break;
    }
    dptr1 += ((bytesize1 + 3) / 4) * 4;
    dptr2 += ((bytesize2 + 3) / 4) * 4;
  }

  return true;
  DBUG_PRINT_EVENT("info", ("equal=%s", equal ? "true" : "false"));
  DBUG_RETURN_EVENT(equal);
}

void
EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr ptr[3])
{
  DBUG_ENTER_EVENT("EventBufData_hash::search");
  Uint32 pkhash = getpkhash(op, ptr);
  Uint32 index = (op->m_oid ^ pkhash) % GCI_EVENT_HASH_SIZE;
  EventBufData* data = m_hash[index];
...
...
@@ -1811,6 +2210,8 @@ EventBufData_hash::search(Pos& hpos, NdbEventOperationImpl* op, LinearSectionPtr
  hpos.index = index;
  hpos.data = data;
  hpos.pkhash = pkhash;
  DBUG_PRINT_EVENT("info", ("search result=%p", data));
  DBUG_VOID_RETURN_EVENT;
}

template class Vector<Gci_container>;
...
...
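An illustrative, self-contained sketch (not the NDB code itself) of the two-level list that add_blob_data() above maintains: each main event keeps one m_next_blob entry per blob operation, and further parts of the same blob are chained via m_next. Field and function names mirror EventBufData for readability only.

#include <cassert>
#include <cstdio>

struct BufData {
  int      owner;        // stands in for m_event_op (which blob op the part belongs to)
  int      part_no;      // blob part number; unused for main events
  BufData* m_next;       // next part of the same blob
  BufData* m_next_blob;  // first part of the next blob column
  BufData(int o, int p) : owner(o), part_no(p), m_next(0), m_next_blob(0) {}
};

// Link blob_data under main_data, as in NdbEventBuffer::add_blob_data().
static void add_blob_data(BufData* main_data, BufData* blob_data)
{
  BufData* head = main_data->m_next_blob;
  while (head != 0 && head->owner != blob_data->owner)
    head = head->m_next_blob;
  if (head == 0) {
    blob_data->m_next_blob = main_data->m_next_blob;
    main_data->m_next_blob = blob_data;
  } else {
    blob_data->m_next = head->m_next;
    head->m_next = blob_data;
  }
}

int main()
{
  BufData main_ev(0, -1);
  BufData part0(1, 0), part1(1, 1), other(2, 0);
  add_blob_data(&main_ev, &part0);
  add_blob_data(&main_ev, &part1);   // same blob op: chained via m_next
  add_blob_data(&main_ev, &other);   // different blob op: new m_next_blob entry
  assert(main_ev.m_next_blob == &other && other.m_next_blob == &part0);
  assert(part0.m_next == &part1);
  printf("blob parts linked as expected\n");
  return 0;
}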
storage/ndb/src/ndbapi/NdbEventOperationImpl.hpp
View file @
93ceddfe
...
...
@@ -21,6 +21,7 @@
#include <signaldata/SumaImpl.hpp>
#include <transporter/TransporterDefinitions.hpp>
#include <NdbRecAttr.hpp>
#include <AttributeHeader.hpp>
#define NDB_EVENT_OP_MAGIC_NUMBER 0xA9F301B4
...
...
@@ -35,9 +36,28 @@ struct EventBufData
  LinearSectionPtr ptr[3];
  unsigned sz;
  NdbEventOperationImpl *m_event_op;
  EventBufData *m_next; // Next wrt to global order
  /*
   * Blobs are stored in blob list (m_next_blob) where each entry
   * is list of parts (m_next) in part number order.
   *
   * TODO order by part no and link for fast read and free_list
   */
  EventBufData *m_next; // Next wrt to global order or Next blob part
  EventBufData *m_next_blob; // First part in next blob
  EventBufData *m_next_hash; // Next in per-GCI hash
  Uint32 m_pkhash; // PK hash (without op) for fast compare

  // Get blob part number from blob data
  Uint32 get_blob_part_no() {
    assert(ptr[0].sz > 2);
    Uint32 pos = AttributeHeader(ptr[0].p[0]).getDataSize() +
                 AttributeHeader(ptr[0].p[1]).getDataSize();
    Uint32 no = ptr[1].p[pos];
    return no;
  }
};

class EventBufData_list
...
...
@@ -70,7 +90,6 @@ EventBufData_list::~EventBufData_list()
{
}

inline int EventBufData_list::is_empty()
{
...
...
@@ -173,9 +192,13 @@ public:
  NdbEventOperation::State getState();

  int execute();
  int execute_nolock();
  int stop();
  NdbRecAttr *getValue(const char *colName, char *aValue, int n);
  NdbRecAttr *getValue(const NdbColumnImpl *, char *aValue, int n);
  NdbBlob *getBlobHandle(const char *colName, int n);
  NdbBlob *getBlobHandle(const NdbColumnImpl *, int n);
  int readBlobParts(char* buf, NdbBlob* blob, Uint32 part, Uint32 count);
  int receive_event();
  Uint64 getGCI();
  Uint64 getLatestGCI();
...
...
@@ -199,6 +222,10 @@ public:
  NdbRecAttr *theFirstDataAttrs[2];
  NdbRecAttr *theCurrentDataAttrs[2];

  NdbBlob* theBlobList;
  NdbEventOperationImpl* theBlobOpList; // in main op, list of blob ops
  NdbEventOperationImpl* theMainOp; // in blob op, the main op

  NdbEventOperation::State m_state; /* note connection to mi_type */
  Uint32 mi_type; /* should be == 0 if m_state != EO_EXECUTING
                   * else same as in EventImpl
...
...
@@ -275,6 +302,11 @@ public:
  int merge_data(const SubTableData * const sdata,
                 LinearSectionPtr ptr[3],
                 EventBufData* data);
  int get_main_data(Gci_container* bucket,
                    EventBufData_hash::Pos& hpos,
                    EventBufData* blob_data);
  void add_blob_data(EventBufData* main_data,
                     EventBufData* blob_data);

  void free_list(EventBufData_list &list);
...
...
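A small self-contained sketch of the word arithmetic behind get_blob_part_no() above. In the real code the skip distances come from AttributeHeader::getDataSize() of the first two attributes in the blob-parts event (which, as that code assumes, precede the part number); here they are plain numbers so the example compiles on its own.

#include <cassert>

// data_words is the packed attribute-data section; the part number is the
// first word after the data of the first two attributes.
static unsigned blob_part_no(const unsigned* data_words,
                             unsigned first_attr_words,
                             unsigned second_attr_words)
{
  unsigned pos = first_attr_words + second_attr_words;
  return data_words[pos];
}

int main()
{
  // hypothetical event data: 3 words of key, 1 word of distribution key, then part no 7
  unsigned data[] = { 1, 2, 3, 0, 7, 0 };
  assert(blob_part_no(data, 3, 1) == 7);
  return 0;
}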
storage/ndb/test/ndbapi/test_event_merge.cpp
View file @
93ceddfe
...
...
@@ -21,14 +21,7 @@
#include <my_sys.h>
#include <ndb_version.h>
#if NDB_VERSION_D < MAKE_VERSION(5, 1, 0)
#define version50
#else
#undef version50
#endif
// until rbr in 5.1
#undef version51rbr
// version >= 5.1 required
#if !defined(min) || !defined(max)
#define min(x, y) ((x) < (y) ? (x) : (y))
...
...
@@ -57,11 +50,11 @@
 * There are other -no-* options, each added to isolate a specific bug.
 *
 * There are 5 ways (ignoring NUL operand) to compose 2 ops:
 *                      5.0 bugs       5.1 bugs
 *
 * INS o DEL = NUL
 * INS o UPD = INS      type=INS
 * DEL o INS = UPD      type=INS       type=INS
 * UPD o DEL = DEL      no event
 * INS o UPD = INS
 * DEL o INS = UPD
 * UPD o DEL = DEL
 * UPD o UPD = UPD
 */
...
...
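The composition table from the comment above, written out as a tiny stand-alone function for clarity. This is a sketch only, not the test's own mergeops() implementation.

#include <cassert>

enum OpType { NUL, INS, DEL, UPD };

// compose(a, b): net effect of applying b after a on the same primary key
// within one GCI, per the table in the comment above.
static OpType compose(OpType a, OpType b)
{
  if (a == INS && b == DEL) return NUL;
  if (a == INS && b == UPD) return INS;
  if (a == DEL && b == INS) return UPD;
  if (a == UPD && b == DEL) return DEL;
  if (a == UPD && b == UPD) return UPD;
  return NUL; // other combinations cannot arise from a consistent history
}

int main()
{
  assert(compose(INS, DEL) == NUL);
  assert(compose(DEL, INS) == UPD);
  assert(compose(UPD, UPD) == UPD);
  return 0;
}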
@@ -73,17 +66,19 @@ struct Opts {
  uint maxpk;
  my_bool no_blobs;
  my_bool no_implicit_nulls;
  my_bool no_missing_update;
  my_bool no_multiops;
  my_bool no_nulls;
  my_bool one_blob;
  const char* opstring;
  uint seed;
  my_bool separate_events;
  uint tweak; // whatever's useful
  my_bool use_table;
};

static Opts g_opts;
static const uint g_maxpk = 100;
static const uint g_maxpk = 1000;
static const uint g_maxopstringpart = 100;
static const char* g_opstringpart[g_maxopstringpart];
static uint g_opstringparts = 0;
...
...
@@ -208,7 +203,9 @@ struct Col {
  uint length;
  uint size;
  bool isblob() const {
    return type == NdbDictionary::Column::Text;
    return type == NdbDictionary::Column::Text || type == NdbDictionary::Column::Blob;
  }
};
...
...
@@ -218,19 +215,21 @@ static Col g_col[] = {
  { 2, "seq", NdbDictionary::Column::Unsigned, false, false, 1, 4 },
  { 3, "cc1", NdbDictionary::Column::Char, false, true, g_charlen, g_charlen },
  { 4, "tx1", NdbDictionary::Column::Text, false, true, 0, 0 },
  { 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 }
  { 5, "tx2", NdbDictionary::Column::Text, false, true, 0, 0 },
  { 6, "bl1", NdbDictionary::Column::Blob, false, true, 0, 0 } // tinyblob
};

static const uint g_maxcol = sizeof(g_col)/sizeof(g_col[0]);
static const uint g_blobcols = 3;

static uint
ncol()
{
  uint n = g_maxcol;
  if (g_opts.no_blobs)
    n -= 2;
    n -= g_blobcols;
  else if (g_opts.one_blob)
    n -= 1;
    n -= (g_blobcols - 1);
  return n;
}
...
...
@@ -283,6 +282,11 @@ createtable()
      col.setStripeSize(g_blobstripesize);
      col.setCharset(cs);
      break;
    case NdbDictionary::Column::Blob:
      col.setInlineSize(g_blobinlinesize);
      col.setPartSize(0);
      col.setStripeSize(0);
      break;
    default:
      assert(false);
      break;
...
...
@@ -337,6 +341,7 @@ struct Data {
  char cc1[g_charlen + 1];
  Txt tx1;
  Txt tx2;
  Txt bl1;
  Ptr ptr[g_maxcol];
  int ind[g_maxcol]; // -1 = no data, 1 = NULL, 0 = not NULL
  uint noop; // bit: omit in NdbOperation (implicit NULL INS or no UPD)
...
...
@@ -347,14 +352,15 @@ struct Data {
    memset(pk2, 0, sizeof(pk2));
    seq = 0;
    memset(cc1, 0, sizeof(cc1));
    tx1.val = tx2.val = 0;
    tx1.len = tx2.len = 0;
    tx1.val = tx2.val = bl1.val = 0;
    tx1.len = tx2.len = bl1.len = 0;
    ptr[0].u32 = &pk1;
    ptr[1].ch = pk2;
    ptr[2].u32 = &seq;
    ptr[3].ch = cc1;
    ptr[4].txt = &tx1;
    ptr[5].txt = &tx2;
    ptr[6].txt = &bl1;
    for (i = 0; i < g_maxcol; i++)
      ind[i] = -1;
    noop = 0;
...
...
@@ -363,6 +369,7 @@ struct Data {
  void free() {
    delete [] tx1.val;
    delete [] tx2.val;
    delete [] bl1.val;
    init();
  }
};
...
...
@@ -384,6 +391,7 @@ cmpcol(const Col& c, const Data& d1, const Data& d2)
        return 1;
      break;
    case NdbDictionary::Column::Text:
    case NdbDictionary::Column::Blob:
      {
        const Data::Txt& t1 = *d1.ptr[i].txt;
        const Data::Txt& t2 = *d2.ptr[i].txt;
...
...
@@ -434,6 +442,7 @@ operator<<(NdbOut& out, const Data& d)
      }
      break;
    case NdbDictionary::Column::Text:
    case NdbDictionary::Column::Blob:
      {
        Data::Txt& t = *d.ptr[i].txt;
        bool first = true;
...
...
@@ -712,6 +721,20 @@ checkop(const Op* op, Uint32& pk1)
    if (! c.nullable) {
      chkrc(ind0 <= 0 && ind1 <= 0);
    }
    if (c.isblob()) {
      // blob values must be from allowed chars
      int j;
      for (j = 0; j < 2; j++) {
        const Data& d = op->data[j];
        if (d.ind[i] == 0) {
          const Data::Txt& t = *d.ptr[i].txt;
          int k;
          for (k = 0; k < t.len; k++) {
            chkrc(strchr(g_charval, t.val[k]) != 0);
          }
        }
      }
    }
  }
  return 0;
}
...
...
@@ -849,9 +872,8 @@ createevent()
    const Col& c = g_col[i];
    evt.addEventColumn(c.name);
  }
#ifdef version51rbr
  evt.setReport(NdbDictionary::Event::ER_UPDATED);
  evt.mergeEvents(! g_opts.separate_events);
#endif

  if (g_dic->getEvent(evt.getName()) != 0)
    chkdb(g_dic->dropEvent(evt.getName()) == 0);
  chkdb(g_dic->createEvent(evt) == 0);
...
...
@@ -875,14 +897,8 @@ static int
createeventop()
{
  ll1("createeventop");
#ifdef version50
  uint bsz = 10 * g_opts.maxops;
  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName(), bsz)) != 0);
#else
  chkdb((g_evt_op = g_ndb->createEventOperation(g_evt->getName())) != 0);
  // available in gci merge changeset
  g_evt_op->mergeEvents(! g_opts.separate_events); // not yet inherited
#endif
  uint i;
  for (i = 0; i < ncol(); i++) {
    const Col& c = g_col[i];
...
...
@@ -891,10 +907,8 @@ createeventop()
      chkdb((g_ev_ra[0][i] = g_evt_op->getValue(c.name, (char*)d[0].ptr[i].v)) != 0);
      chkdb((g_ev_ra[1][i] = g_evt_op->getPreValue(c.name, (char*)d[1].ptr[i].v)) != 0);
    } else {
#ifdef version51rbr
      chkdb((g_ev_bh[0][i] = g_evt_op->getBlobHandle(c.name)) != 0);
      chkdb((g_ev_bh[1][i] = g_evt_op->getPreBlobHandle(c.name)) != 0);
#endif
    }
  }
  return 0;
...
...
@@ -909,10 +923,10 @@ dropeventop()
  return 0;
}

// wait for event to be installed and for GCIs to pass
static int
waitgci()
// wait for event to be installed and for at least 1 GCI to pass
waitgci(uint ngci)
{
  const uint ngci = 3;
  ll1("waitgci " << ngci);
  Uint32 gci[2];
  uint i = 0;
...
...
@@ -976,7 +990,6 @@ scantab()
      if (! c.isblob()) {
        ind = ra[i]->isNULL();
      } else {
#ifdef version51rbr
        int ret;
        ret = bh[i]->getDefined(ind);
        assert(ret == 0);
...
...
@@ -992,8 +1005,10 @@ scantab()
        Uint32 len = t.len;
        ret = bh[i]->readData(t.val, len);
        assert(ret == 0 && len == t.len);
        // to see the data, have to execute...
        chkdb(g_con->execute(NoCommit) == 0);
        assert(memchr(t.val, 'X', t.len) == 0);
      }
#endif
      }
      assert(ind >= 0);
      d0.ind[i] = ind;
...
...
@@ -1042,7 +1057,7 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
  } else if (t == Op::INS && ! g_opts.no_implicit_nulls && c.nullable && urandom(10, 100)) {
    d.noop |= (1 << i);
    d.ind[i] = 1; // implicit NULL value is known
  } else if (t == Op::UPD && urandom(10, 100)) {
  } else if (t == Op::UPD && ! g_opts.no_missing_update && urandom(10, 100)) {
    d.noop |= (1 << i);
    d.ind[i] = -1; // fixed up in caller
  } else if (! g_opts.no_nulls && c.nullable && urandom(10, 100)) {
...
...
@@ -1060,6 +1075,8 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
    {
      char* p = d.ptr[i].ch;
      uint u = urandom(g_charlen);
      if (u == 0)
        u = urandom(g_charlen); // 2x bias for non-empty
      uint j;
      for (j = 0; j < g_charlen; j++) {
        uint v = urandom(strlen(g_charval));
...
...
@@ -1068,12 +1085,23 @@ makedata(const Col& c, Data& d, Uint32 pk1, Op::Type t)
    }
    break;
  case NdbDictionary::Column::Text:
  case NdbDictionary::Column::Blob:
    {
      const bool tinyblob = (c.type == NdbDictionary::Column::Blob);
      Data::Txt& t = *d.ptr[i].txt;
      uint u = urandom(g_maxblobsize);
      delete [] t.val;
      t.val = 0;
      if (g_opts.tweak & 1) {
        uint u = g_blobinlinesize + (tinyblob ? 0 : g_blobpartsize);
        uint v = (g_opts.tweak & 2) ? 0 : urandom(strlen(g_charval));
        t.val = new char [u];
        t.len = u;
        memset(t.val, g_charval[v], u);
        break;
      }
      uint u = urandom(tinyblob ? g_blobinlinesize : g_maxblobsize);
      u = urandom(u); // 4x bias for smaller blobs
      u = urandom(u);
      delete [] t.val;
      t.val = new char [u];
      t.len = u;
      uint j = 0;
...
...
@@ -1134,9 +1162,15 @@ makeops()
{
  ll1("makeops");
  Uint32 pk1 = 0;
  while (g_usedops < g_opts.maxops && pk1 < g_opts.maxpk) {
    if (g_opts.opstring == 0)
  while (1) {
    if (g_opts.opstring == 0) {
      if (g_usedops >= g_opts.maxops) // use up ops
        break;
      pk1 = urandom(g_opts.maxpk);
    } else {
      if (pk1 >= g_opts.maxpk) // use up pks
        break;
    }
    ll2("makeops: pk1=" << pk1);
    // total op on the pk so far
    // optype either NUL=initial/deleted or INS=created
...
...
@@ -1465,7 +1499,7 @@ matchevent(Op* ev)
    }
    if (tmpok) {
      ok = gci_op->match = true;
      ll2("===: match");
      ll2("match");
    }
  }
  pos++;
...
...
@@ -1555,7 +1589,6 @@ geteventdata()
      NdbRecAttr* ra = g_ev_ra[j][i];
      ind = ra->isNULL();
    } else {
#ifdef version51rbr
      NdbBlob* bh = g_ev_bh[j][i];
      ret = bh->getDefined(ind);
      assert(ret == 0);
...
...
@@ -1572,7 +1605,6 @@ geteventdata()
      ret = bh->readData(t.val, len);
      assert(ret == 0 && len == t.len);
      }
#endif
    }
    d[j].ind[i] = ind;
  }
...
...
@@ -1585,38 +1617,22 @@ runevents()
  ll1("runevents");
  uint mspoll = 1000;
  uint npoll = 6; // strangely long delay
  ll1("poll " << npoll);
  while (npoll != 0) {
    npoll--;
    int ret;
    ll1("poll");
    ret = g_ndb->pollEvents(mspoll);
    if (ret <= 0)
      continue;
    while (1) {
      g_rec_ev->init(Op::EV);
#ifdef version50
      int overrun = g_opts.maxops;
      chkdb((ret = g_evt_op->next(&overrun)) >= 0);
      chkrc(overrun == 0);
      if (ret == 0)
        break;
#else
      NdbEventOperation* tmp_op = g_ndb->nextEvent();
      if (tmp_op == 0)
        break;
      reqrc(g_evt_op == tmp_op);
#endif
      chkrc(seteventtype(g_rec_ev, g_evt_op->getEventType()) == 0);
      geteventdata();
      g_rec_ev->gci = g_evt_op->getGCI();
#ifdef version50
      // fix to match 5.1
      if (g_rec_ev->type == Op::UPD) {
        Uint32 pk1 = g_rec_ev->data[0].pk1;
        makedata(getcol("pk1"), g_rec_ev->data[1], pk1, Op::UPD);
        makedata(getcol("pk2"), g_rec_ev->data[1], pk1, Op::UPD);
      }
#endif
      // get indicators and blob value
      ll2("runevents: EVT: " << *g_rec_ev);
      // check basic sanity
...
...
@@ -1667,7 +1683,7 @@ runtest()
  chkrc(createtable() == 0);
  chkrc(createevent() == 0);
  for (g_loop = 0; g_opts.loop == 0 || g_loop < g_opts.loop; g_loop++) {
    ll0("\nloop " << g_loop);
    ll0("\n=== loop " << g_loop << " ===");
    setseed(g_loop);
    resetmem();
    chkrc(scantab() == 0); // alternative: save tot_op for loop > 0
...
...
@@ -1675,7 +1691,7 @@ runtest()
    g_rec_ev = getop(Op::EV);
    chkrc(createeventop() == 0);
    chkdb(g_evt_op->execute() == 0);
    chkrc(waitgci() == 0);
    chkrc(waitgci(3) == 0);
    chkrc(runops() == 0);
    if (! g_opts.separate_events)
      chkrc(mergeops() == 0);
...
...
@@ -1685,6 +1701,8 @@ runtest()
    chkrc(matchevents() == 0);
    chkrc(matchops() == 0);
    chkrc(dropeventop() == 0);
    // time erases everything..
    chkrc(waitgci(1) == 0);
  }
  chkrc(dropevent() == 0);
  chkrc(droptable() == 0);
...
...
@@ -1703,41 +1721,48 @@ my_long_options[] =
  { "loglevel", 1002, "Logging level in this program (default 0)",
    (gptr*)&g_opts.loglevel, (gptr*)&g_opts.loglevel, 0,
    GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "loop", 1003, "Number of test loops (default 2, 0=forever)",
  { "loop", 1003, "Number of test loops (default 3, 0=forever)",
    (gptr*)&g_opts.loop, (gptr*)&g_opts.loop, 0,
    GET_INT, REQUIRED_ARG, 2, 0, 0, 0, 0, 0 },
    GET_INT, REQUIRED_ARG, 3, 0, 0, 0, 0, 0 },
  { "maxops", 1004, "Approx number of PK operations (default 1000)",
    (gptr*)&g_opts.maxops, (gptr*)&g_opts.maxops, 0,
    GET_UINT, REQUIRED_ARG, 1000, 0, 0, 0, 0, 0 },
  { "maxpk", 1005, "Number of different PK values (default 10)",
    (gptr*)&g_opts.maxpk, (gptr*)&g_opts.maxpk, 0,
    GET_UINT, REQUIRED_ARG, 10, 1, g_maxpk, 0, 0, 0 },
    GET_UINT, REQUIRED_ARG, 10, 0, 0, 0, 0, 0 },
  { "no-blobs", 1006, "Omit blob attributes (5.0: true)",
    (gptr*)&g_opts.no_blobs, (gptr*)&g_opts.no_blobs, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "no-implicit-nulls", 1007, "Insert must include NULL values explicitly",
  { "no-implicit-nulls", 1007, "Insert must include all attrs"
    " i.e. no implicit NULLs",
    (gptr*)&g_opts.no_implicit_nulls, (gptr*)&g_opts.no_implicit_nulls, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "no-multiops", 1008, "Allow only 1 operation per commit",
  { "no-missing-update", 1008, "Update must include all non-PK attrs",
    (gptr*)&g_opts.no_missing_update, (gptr*)&g_opts.no_missing_update, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "no-multiops", 1009, "Allow only 1 operation per commit",
    (gptr*)&g_opts.no_multiops, (gptr*)&g_opts.no_multiops, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "no-nulls", 1009, "Create no NULL values",
  { "no-nulls", 1010, "Create no NULL values",
    (gptr*)&g_opts.no_nulls, (gptr*)&g_opts.no_nulls, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "one-blob", 1010, "Only one blob attribute (defautt 2)",
  { "one-blob", 1011, "Only one blob attribute (default 2)",
    (gptr*)&g_opts.one_blob, (gptr*)&g_opts.one_blob, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "opstring", 1011, "Operations to run e.g. idiucdc (c is commit) or"
  { "opstring", 1012, "Operations to run e.g. idiucdc (c is commit) or"
    " iuuc:uudc (the : separates loops)",
    (gptr*)&g_opts.opstring, (gptr*)&g_opts.opstring, 0,
    GET_STR_ALLOC, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "seed", 1012, "Random seed (0=loop number, default -1=random)",
  { "seed", 1013, "Random seed (0=loop number, default -1=random)",
    (gptr*)&g_opts.seed, (gptr*)&g_opts.seed, 0,
    GET_INT, REQUIRED_ARG, -1, 0, 0, 0, 0, 0 },
  { "separate-events", 1013, "Do not combine events per GCI (5.0: true)",
  { "separate-events", 1014, "Do not combine events per GCI (5.0: true)",
    (gptr*)&g_opts.separate_events, (gptr*)&g_opts.separate_events, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { "use-table", 1014, "Use existing table 'tem1'",
  { "tweak", 1015, "Whatever the source says",
    (gptr*)&g_opts.tweak, (gptr*)&g_opts.tweak, 0,
    GET_UINT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
  { "use-table", 1016, "Use existing table 'tem1'",
    (gptr*)&g_opts.use_table, (gptr*)&g_opts.use_table, 0,
    GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
  { 0, 0, 0,
...
...
@@ -1754,9 +1779,10 @@ usage()
static int
checkopts()
{
#ifdef version50
  g_opts.separate_events = true;
#endif
  if (g_opts.maxpk > g_maxpk) {
    ll0("setting maxpk to " << g_maxpk);
    g_opts.maxpk = g_maxpk;
  }
  if (g_opts.separate_events) {
    g_opts.no_blobs = true;
  }
...
...
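A hedged sketch (not part of the commit) of the 5.1-style consumption loop that runevents()/geteventdata() above are built around, reduced to a single blob column. It assumes the operation was prepared as in createeventop(); the buffer size and the assumption that the blob fits in it are made up for illustration.

#include <NdbApi.hpp>
#include <cassert>

// Poll for events, then drain them; for each event read the post-image blob.
void consume_events(Ndb* ndb, NdbEventOperation* op, NdbBlob* bh)
{
  while (ndb->pollEvents(1000) > 0) {
    NdbEventOperation* tmp;
    while ((tmp = ndb->nextEvent()) != 0) {
      assert(tmp == op);                       // single subscription in this sketch
      NdbDictionary::Event::TableEvent type = op->getEventType();
      (void)type;                              // INSERT/UPDATE/DELETE etc.
      int ind = 0;
      if (bh->getDefined(ind) == 0 && ind == 0) {  // 0 = value present, not NULL
        Uint64 length = 0;
        bh->getLength(length);
        Uint32 bytes = (Uint32)length;
        char buf[8000];                        // assume the blob fits (sketch only)
        if (bytes <= sizeof(buf) && bh->readData(buf, bytes) == 0) {
          // use buf[0..bytes-1]
        }
      }
    }
  }
}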