Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
M
MariaDB
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
nexedi
MariaDB
Commits
70cb38cf
Commit
70cb38cf
authored
Jun 06, 2019
by
Will DeVries
Committed by
Sergei Petrunia
Mar 10, 2020
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Inital commit of Clustrix backend plugin.
parent
e0e5d8c5
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
1848 additions
and
0 deletions
+1848
-0
storage/clustrixdb/CMakeLists.txt
storage/clustrixdb/CMakeLists.txt
+24
-0
storage/clustrixdb/clustrix_connection.cc
storage/clustrixdb/clustrix_connection.cc
+589
-0
storage/clustrixdb/clustrix_connection.h
storage/clustrixdb/clustrix_connection.h
+100
-0
storage/clustrixdb/ha_clustrixdb.cc
storage/clustrixdb/ha_clustrixdb.cc
+1011
-0
storage/clustrixdb/ha_clustrixdb.h
storage/clustrixdb/ha_clustrixdb.h
+124
-0
No files found.
storage/clustrixdb/CMakeLists.txt
0 → 100644
View file @
70cb38cf
#*****************************************************************************
# Copyright (c) 2019, MariaDB Corporation.
#****************************************************************************/
IF
(
MSVC
)
# Temporarily disable "conversion from size_t .."
IF
(
CMAKE_SIZEOF_VOID_P EQUAL 8
)
SET
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS
}
/wd4267"
)
ENDIF
()
ENDIF
()
SET
(
CLUSTRIXDB_PLUGIN_STATIC
"clustrixdb"
)
SET
(
CLUSTRIXDB_PLUGIN_DYNAMIC
"ha_clustrixdb"
)
SET
(
CLUSTRIXDB_SOURCES ha_clustrixdb.cc clustrix_connection.cc
)
MYSQL_ADD_PLUGIN
(
clustrixdb
${
CLUSTRIXDB_SOURCES
}
STORAGE_ENGINE
)
IF
(
MSVC
)
IF
(
CMAKE_BUILD_TYPE STREQUAL
"Debug"
)
ADD_CUSTOM_COMMAND
(
TARGET clustrixdb
POST_BUILD
COMMAND if not exist ..\\..\\sql\\lib mkdir ..\\..\\sql\\lib\\plugin
COMMAND copy Debug\\ha_clustrixdb.dll ..\\..\\sql\\lib\\plugin\\ha_clustrixdb.dll
)
ENDIF
()
ENDIF
()
storage/clustrixdb/clustrix_connection.cc
0 → 100644
View file @
70cb38cf
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
/** @file clustrix_connection.cc */
#include "clustrix_connection.h"
#include <string>
#include "errmsg.h"
#include "handler.h"
#include "table.h"
extern
int
clustrix_connect_timeout
;
extern
int
clustrix_read_timeout
;
extern
int
clustrix_write_timeout
;
extern
char
*
clustrix_host
;
extern
char
*
clustrix_username
;
extern
char
*
clustrix_password
;
extern
uint
clustrix_port
;
extern
char
*
clustrix_socket
;
static
const
char
charset_name
[]
=
"latin1"
;
enum
clustrix_commands
{
CLUSTRIX_WRITE_ROW
=
1
,
CLUSTRIX_SCAN_INIT
,
CLUSTRIX_SCAN_NEXT
,
CLUSTRIX_SCAN_STOP
,
CLUSTRIX_KEY_READ
,
CLUSTRIX_KEY_DELETE
,
};
/****************************************************************************
** Class clustrix_connection
****************************************************************************/
void
clustrix_connection
::
disconnect
(
bool
is_destructor
)
{
if
(
is_destructor
)
{
/*
Connection object destruction occurs after the destruction of
the thread used by the network has begun, so usage of that
thread object now is not reliable
*/
clustrix_net
.
net
.
thd
=
NULL
;
}
mysql_close
(
&
clustrix_net
);
}
int
clustrix_connection
::
connect
()
{
int
error_code
=
0
;
my_bool
my_true
=
1
;
DBUG_ENTER
(
"connect"
);
/* Validate the connection parameters */
if
(
!
strcmp
(
clustrix_socket
,
""
))
if
(
!
strcmp
(
clustrix_host
,
"127.0.0.1"
))
if
(
clustrix_port
==
MYSQL_PORT_DEFAULT
)
DBUG_RETURN
(
ER_CONNECT_TO_FOREIGN_DATA_SOURCE
);
//clustrix_net.methods = &connection_methods;
if
(
!
mysql_init
(
&
clustrix_net
))
DBUG_RETURN
(
HA_ERR_OUT_OF_MEM
);
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_READ_TIMEOUT
,
&
clustrix_read_timeout
);
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_WRITE_TIMEOUT
,
&
clustrix_write_timeout
);
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_CONNECT_TIMEOUT
,
&
clustrix_connect_timeout
);
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_USE_REMOTE_CONNECTION
,
NULL
);
mysql_options
(
&
clustrix_net
,
MYSQL_SET_CHARSET_NAME
,
charset_name
);
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_USE_THREAD_SPECIFIC_MEMORY
,
(
char
*
)
&
my_true
);
mysql_options
(
&
clustrix_net
,
MYSQL_INIT_COMMAND
,
"SET autocommit=0"
);
#ifdef CLUSTRIX_CONNECTION_SSL
if
(
opt_ssl_ca_length
|
conn
->
tgt_ssl_capath_length
|
conn
->
tgt_ssl_cert_length
|
conn
->
tgt_ssl_key_length
)
{
mysql_ssl_set
(
&
clustrix_net
,
conn
->
tgt_ssl_key
,
conn
->
tgt_ssl_cert
,
conn
->
tgt_ssl_ca
,
conn
->
tgt_ssl_capath
,
conn
->
tgt_ssl_cipher
);
if
(
conn
->
tgt_ssl_vsc
)
{
my_bool
verify_flg
=
TRUE
;
mysql_options
(
&
clustrix_net
,
MYSQL_OPT_SSL_VERIFY_SERVER_CERT
,
&
verify_flg
);
}
}
#endif
if
(
!
mysql_real_connect
(
&
clustrix_net
,
clustrix_host
,
clustrix_username
,
clustrix_password
,
NULL
,
clustrix_port
,
clustrix_socket
,
CLIENT_MULTI_STATEMENTS
))
{
error_code
=
mysql_errno
(
&
clustrix_net
);
disconnect
();
if
(
error_code
!=
CR_CONN_HOST_ERROR
&&
error_code
!=
CR_CONNECTION_ERROR
)
{
if
(
error_code
==
ER_CON_COUNT_ERROR
)
{
my_error
(
ER_CON_COUNT_ERROR
,
MYF
(
0
));
DBUG_RETURN
(
ER_CON_COUNT_ERROR
);
}
my_error
(
ER_CONNECT_TO_FOREIGN_DATA_SOURCE
,
MYF
(
0
),
clustrix_host
);
DBUG_RETURN
(
ER_CONNECT_TO_FOREIGN_DATA_SOURCE
);
}
}
clustrix_net
.
reconnect
=
1
;
DBUG_RETURN
(
0
);
}
int
clustrix_connection
::
send_command
()
{
my_bool
com_error
;
com_error
=
simple_command
(
&
clustrix_net
,
(
enum_server_command
)
CLUSTRIX_SERVER_REQUEST
,
command_buffer
,
command_length
,
TRUE
);
if
(
com_error
)
{
my_printf_error
(
mysql_errno
(
&
clustrix_net
),
"Clustrix error: %s"
,
MYF
(
0
),
mysql_error
(
&
clustrix_net
));
return
ER_QUERY_ON_FOREIGN_DATA_SOURCE
;
}
return
0
;
}
int
clustrix_connection
::
read_query_response
()
{
my_bool
comerr
=
clustrix_net
.
methods
->
read_query_result
(
&
clustrix_net
);
if
(
comerr
)
{
my_printf_error
(
mysql_errno
(
&
clustrix_net
),
"Clustrix error: %s"
,
MYF
(
0
),
mysql_error
(
&
clustrix_net
));
return
ER_QUERY_ON_FOREIGN_DATA_SOURCE
;
}
return
0
;
}
int
clustrix_connection
::
begin_trans
()
{
const
char
*
stmt
=
"BEGIN TRANSACTION"
;
int
error_code
=
mysql_real_query
(
&
clustrix_net
,
stmt
,
strlen
(
stmt
));
if
(
error_code
)
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
commit_trans
()
{
const
char
*
stmt
=
"COMMIT TRANSACTION"
;
int
error_code
=
mysql_real_query
(
&
clustrix_net
,
stmt
,
strlen
(
stmt
));
if
(
error_code
)
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
rollback_trans
()
{
const
char
*
stmt
=
"ROLLBACK TRANSACTION"
;
int
error_code
=
mysql_real_query
(
&
clustrix_net
,
stmt
,
strlen
(
stmt
));
if
(
error_code
)
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
create_table
(
char
*
stmt
)
{
int
error_code
=
mysql_real_query
(
&
clustrix_net
,
stmt
,
strlen
(
stmt
));
if
(
error_code
)
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
delete_table
(
char
*
stmt
)
{
int
error_code
=
mysql_real_query
(
&
clustrix_net
,
stmt
,
strlen
(
stmt
));
if
(
error_code
)
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
write_row
(
ulonglong
clustrix_table_oid
,
uchar
*
packed_row
,
size_t
packed_size
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_WRITE_ROW
)))
return
error_code
;
if
((
error_code
=
add_command_operand_ulonglong
(
clustrix_table_oid
)))
return
error_code
;
if
((
error_code
=
add_command_operand_str
(
packed_row
,
packed_size
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
if
((
error_code
=
read_query_response
()))
return
error_code
;
last_insert_id
=
clustrix_net
.
insert_id
;
return
error_code
;
}
int
clustrix_connection
::
key_delete
(
ulonglong
clustrix_table_oid
,
uchar
*
packed_key
,
size_t
packed_key_length
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_KEY_DELETE
)))
return
error_code
;
if
((
error_code
=
add_command_operand_ulonglong
(
clustrix_table_oid
)))
return
error_code
;
if
((
error_code
=
add_command_operand_str
(
packed_key
,
packed_key_length
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
if
((
error_code
=
read_query_response
()))
return
mysql_errno
(
&
clustrix_net
);
return
error_code
;
}
int
clustrix_connection
::
key_read
(
ulonglong
clustrix_table_oid
,
uint
index
,
MY_BITMAP
*
read_set
,
uchar
*
packed_key
,
ulong
packed_key_length
,
uchar
**
rowdata
,
ulong
*
rowdata_length
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_KEY_READ
)))
return
error_code
;
if
((
error_code
=
add_command_operand_ulonglong
(
clustrix_table_oid
)))
return
error_code
;
if
((
error_code
=
add_command_operand_uint
(
index
)))
return
error_code
;
if
((
error_code
=
add_command_operand_bitmap
(
read_set
)))
return
error_code
;
if
((
error_code
=
add_command_operand_str
(
packed_key
,
packed_key_length
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
ulong
packet_length
=
cli_safe_read
(
&
clustrix_net
);
if
(
packet_length
==
packet_error
)
return
mysql_errno
(
&
clustrix_net
);
*
rowdata
=
clustrix_net
.
net
.
read_pos
;
*
rowdata_length
=
safe_net_field_length_ll
(
rowdata
,
packet_length
);
return
0
;
}
int
clustrix_connection
::
scan_init
(
ulonglong
clustrix_table_oid
,
uint
index
,
enum
sort_order
sort
,
MY_BITMAP
*
read_set
,
ulonglong
*
scan_refid
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_SCAN_INIT
)))
return
error_code
;
if
((
error_code
=
add_command_operand_ulonglong
(
clustrix_table_oid
)))
return
error_code
;
if
((
error_code
=
add_command_operand_uint
(
index
)))
return
error_code
;
if
((
error_code
=
add_command_operand_uchar
(
sort
)))
return
error_code
;
if
((
error_code
=
add_command_operand_bitmap
(
read_set
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
ulong
packet_length
=
cli_safe_read
(
&
clustrix_net
);
if
(
packet_length
==
packet_error
)
return
mysql_errno
(
&
clustrix_net
);
unsigned
char
*
pos
=
clustrix_net
.
net
.
read_pos
;
*
scan_refid
=
safe_net_field_length_ll
(
&
pos
,
packet_length
);
return
error_code
;
}
int
clustrix_connection
::
scan_next
(
ulonglong
scan_refid
,
uchar
**
rowdata
,
ulong
*
rowdata_length
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_SCAN_NEXT
)))
return
error_code
;
if
((
error_code
=
add_command_operand_lcb
(
scan_refid
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
ulong
packet_length
=
cli_safe_read
(
&
clustrix_net
);
if
(
packet_length
==
packet_error
)
return
mysql_errno
(
&
clustrix_net
);
*
rowdata
=
clustrix_net
.
net
.
read_pos
;
*
rowdata_length
=
safe_net_field_length_ll
(
rowdata
,
packet_length
);
return
0
;
}
int
clustrix_connection
::
scan_end
(
ulonglong
scan_refid
)
{
int
error_code
;
command_length
=
0
;
if
((
error_code
=
add_command_operand_uchar
(
CLUSTRIX_SCAN_STOP
)))
return
error_code
;
if
((
error_code
=
add_command_operand_lcb
(
scan_refid
)))
return
error_code
;
if
((
error_code
=
send_command
()))
return
error_code
;
ulong
packet_length
=
cli_safe_read
(
&
clustrix_net
);
if
(
packet_length
==
packet_error
)
return
mysql_errno
(
&
clustrix_net
);
return
0
;
}
int
clustrix_connection
::
populate_table_list
(
LEX_CSTRING
*
db
,
handlerton
::
discovered_list
*
result
)
{
int
error_code
=
0
;
String
stmt
;
stmt
.
append
(
"SHOW FULL TABLES FROM "
);
stmt
.
append
(
db
);
stmt
.
append
(
" WHERE table_type = 'BASE TABLE'"
);
if
(
mysql_real_query
(
&
clustrix_net
,
stmt
.
c_ptr
(),
stmt
.
length
()))
{
int
error_code
=
mysql_errno
(
&
clustrix_net
);
if
(
error_code
==
ER_BAD_DB_ERROR
)
return
0
;
else
return
error_code
;
}
MYSQL_RES
*
results
=
mysql_store_result
(
&
clustrix_net
);
if
(
mysql_num_fields
(
results
)
!=
2
)
{
error_code
=
HA_ERR_CORRUPT_EVENT
;
goto
error
;
}
MYSQL_ROW
row
;
while
((
row
=
mysql_fetch_row
(
results
)))
result
->
add_table
(
row
[
0
],
strlen
(
row
[
0
]));
error:
mysql_free_result
(
results
);
return
error_code
;
}
int
clustrix_connection
::
discover_table_details
(
LEX_CSTRING
*
db
,
LEX_CSTRING
*
name
,
THD
*
thd
,
TABLE_SHARE
*
share
)
{
DBUG_ENTER
(
"clustrix_connection::discover_table_details"
);
int
error_code
=
0
;
MYSQL_RES
*
results
=
NULL
;
MYSQL_ROW
row
;
String
get_oid
,
show
;
/* get oid */
get_oid
.
append
(
"select r.table "
"from system.databases d "
" inner join ""system.relations r on d.db = r.db "
"where d.name = '"
);
get_oid
.
append
(
db
);
get_oid
.
append
(
"' and r.name = '"
);
get_oid
.
append
(
name
);
get_oid
.
append
(
"'"
);
if
(
mysql_real_query
(
&
clustrix_net
,
get_oid
.
c_ptr
(),
get_oid
.
length
()))
{
if
((
error_code
=
mysql_errno
(
&
clustrix_net
)))
return
error_code
;
}
results
=
mysql_store_result
(
&
clustrix_net
);
DBUG_PRINT
(
"oid results"
,
(
"rows: %llu, fields: %u"
,
mysql_num_rows
(
results
),
mysql_num_fields
(
results
)));
if
(
mysql_num_rows
(
results
)
!=
1
)
{
error_code
=
HA_ERR_NO_SUCH_TABLE
;
goto
error
;
}
while
((
row
=
mysql_fetch_row
(
results
)))
{
DBUG_PRINT
(
"row"
,
(
"%s"
,
row
[
0
]));
uchar
*
to
=
(
uchar
*
)
alloc_root
(
&
share
->
mem_root
,
strlen
(
row
[
0
])
+
1
);
if
(
!
to
)
{
error_code
=
HA_ERR_OUT_OF_MEM
;
goto
error
;
}
strcpy
((
char
*
)
to
,
(
char
*
)
row
[
0
]);
share
->
tabledef_version
.
str
=
to
;
share
->
tabledef_version
.
length
=
strlen
(
row
[
0
]);
}
mysql_free_result
(
results
);
/* get show create statement */
show
.
append
(
"show create table "
);
show
.
append
(
db
);
show
.
append
(
"."
);
show
.
append
(
name
);
if
(
mysql_real_query
(
&
clustrix_net
,
show
.
c_ptr
(),
show
.
length
()))
{
if
((
error_code
=
mysql_errno
(
&
clustrix_net
)))
return
error_code
;
}
results
=
mysql_store_result
(
&
clustrix_net
);
DBUG_PRINT
(
"show table results"
,
(
"rows: %llu, fields: %u"
,
mysql_num_rows
(
results
),
mysql_num_fields
(
results
)));
if
(
mysql_num_rows
(
results
)
!=
1
)
{
error_code
=
HA_ERR_NO_SUCH_TABLE
;
goto
error
;
}
if
(
mysql_num_fields
(
results
)
!=
2
)
{
error_code
=
HA_ERR_CORRUPT_EVENT
;
goto
error
;
}
while
((
row
=
mysql_fetch_row
(
results
)))
{
DBUG_PRINT
(
"row"
,
(
"%s - %s"
,
row
[
0
],
row
[
1
]));
error_code
=
share
->
init_from_sql_statement_string
(
thd
,
false
,
row
[
1
],
strlen
(
row
[
1
]));
}
error:
if
(
results
)
mysql_free_result
(
results
);
DBUG_RETURN
(
error_code
);
}
int
clustrix_connection
::
expand_command_buffer
(
size_t
add_length
)
{
size_t
expanded_length
;
if
(
command_buffer_length
>=
command_length
+
add_length
)
return
0
;
expanded_length
=
command_buffer_length
+
((
add_length
>>
COMMAND_BUFFER_SIZE_INCREMENT_BITS
)
<<
COMMAND_BUFFER_SIZE_INCREMENT_BITS
)
+
COMMAND_BUFFER_SIZE_INCREMENT
;
if
(
!
command_buffer_length
)
command_buffer
=
(
uchar
*
)
my_malloc
(
expanded_length
,
MYF
(
MY_WME
));
else
command_buffer
=
(
uchar
*
)
my_realloc
(
command_buffer
,
expanded_length
,
MYF
(
MY_WME
));
if
(
!
command_buffer
)
return
HA_ERR_OUT_OF_MEM
;
command_buffer_length
=
expanded_length
;
return
0
;
}
int
clustrix_connection
::
add_command_operand_uchar
(
uchar
value
)
{
int
error_code
=
expand_command_buffer
(
sizeof
(
value
));
if
(
error_code
)
return
error_code
;
memcpy
(
command_buffer
+
command_length
,
&
value
,
sizeof
(
value
));
command_length
+=
sizeof
(
value
);
return
0
;
}
int
clustrix_connection
::
add_command_operand_uint
(
uint
value
)
{
uint
be_value
=
htobe32
(
value
);
int
error_code
=
expand_command_buffer
(
sizeof
(
be_value
));
if
(
error_code
)
return
error_code
;
memcpy
(
command_buffer
+
command_length
,
&
be_value
,
sizeof
(
be_value
));
command_length
+=
sizeof
(
be_value
);
return
0
;
}
int
clustrix_connection
::
add_command_operand_ulonglong
(
ulonglong
value
)
{
ulonglong
be_value
=
htobe64
(
value
);
int
error_code
=
expand_command_buffer
(
sizeof
(
be_value
));
if
(
error_code
)
return
error_code
;
memcpy
(
command_buffer
+
command_length
,
&
be_value
,
sizeof
(
be_value
));
command_length
+=
sizeof
(
be_value
);
return
0
;
}
int
clustrix_connection
::
add_command_operand_lcb
(
ulonglong
value
)
{
int
len
=
net_length_size
(
value
);
int
error_code
=
expand_command_buffer
(
len
);
if
(
error_code
)
return
error_code
;
net_store_length
(
command_buffer
+
command_length
,
value
);
command_length
+=
len
;
return
0
;
}
int
clustrix_connection
::
add_command_operand_str
(
const
uchar
*
str
,
size_t
str_length
)
{
int
error_code
=
add_command_operand_lcb
(
str_length
);
if
(
error_code
)
return
error_code
;
error_code
=
expand_command_buffer
(
str_length
);
if
(
error_code
)
return
error_code
;
memcpy
(
command_buffer
+
command_length
,
str
,
str_length
);
command_length
+=
str_length
;
return
0
;
}
int
clustrix_connection
::
add_command_operand_lex_string
(
LEX_CSTRING
str
)
{
return
add_command_operand_str
((
const
uchar
*
)
str
.
str
,
str
.
length
);
}
int
clustrix_connection
::
add_command_operand_bitmap
(
MY_BITMAP
*
bitmap
)
{
int
error_code
=
add_command_operand_lcb
(
bitmap
->
n_bits
);
if
(
error_code
)
return
error_code
;
int
no_bytes
=
no_bytes_in_map
(
bitmap
);
error_code
=
expand_command_buffer
(
no_bytes
);
if
(
error_code
)
return
error_code
;
memcpy
(
command_buffer
+
command_length
,
bitmap
->
bitmap
,
no_bytes
);
command_length
+=
no_bytes
;
return
0
;
}
storage/clustrixdb/clustrix_connection.h
0 → 100644
View file @
70cb38cf
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
#ifndef _clustrix_connection_h
#define _clustrix_connection_h
#ifdef USE_PRAGMA_INTERFACE
#pragma interface
/* gcc class implementation */
#endif
#define MYSQL_SERVER 1
#include "my_global.h"
#include "m_string.h"
#include "mysql.h"
#include "sql_common.h"
#include "my_base.h"
#include "mysqld_error.h"
#include "my_bitmap.h"
#include "handler.h"
#define CLUSTRIX_SERVER_REQUEST 30
class
clustrix_connection
{
private:
# define COMMAND_BUFFER_SIZE_INCREMENT 1024
# define COMMAND_BUFFER_SIZE_INCREMENT_BITS 10
MYSQL
clustrix_net
;
uchar
*
command_buffer
;
size_t
command_buffer_length
;
size_t
command_length
;
uchar
*
reply_buffer
;
size_t
reply_length
;
public:
ulonglong
last_insert_id
;
clustrix_connection
()
:
command_buffer
(
NULL
),
command_buffer_length
(
0
),
command_length
(
0
)
{
memset
(
&
clustrix_net
,
0
,
sizeof
(
MYSQL
));
}
~
clustrix_connection
()
{
if
(
is_connected
())
disconnect
(
TRUE
);
if
(
command_buffer
)
my_free
(
command_buffer
);
}
inline
bool
is_connected
()
{
return
clustrix_net
.
net
.
vio
;
}
int
connect
();
void
disconnect
(
bool
is_destructor
=
FALSE
);
int
begin_trans
();
int
commit_trans
();
int
rollback_trans
();
int
create_table
(
char
*
stmt
);
int
delete_table
(
char
*
stmt
);
int
write_row
(
ulonglong
clustrix_table_oid
,
uchar
*
packed_row
,
size_t
packed_size
);
int
key_delete
(
ulonglong
clustrix_table_oid
,
uchar
*
packed_key
,
size_t
packed_key_length
);
int
key_read
(
ulonglong
clustrix_table_oid
,
uint
index
,
MY_BITMAP
*
read_set
,
uchar
*
packed_key
,
ulong
packed_key_length
,
uchar
**
rowdata
,
ulong
*
rowdata_length
);
enum
sort_order
{
SORT_NONE
=
0
,
SORT_ASC
=
1
,
SORT_DESC
=
2
};
int
scan_init
(
ulonglong
clustrix_table_oid
,
uint
index
,
enum
sort_order
sort
,
MY_BITMAP
*
read_set
,
ulonglong
*
scan_refid
);
int
scan_next
(
ulonglong
scan_refid
,
uchar
**
rowdata
,
ulong
*
rowdata_length
);
int
scan_end
(
ulonglong
scan_refid
);
int
populate_table_list
(
LEX_CSTRING
*
db
,
handlerton
::
discovered_list
*
result
);
int
discover_table_details
(
LEX_CSTRING
*
db
,
LEX_CSTRING
*
name
,
THD
*
thd
,
TABLE_SHARE
*
share
);
private:
int
expand_command_buffer
(
size_t
add_length
);
int
add_command_operand_uchar
(
uchar
value
);
int
add_command_operand_uint
(
uint
value
);
int
add_command_operand_ulonglong
(
ulonglong
value
);
int
add_command_operand_lcb
(
ulonglong
value
);
int
add_command_operand_str
(
const
uchar
*
str
,
size_t
length
);
int
add_command_operand_lex_string
(
LEX_CSTRING
str
);
int
add_command_operand_bitmap
(
MY_BITMAP
*
bitmap
);
int
send_command
();
int
read_query_response
();
};
#endif // _clustrix_connection_h
storage/clustrixdb/ha_clustrixdb.cc
0 → 100644
View file @
70cb38cf
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
/** @file ha_clustrixdb.cc */
#include "ha_clustrixdb.h"
#include "key.h"
handlerton
*
clustrixdb_hton
=
NULL
;
int
clustrix_connect_timeout
;
static
MYSQL_SYSVAR_INT
(
connect_timeout
,
clustrix_connect_timeout
,
PLUGIN_VAR_OPCMDARG
,
"Timeout for connecting to Clustrix"
,
NULL
,
NULL
,
-
1
,
-
1
,
2147483647
,
0
);
int
clustrix_read_timeout
;
static
MYSQL_SYSVAR_INT
(
read_timeout
,
clustrix_read_timeout
,
PLUGIN_VAR_OPCMDARG
,
"Timeout for receiving data from Clustrix"
,
NULL
,
NULL
,
-
1
,
-
1
,
2147483647
,
0
);
int
clustrix_write_timeout
;
static
MYSQL_SYSVAR_INT
(
write_timeout
,
clustrix_write_timeout
,
PLUGIN_VAR_OPCMDARG
,
"Timeout for sending data to Clustrix"
,
NULL
,
NULL
,
-
1
,
-
1
,
2147483647
,
0
);
char
*
clustrix_host
;
static
MYSQL_SYSVAR_STR
(
host
,
clustrix_host
,
PLUGIN_VAR_OPCMDARG
|
PLUGIN_VAR_MEMALLOC
,
"Clustrix host"
,
NULL
,
NULL
,
"127.0.0.1"
);
char
*
clustrix_username
;
static
MYSQL_SYSVAR_STR
(
username
,
clustrix_username
,
PLUGIN_VAR_OPCMDARG
|
PLUGIN_VAR_MEMALLOC
,
"Clustrix user name"
,
NULL
,
NULL
,
"root"
);
char
*
clustrix_password
;
static
MYSQL_SYSVAR_STR
(
password
,
clustrix_password
,
PLUGIN_VAR_OPCMDARG
|
PLUGIN_VAR_MEMALLOC
,
"Clustrix password"
,
NULL
,
NULL
,
""
);
uint
clustrix_port
;
static
MYSQL_SYSVAR_UINT
(
port
,
clustrix_port
,
PLUGIN_VAR_RQCMDARG
,
"Clustrix port"
,
NULL
,
NULL
,
MYSQL_PORT_DEFAULT
,
MYSQL_PORT_DEFAULT
,
65535
,
0
);
char
*
clustrix_socket
;
static
MYSQL_SYSVAR_STR
(
socket
,
clustrix_socket
,
PLUGIN_VAR_OPCMDARG
|
PLUGIN_VAR_MEMALLOC
,
"Clustrix socket"
,
NULL
,
NULL
,
""
);
/****************************************************************************
** Class ha_clustrixdb_trx
****************************************************************************/
st_clustrixdb_trx
::
st_clustrixdb_trx
(
THD
*
trx_thd
)
{
thd
=
trx_thd
;
clustrix_net
=
NULL
;
//query_id = 0;
//mem_root = NULL;
has_transaction
=
FALSE
;
}
st_clustrixdb_trx
::~
st_clustrixdb_trx
()
{
if
(
clustrix_net
)
delete
clustrix_net
;
}
int
st_clustrixdb_trx
::
net_init
()
{
if
(
!
this
->
clustrix_net
)
{
this
->
clustrix_net
=
new
clustrix_connection
();
int
error_code
=
this
->
clustrix_net
->
connect
();
if
(
error_code
)
return
error_code
;
}
return
0
;
}
int
st_clustrixdb_trx
::
begin_trans
()
{
// XXX: What were these for?
//if (thd->transaction.stmt.trans_did_ddl() ||
// thd->transaction.stmt.modified_non_trans_table)
if
(
!
has_transaction
)
{
int
error_code
=
this
->
clustrix_net
->
begin_trans
();
if
(
error_code
)
return
error_code
;
/* Register for commit/rollback on the transaction */
if
(
!
thd_test_options
(
thd
,
OPTION_NOT_AUTOCOMMIT
|
OPTION_BEGIN
))
trans_register_ha
(
thd
,
FALSE
,
clustrixdb_hton
);
else
trans_register_ha
(
thd
,
TRUE
,
clustrixdb_hton
);
has_transaction
=
TRUE
;
}
return
0
;
}
/****************************************************************************
** Class ha_clustrixdb
****************************************************************************/
ha_clustrixdb
::
ha_clustrixdb
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
)
:
handler
(
hton
,
table_arg
)
{
DBUG_ENTER
(
"ha_clustrixdb::ha_clustrixdb"
);
rli
=
NULL
;
rgi
=
NULL
;
scan_refid
=
0
;
clustrix_table_oid
=
0
;
DBUG_VOID_RETURN
;
}
ha_clustrixdb
::~
ha_clustrixdb
()
{
if
(
rli
)
ha_clustrixdb
::
remove_current_table_from_rpl_table_list
();
}
int
ha_clustrixdb
::
create
(
const
char
*
name
,
TABLE
*
form
,
HA_CREATE_INFO
*
info
)
{
int
error_code
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
enum
tmp_table_type
saved_tmp_table_type
=
form
->
s
->
tmp_table
;
Table_specification_st
*
create_info
=
&
thd
->
lex
->
create_info
;
const
bool
is_tmp_table
=
info
->
options
&
HA_LEX_CREATE_TMP_TABLE
;
char
create_table_stmt_buffer
[
2048
];
String
create_table_stmt
(
create_table_stmt_buffer
,
sizeof
(
create_table_stmt_buffer
),
system_charset_info
);
create_table_stmt
.
length
(
0
);
/* Create a copy of the CREATE TABLE statement */
if
(
!
is_tmp_table
)
form
->
s
->
tmp_table
=
NO_TMP_TABLE
;
const
char
*
old_dbstr
=
thd
->
db
.
str
;
thd
->
db
.
str
=
NULL
;
ulong
old
=
create_info
->
used_fields
;
create_info
->
used_fields
&=
~
HA_CREATE_USED_ENGINE
;
TABLE_LIST
table_list
;
memset
(
&
table_list
,
0
,
sizeof
(
table_list
));
table_list
.
table
=
form
;
error_code
=
show_create_table
(
thd
,
&
table_list
,
&
create_table_stmt
,
create_info
,
WITH_DB_NAME
);
if
(
!
is_tmp_table
)
form
->
s
->
tmp_table
=
saved_tmp_table_type
;
create_info
->
used_fields
=
old
;
thd
->
db
.
str
=
old_dbstr
;
if
(
error_code
)
return
error_code
;
error_code
=
trx
->
clustrix_net
->
create_table
(
create_table_stmt_buffer
);
return
error_code
;
}
int
ha_clustrixdb
::
delete_table
(
const
char
*
name
)
{
int
error_code
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
/* XXX: I know this is unsafe. */
char
db_name
[
1000
];
const
char
*
ptr
=
name
+
2
;
int
i
=
0
;
while
(
*
ptr
!=
'/'
)
db_name
[
i
++
]
=
*
ptr
++
;
db_name
[
i
]
=
'\0'
;
char
tbl_name
[
1000
];
ptr
++
;
strcpy
(
tbl_name
,
ptr
);
char
delete_cmd
[
2000
];
/* XXX: I know this is unsafe. */
sprintf
(
delete_cmd
,
"drop table `%s`.`%s`"
,
db_name
,
tbl_name
);
return
trx
->
clustrix_net
->
delete_table
(
delete_cmd
);
}
int
ha_clustrixdb
::
open
(
const
char
*
name
,
int
mode
,
uint
test_if_locked
)
{
DBUG_ENTER
(
"ha_clustrixdb::open"
);
DBUG_PRINT
(
"oid"
,
(
"%s"
,
table
->
s
->
tabledef_version
.
str
));
if
(
!
table
->
s
->
tabledef_version
.
str
)
return
HA_ERR_TABLE_DEF_CHANGED
;
if
(
!
clustrix_table_oid
)
clustrix_table_oid
=
atoll
((
const
char
*
)
table
->
s
->
tabledef_version
.
str
);
has_hidden_key
=
table
->
s
->
primary_key
==
MAX_KEY
;
if
(
has_hidden_key
)
{
ref_length
=
8
;
}
else
{
KEY
*
key_info
=
table
->
key_info
+
table
->
s
->
primary_key
;
ref_length
=
key_info
->
key_length
;
}
DBUG_PRINT
(
"open finished"
,
(
"oid: %llu, ref_length: %u"
,
clustrix_table_oid
,
ref_length
));
DBUG_RETURN
(
0
);
}
int
ha_clustrixdb
::
close
(
void
)
{
return
0
;
}
int
ha_clustrixdb
::
reset
()
{
return
0
;
}
int
ha_clustrixdb
::
write_row
(
uchar
*
buf
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
assert
(
trx
->
has_transaction
);
/* Convert the row format to binlog (packed) format */
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_new_row
[
CLUSTRIXDB_ROW_LIMIT
];
size_t
packed_size
=
pack_row
(
table
,
table
->
write_set
,
packed_new_row
,
buf
);
/* XXX: Clustrix may needs to return HA_ERR_AUTOINC_ERANGE if we hit that
error. */
if
((
error_code
=
trx
->
clustrix_net
->
write_row
(
clustrix_table_oid
,
packed_new_row
,
packed_size
)))
return
error_code
;
Field
*
auto_inc_field
=
table
->
next_number_field
;
if
(
auto_inc_field
)
insert_id_for_cur_row
=
trx
->
clustrix_net
->
last_insert_id
;
return
error_code
;
}
int
ha_clustrixdb
::
update_row
(
const
uchar
*
old_data
,
const
uchar
*
new_data
)
{
int
error_code
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
assert
(
trx
->
has_transaction
);
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_new_row
[
CLUSTRIXDB_ROW_LIMIT
];
/*size_t packed_new_size =*/
pack_row
(
table
,
table
->
write_set
,
packed_new_row
,
new_data
);
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_old_row
[
CLUSTRIXDB_ROW_LIMIT
];
/*size_t packed_old_size =*/
pack_row
(
table
,
table
->
write_set
,
packed_old_row
,
old_data
);
/* Send the packed rows to Clustrix */
return
error_code
;
}
int
ha_clustrixdb
::
delete_row
(
const
uchar
*
buf
)
{
int
error_code
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
assert
(
trx
->
has_transaction
);
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_key
[
CLUSTRIXDB_ROW_LIMIT
];
size_t
packed_key_len
;
build_key_packed_row
(
table
->
s
->
primary_key
,
packed_key
,
&
packed_key_len
);
if
((
error_code
=
trx
->
clustrix_net
->
key_delete
(
clustrix_table_oid
,
packed_key
,
packed_key_len
)))
return
error_code
;
return
error_code
;
}
ha_clustrixdb
::
Table_flags
ha_clustrixdb
::
table_flags
(
void
)
const
{
Table_flags
flags
=
HA_PARTIAL_COLUMN_READ
|
HA_REC_NOT_IN_SEQ
|
HA_FAST_KEY_READ
|
HA_NULL_IN_KEY
|
HA_CAN_INDEX_BLOBS
|
HA_AUTO_PART_KEY
|
HA_CAN_SQL_HANDLER
|
HA_BINLOG_ROW_CAPABLE
|
HA_BINLOG_STMT_CAPABLE
|
HA_CAN_TABLE_CONDITION_PUSHDOWN
;
return
flags
;
}
ulong
ha_clustrixdb
::
index_flags
(
uint
idx
,
uint
part
,
bool
all_parts
)
const
{
ulong
flags
=
HA_READ_NEXT
|
HA_READ_PREV
|
HA_READ_ORDER
;
return
flags
;
}
ha_rows
ha_clustrixdb
::
records
()
{
return
10000
;
}
ha_rows
ha_clustrixdb
::
records_in_range
(
uint
inx
,
key_range
*
min_key
,
key_range
*
max_key
)
{
return
2
;
}
int
ha_clustrixdb
::
info
(
uint
flag
)
{
//THD *thd = ha_thd();
if
(
flag
&
HA_STATUS_TIME
)
{
/* Retrieve the time of the most recent update to the table */
// stats.update_time =
}
if
(
flag
&
HA_STATUS_AUTO
)
{
/* Retrieve the latest auto_increment value */
stats
.
auto_increment_value
=
next_insert_id
;
}
if
(
flag
&
HA_STATUS_VARIABLE
)
{
/* Retrieve variable info, such as row counts and file lengths */
stats
.
records
=
records
();
stats
.
deleted
=
0
;
// stats.data_file_length =
// stats.index_file_length =
// stats.delete_length =
stats
.
check_time
=
0
;
// stats.mrr_length_per_rec =
if
(
stats
.
records
==
0
)
stats
.
mean_rec_length
=
0
;
else
stats
.
mean_rec_length
=
(
ulong
)
(
stats
.
data_file_length
/
stats
.
records
);
}
if
(
flag
&
HA_STATUS_CONST
)
{
/*
Retrieve constant info, such as file names, max file lengths,
create time, block size
*/
// stats.max_data_file_length =
// stats.create_time =
// stats.block_size =
}
return
0
;
}
int
ha_clustrixdb
::
index_init
(
uint
idx
,
bool
sorted
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
active_index
=
idx
;
add_current_table_to_rpl_table_list
();
scan_refid
=
0
;
return
0
;
}
int
ha_clustrixdb
::
index_read
(
uchar
*
buf
,
const
uchar
*
key
,
uint
key_len
,
enum
ha_rkey_function
find_flag
)
{
DBUG_ENTER
(
"ha_clustrixdb::index_read"
);
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
DBUG_RETURN
(
error_code
);
is_scan
=
false
;
key_restore
(
table
->
record
[
0
],
key
,
&
table
->
key_info
[
active_index
],
key_len
);
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_key
[
CLUSTRIXDB_ROW_LIMIT
];
size_t
packed_key_len
;
build_key_packed_row
(
active_index
,
packed_key
,
&
packed_key_len
);
uchar
*
rowdata
;
ulong
rowdata_length
;
if
((
error_code
=
trx
->
clustrix_net
->
key_read
(
clustrix_table_oid
,
active_index
,
table
->
read_set
,
packed_key
,
packed_key_len
,
&
rowdata
,
&
rowdata_length
)))
DBUG_RETURN
(
error_code
);
uchar
const
*
current_row_end
;
ulong
master_reclength
;
if
((
error_code
=
unpack_row
(
rgi
,
table
,
table
->
s
->
fields
,
rowdata
,
table
->
read_set
,
&
current_row_end
,
&
master_reclength
,
rowdata
+
rowdata_length
)))
DBUG_RETURN
(
error_code
);
DBUG_RETURN
(
0
);
}
int
ha_clustrixdb
::
index_first
(
uchar
*
buf
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
is_scan
=
true
;
if
(
my_bitmap_init
(
&
scan_fields
,
NULL
,
table
->
read_set
->
n_bits
,
false
))
return
ER_OUTOFMEMORY
;
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all
(
&
scan_fields
);
#endif
if
((
error_code
=
trx
->
clustrix_net
->
scan_init
(
clustrix_table_oid
,
active_index
,
clustrix_connection
::
SORT_NONE
,
&
scan_fields
,
&
scan_refid
)))
return
error_code
;
return
rnd_next
(
buf
);
}
int
ha_clustrixdb
::
index_last
(
uchar
*
buf
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
is_scan
=
true
;
if
(
my_bitmap_init
(
&
scan_fields
,
NULL
,
table
->
read_set
->
n_bits
,
false
))
return
ER_OUTOFMEMORY
;
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all
(
&
scan_fields
);
#endif
if
((
error_code
=
trx
->
clustrix_net
->
scan_init
(
clustrix_table_oid
,
active_index
,
clustrix_connection
::
SORT_NONE
,
&
scan_fields
,
&
scan_refid
)))
return
error_code
;
return
rnd_next
(
buf
);
}
int
ha_clustrixdb
::
index_next
(
uchar
*
buf
)
{
DBUG_ENTER
(
"index_next"
);
DBUG_RETURN
(
rnd_next
(
buf
));
}
#if 0
int ha_clustrixdb::index_next_same(uchar *buf, const uchar *key, uint keylen)
{
DBUG_ENTER("index_next_same");
DBUG_RETURN(rnd_next(buf));
}
#endif
int
ha_clustrixdb
::
index_prev
(
uchar
*
buf
)
{
DBUG_ENTER
(
"index_prev"
);
DBUG_RETURN
(
rnd_next
(
buf
));
}
int
ha_clustrixdb
::
index_end
()
{
DBUG_ENTER
(
"index_prev"
);
if
(
scan_refid
)
DBUG_RETURN
(
rnd_end
());
else
DBUG_RETURN
(
0
);
}
int
ha_clustrixdb
::
rnd_init
(
bool
scan
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
add_current_table_to_rpl_table_list
();
is_scan
=
scan
;
scan_refid
=
0
;
if
(
my_bitmap_init
(
&
scan_fields
,
NULL
,
table
->
read_set
->
n_bits
,
false
))
return
ER_OUTOFMEMORY
;
#if 0
if (table->s->keys)
table->mark_columns_used_by_index(table->s->primary_key, &scan_fields);
else
bitmap_clear_all(&scan_fields);
bitmap_union(&scan_fields, table->read_set);
#else
/* Why is read_set not setup correctly? */
bitmap_set_all
(
&
scan_fields
);
#endif
if
((
error_code
=
trx
->
clustrix_net
->
scan_init
(
clustrix_table_oid
,
0
,
clustrix_connection
::
SORT_NONE
,
&
scan_fields
,
&
scan_refid
)))
return
error_code
;
return
0
;
}
int
ha_clustrixdb
::
rnd_next
(
uchar
*
buf
)
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
assert
(
is_scan
);
assert
(
scan_refid
);
uchar
*
rowdata
;
ulong
rowdata_length
;
if
((
error_code
=
trx
->
clustrix_net
->
scan_next
(
scan_refid
,
&
rowdata
,
&
rowdata_length
)))
return
error_code
;
if
(
has_hidden_key
)
{
last_hidden_key
=
*
(
ulonglong
*
)
rowdata
;
rowdata
+=
8
;
rowdata_length
-=
8
;
}
uchar
const
*
current_row_end
;
ulong
master_reclength
;
error_code
=
unpack_row
(
rgi
,
table
,
table
->
s
->
fields
,
rowdata
,
&
scan_fields
,
&
current_row_end
,
&
master_reclength
,
rowdata
+
rowdata_length
);
if
(
error_code
)
return
error_code
;
return
0
;
}
int
ha_clustrixdb
::
rnd_pos
(
uchar
*
buf
,
uchar
*
pos
)
{
DBUG_ENTER
(
"clx_rnd_pos"
);
DBUG_DUMP
(
"pos"
,
pos
,
ref_length
);
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
DBUG_RETURN
(
error_code
);
/* WDD: We need a way to convert key buffers directy to rbr buffers. */
if
(
has_hidden_key
)
{
memcpy
(
&
last_hidden_key
,
pos
,
sizeof
(
ulonglong
));
}
else
{
uint
keyno
=
table
->
s
->
primary_key
;
uint
len
=
calculate_key_len
(
table
,
keyno
,
pos
,
table
->
const_key_parts
[
keyno
]);
key_restore
(
table
->
record
[
0
],
pos
,
&
table
->
key_info
[
keyno
],
len
);
}
/* XXX: We cannot use a fixed length buffer for this. */
uchar
packed_key
[
CLUSTRIXDB_ROW_LIMIT
];
size_t
packed_key_len
;
build_key_packed_row
(
table
->
s
->
primary_key
,
packed_key
,
&
packed_key_len
);
uchar
*
rowdata
;
ulong
rowdata_length
;
if
((
error_code
=
trx
->
clustrix_net
->
key_read
(
clustrix_table_oid
,
0
,
table
->
read_set
,
packed_key
,
packed_key_len
,
&
rowdata
,
&
rowdata_length
)))
DBUG_RETURN
(
error_code
);
uchar
const
*
current_row_end
;
ulong
master_reclength
;
if
((
error_code
=
unpack_row
(
rgi
,
table
,
table
->
s
->
fields
,
rowdata
,
table
->
read_set
,
&
current_row_end
,
&
master_reclength
,
rowdata
+
rowdata_length
)))
DBUG_RETURN
(
error_code
);
DBUG_RETURN
(
0
);
}
int
ha_clustrixdb
::
rnd_end
()
{
int
error_code
=
0
;
THD
*
thd
=
ha_thd
();
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
!
trx
)
return
error_code
;
my_bitmap_free
(
&
scan_fields
);
if
(
scan_refid
&&
(
error_code
=
trx
->
clustrix_net
->
scan_end
(
scan_refid
)))
return
error_code
;
scan_refid
=
0
;
return
0
;
}
void
ha_clustrixdb
::
position
(
const
uchar
*
record
)
{
DBUG_ENTER
(
"clx_position"
);
if
(
has_hidden_key
)
{
memcpy
(
ref
,
&
last_hidden_key
,
sizeof
(
ulonglong
));
}
else
{
KEY
*
key_info
=
table
->
key_info
+
table
->
s
->
primary_key
;
key_copy
(
ref
,
record
,
key_info
,
key_info
->
key_length
);
}
DBUG_DUMP
(
"key"
,
ref
,
ref_length
);
DBUG_VOID_RETURN
;
}
uint
ha_clustrixdb
::
lock_count
(
void
)
const
{
/* Hopefully, we don't need to use thread locks */
return
0
;
}
THR_LOCK_DATA
**
ha_clustrixdb
::
store_lock
(
THD
*
thd
,
THR_LOCK_DATA
**
to
,
enum
thr_lock_type
lock_type
)
{
/* Hopefully, we don't need to use thread locks */
return
to
;
}
int
ha_clustrixdb
::
external_lock
(
THD
*
thd
,
int
lock_type
)
{
int
error_code
;
st_clustrixdb_trx
*
trx
=
get_trx
(
thd
,
&
error_code
);
if
(
lock_type
!=
F_UNLCK
)
trx
->
begin_trans
();
//if (lock_type != F_UNLCK)
//DBUG_ASSERT(trx && trx == get_trx(thd, &error_code));
return
0
;
}
/****************************************************************************
Engine Condition Pushdown
****************************************************************************/
const
COND
*
ha_clustrixdb
::
cond_push
(
const
COND
*
cond
)
{
return
cond
;
}
void
ha_clustrixdb
::
cond_pop
()
{
}
int
ha_clustrixdb
::
info_push
(
uint
info_type
,
void
*
info
)
{
return
0
;
}
void
ha_clustrixdb
::
add_current_table_to_rpl_table_list
()
{
if
(
rli
)
return
;
THD
*
thd
=
ha_thd
();
rli
=
new
Relay_log_info
(
FALSE
);
rli
->
sql_driver_thd
=
thd
;
rgi
=
new
rpl_group_info
(
rli
);
rgi
->
thd
=
thd
;
rgi
->
tables_to_lock_count
=
0
;
rgi
->
tables_to_lock
=
NULL
;
if
(
rgi
->
tables_to_lock_count
)
return
;
rgi
->
tables_to_lock
=
(
RPL_TABLE_LIST
*
)
my_malloc
(
sizeof
(
RPL_TABLE_LIST
),
MYF
(
MY_WME
));
rgi
->
tables_to_lock
->
init_one_table
(
&
table
->
s
->
db
,
&
table
->
s
->
table_name
,
0
,
TL_READ
);
rgi
->
tables_to_lock
->
table
=
table
;
rgi
->
tables_to_lock
->
table_id
=
table
->
tablenr
;
rgi
->
tables_to_lock
->
m_conv_table
=
NULL
;
rgi
->
tables_to_lock
->
master_had_triggers
=
FALSE
;
rgi
->
tables_to_lock
->
m_tabledef_valid
=
TRUE
;
/* XXX: We cannot use a fixed length buffer for this. */
uchar
col_type
[
CLUSTRIXDB_ROW_LIMIT
];
for
(
uint
i
=
0
;
i
<
table
->
s
->
fields
;
++
i
)
col_type
[
i
]
=
table
->
field
[
i
]
->
binlog_type
();
table_def
*
tabledef
=
&
rgi
->
tables_to_lock
->
m_tabledef
;
new
(
tabledef
)
table_def
(
col_type
,
table
->
s
->
fields
,
NULL
,
0
,
NULL
,
0
);
rgi
->
tables_to_lock_count
++
;
}
void
ha_clustrixdb
::
remove_current_table_from_rpl_table_list
()
{
if
(
!
rgi
->
tables_to_lock
)
return
;
rgi
->
tables_to_lock
->
m_tabledef
.
table_def
::~
table_def
();
rgi
->
tables_to_lock
->
m_tabledef_valid
=
FALSE
;
my_free
(
rgi
->
tables_to_lock
);
rgi
->
tables_to_lock_count
--
;
rgi
->
tables_to_lock
=
NULL
;
delete
rli
;
delete
rgi
;
}
st_clustrixdb_trx
*
ha_clustrixdb
::
get_trx
(
THD
*
thd
,
int
*
error_code
)
{
*
error_code
=
0
;
st_clustrixdb_trx
*
trx
;
if
(
!
(
trx
=
(
st_clustrixdb_trx
*
)
thd_get_ha_data
(
thd
,
clustrixdb_hton
)))
{
if
(
!
(
trx
=
new
st_clustrixdb_trx
(
thd
)))
{
*
error_code
=
HA_ERR_OUT_OF_MEM
;
return
NULL
;
}
if
((
*
error_code
=
trx
->
net_init
()))
{
delete
trx
;
return
NULL
;
}
thd_set_ha_data
(
thd
,
clustrixdb_hton
,
trx
);
}
return
trx
;
}
void
ha_clustrixdb
::
build_key_packed_row
(
uint
index
,
uchar
*
packed_key
,
size_t
*
packed_key_len
)
{
if
(
index
==
table
->
s
->
primary_key
&&
has_hidden_key
)
{
memcpy
(
packed_key
,
&
last_hidden_key
,
sizeof
(
ulonglong
));
*
packed_key_len
=
sizeof
(
ulonglong
);
}
else
{
// make a row from the table
table
->
mark_columns_used_by_index
(
index
,
&
table
->
tmp_set
);
*
packed_key_len
=
pack_row
(
table
,
&
table
->
tmp_set
,
packed_key
,
table
->
record
[
0
]);
}
}
/****************************************************************************
** Plugin Functions
****************************************************************************/
static
int
clustrixdb_commit
(
handlerton
*
hton
,
THD
*
thd
,
bool
all
)
{
int
error_code
=
0
;
st_clustrixdb_trx
*
trx
=
(
st_clustrixdb_trx
*
)
thd_get_ha_data
(
thd
,
hton
);
assert
(
trx
);
if
(
trx
->
has_transaction
)
{
if
(
all
||
!
thd_test_options
(
thd
,
OPTION_NOT_AUTOCOMMIT
|
OPTION_BEGIN
))
{
error_code
=
trx
->
clustrix_net
->
commit_trans
();
trx
->
has_transaction
=
FALSE
;
}
}
return
error_code
;
}
static
int
clustrixdb_rollback
(
handlerton
*
hton
,
THD
*
thd
,
bool
all
)
{
int
error_code
=
0
;
st_clustrixdb_trx
*
trx
=
(
st_clustrixdb_trx
*
)
thd_get_ha_data
(
thd
,
hton
);
assert
(
trx
);
if
(
trx
->
has_transaction
)
{
if
(
all
||
!
thd_test_options
(
thd
,
OPTION_NOT_AUTOCOMMIT
|
OPTION_BEGIN
))
{
error_code
=
trx
->
clustrix_net
->
rollback_trans
();
trx
->
has_transaction
=
FALSE
;
}
}
return
error_code
;
}
static
handler
*
clustrixdb_create_handler
(
handlerton
*
hton
,
TABLE_SHARE
*
table
,
MEM_ROOT
*
mem_root
)
{
return
new
(
mem_root
)
ha_clustrixdb
(
hton
,
table
);
}
static
int
clustrixdb_close_connection
(
handlerton
*
hton
,
THD
*
thd
)
{
st_clustrixdb_trx
*
trx
=
(
st_clustrixdb_trx
*
)
thd_get_ha_data
(
thd
,
hton
);
if
(
!
trx
)
return
0
;
/* Transaction is not started */
if
(
trx
->
has_transaction
)
clustrixdb_rollback
(
clustrixdb_hton
,
thd
,
TRUE
);
thd_set_ha_data
(
thd
,
clustrixdb_hton
,
NULL
);
delete
trx
;
return
0
;
}
static
int
clustrixdb_panic
(
handlerton
*
hton
,
ha_panic_function
type
)
{
return
0
;
}
static
bool
clustrixdb_show_status
(
handlerton
*
hton
,
THD
*
thd
,
stat_print_fn
*
stat_print
,
enum
ha_stat_type
stat_type
)
{
return
FALSE
;
}
static
int
clustrixdb_discover_table_names
(
handlerton
*
hton
,
LEX_CSTRING
*
db
,
MY_DIR
*
dir
,
handlerton
::
discovered_list
*
result
)
{
clustrix_connection
*
clustrix_net
=
new
clustrix_connection
();
int
error_code
=
clustrix_net
->
connect
();
if
(
error_code
)
return
error_code
;
error_code
=
clustrix_net
->
populate_table_list
(
db
,
result
);
delete
clustrix_net
;
return
0
;
// error_code;
}
int
clustrixdb_discover_table
(
handlerton
*
hton
,
THD
*
thd
,
TABLE_SHARE
*
share
)
{
clustrix_connection
*
clustrix_net
=
new
clustrix_connection
();
int
error_code
=
clustrix_net
->
connect
();
if
(
error_code
)
return
error_code
;
error_code
=
clustrix_net
->
discover_table_details
(
&
share
->
db
,
&
share
->
table_name
,
thd
,
share
);
delete
clustrix_net
;
return
error_code
;
}
static
int
clustrixdb_init
(
void
*
p
)
{
clustrixdb_hton
=
(
handlerton
*
)
p
;
clustrixdb_hton
->
state
=
SHOW_OPTION_YES
;
clustrixdb_hton
->
flags
=
HTON_NO_FLAGS
;
clustrixdb_hton
->
panic
=
clustrixdb_panic
;
clustrixdb_hton
->
close_connection
=
clustrixdb_close_connection
;
clustrixdb_hton
->
commit
=
clustrixdb_commit
;
clustrixdb_hton
->
rollback
=
clustrixdb_rollback
;
clustrixdb_hton
->
create
=
clustrixdb_create_handler
;
clustrixdb_hton
->
show_status
=
clustrixdb_show_status
;
clustrixdb_hton
->
discover_table_names
=
clustrixdb_discover_table_names
;
clustrixdb_hton
->
discover_table
=
clustrixdb_discover_table
;
return
0
;
}
struct
st_mysql_show_var
clustrixdb_status_vars
[]
=
{
{
NullS
,
NullS
,
SHOW_LONG
}
};
static
struct
st_mysql_sys_var
*
clustrixdb_system_variables
[]
=
{
MYSQL_SYSVAR
(
connect_timeout
),
MYSQL_SYSVAR
(
read_timeout
),
MYSQL_SYSVAR
(
write_timeout
),
MYSQL_SYSVAR
(
host
),
MYSQL_SYSVAR
(
username
),
MYSQL_SYSVAR
(
password
),
MYSQL_SYSVAR
(
port
),
MYSQL_SYSVAR
(
socket
),
NULL
};
static
struct
st_mysql_storage_engine
clustrixdb_storage_engine
=
{
MYSQL_HANDLERTON_INTERFACE_VERSION
};
maria_declare_plugin
(
clustrixdb
)
{
MYSQL_STORAGE_ENGINE_PLUGIN
,
/* Plugin Type */
&
clustrixdb_storage_engine
,
/* Plugin Descriptor */
"CLUSTRIXDB"
,
/* Plugin Name */
"MariaDB"
,
/* Plugin Author */
"ClustrixDB storage engine"
,
/* Plugin Description */
PLUGIN_LICENSE_GPL
,
/* Plugin Licence */
clustrixdb_init
,
/* Plugin Entry Point */
NULL
,
/* Plugin Deinitializer */
0x0001
,
/* Hex Version Number (0.1) */
NULL
/* clustrixdb_status_vars */
,
/* Status Variables */
clustrixdb_system_variables
,
/* System Variables */
"0.1"
,
/* String Version */
MariaDB_PLUGIN_MATURITY_EXPERIMENTAL
/* Maturity Level */
}
maria_declare_plugin_end
;
storage/clustrixdb/ha_clustrixdb.h
0 → 100644
View file @
70cb38cf
/*****************************************************************************
Copyright (c) 2019, MariaDB Corporation.
*****************************************************************************/
#ifndef _ha_clustrixdb_h
#define _ha_clustrixdb_h
#ifdef USE_PRAGMA_INTERFACE
#pragma interface
/* gcc class implementation */
#endif
#define MYSQL_SERVER 1
#include "clustrix_connection.h"
#include "my_bitmap.h"
#include "table.h"
#include "rpl_rli.h"
#include "handler.h"
#include "sql_class.h"
#include "sql_show.h"
#include "mysql.h"
#include "../../sql/rpl_record.h"
class
ha_clustrixdb
;
class
st_clustrixdb_trx
{
public:
THD
*
thd
;
clustrix_connection
*
clustrix_net
;
//query_id_t query_id;
//MEM_ROOT mem_root; /* Memory allocated for the executing transaction */
bool
has_transaction
;
st_clustrixdb_trx
(
THD
*
trx_thd
);
~
st_clustrixdb_trx
();
int
net_init
();
int
begin_trans
();
};
class
ha_clustrixdb
:
public
handler
{
private:
# define CLUSTRIXDB_ROW_LIMIT 1024
ulonglong
clustrix_table_oid
;
rpl_group_info
*
rgi
;
Relay_log_info
*
rli
;
RPL_TABLE_LIST
*
rpl_table_list
;
Field
*
auto_inc_field
;
ulonglong
auto_inc_value
;
bool
has_hidden_key
;
ulonglong
last_hidden_key
;
ulonglong
scan_refid
;
bool
is_scan
;
MY_BITMAP
scan_fields
;
public:
ha_clustrixdb
(
handlerton
*
hton
,
TABLE_SHARE
*
table_arg
);
~
ha_clustrixdb
();
int
create
(
const
char
*
name
,
TABLE
*
form
,
HA_CREATE_INFO
*
info
);
int
delete_table
(
const
char
*
name
);
int
open
(
const
char
*
name
,
int
mode
,
uint
test_if_locked
);
int
close
(
void
);
int
reset
();
int
write_row
(
uchar
*
buf
);
// start_bulk_update exec_bulk_update
int
update_row
(
const
uchar
*
old_data
,
const
uchar
*
new_data
);
// start_bulk_delete exec_bulk_delete
int
delete_row
(
const
uchar
*
buf
);
Table_flags
table_flags
(
void
)
const
;
ulong
index_flags
(
uint
idx
,
uint
part
,
bool
all_parts
)
const
;
uint
max_supported_keys
()
const
{
return
MAX_KEY
;
}
ha_rows
records
();
ha_rows
records_in_range
(
uint
inx
,
key_range
*
min_key
,
key_range
*
max_key
);
int
info
(
uint
flag
);
// see my_base.h for full description
// multi_read_range
// read_range
int
index_init
(
uint
idx
,
bool
sorted
);
int
index_read
(
uchar
*
buf
,
const
uchar
*
key
,
uint
key_len
,
enum
ha_rkey_function
find_flag
);
int
index_first
(
uchar
*
buf
);
int
index_prev
(
uchar
*
buf
);
int
index_last
(
uchar
*
buf
);
int
index_next
(
uchar
*
buf
);
//int index_next_same(uchar *buf, const uchar *key, uint keylen);
int
index_end
();
int
rnd_init
(
bool
scan
);
int
rnd_next
(
uchar
*
buf
);
int
rnd_pos
(
uchar
*
buf
,
uchar
*
pos
);
int
rnd_end
();
void
position
(
const
uchar
*
record
);
uint
lock_count
(
void
)
const
;
THR_LOCK_DATA
**
store_lock
(
THD
*
thd
,
THR_LOCK_DATA
**
to
,
enum
thr_lock_type
lock_type
);
int
external_lock
(
THD
*
thd
,
int
lock_type
);
uint8
table_cache_type
()
{
return
(
HA_CACHE_TBL_NOCACHE
);
}
const
COND
*
cond_push
(
const
COND
*
cond
);
void
cond_pop
();
int
info_push
(
uint
info_type
,
void
*
info
);
private:
st_clustrixdb_trx
*
get_trx
(
THD
*
thd
,
int
*
error_code
);
void
add_current_table_to_rpl_table_list
();
void
remove_current_table_from_rpl_table_list
();
void
build_key_packed_row
(
uint
index
,
uchar
*
packed_key
,
size_t
*
packed_key_len
);
};
#endif // _ha_clustrixdb_h
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment