Commit 3466b8d5 authored by unknown's avatar unknown

Merge tulin@bk-internal.mysql.com:/home/bk/mysql-4.1-ndb

into poseidon.ndb.mysql.com:/home/tomas/mysql-4.1-ndb-merge

parents 069d54fd 4cbb9917
...@@ -1811,7 +1811,7 @@ sub fix_image ...@@ -1811,7 +1811,7 @@ sub fix_image
{ {
my($text) = @_; my($text) = @_;
my($arg1, $ext); my($arg1, $ext);
$text =~ /^([^,]*)$/; $text =~ /^([^,]*)/;
die "error in image: '$text'" unless defined($1); die "error in image: '$text'" unless defined($1);
$arg1 = $1; $arg1 = $1;
$arg1 =~ s/@@/@/g; $arg1 =~ s/@@/@/g;
......
...@@ -365,6 +365,11 @@ uint my_instr_mb(struct charset_info_st *, ...@@ -365,6 +365,11 @@ uint my_instr_mb(struct charset_info_st *,
const char *s, uint s_length, const char *s, uint s_length,
my_match_t *match, uint nmatch); my_match_t *match, uint nmatch);
int my_wildcmp_unicode(CHARSET_INFO *cs,
const char *str, const char *str_end,
const char *wildstr, const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights);
extern my_bool my_parse_charset_xml(const char *bug, uint len, extern my_bool my_parse_charset_xml(const char *bug, uint len,
int (*add)(CHARSET_INFO *cs)); int (*add)(CHARSET_INFO *cs));
......
...@@ -25,7 +25,6 @@ extern "C" { ...@@ -25,7 +25,6 @@ extern "C" {
MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields, MYSQL_FIELD *unpack_fields(MYSQL_DATA *data,MEM_ROOT *alloc,uint fields,
my_bool default_value, uint server_capabilities); my_bool default_value, uint server_capabilities);
void free_rows(MYSQL_DATA *cur); void free_rows(MYSQL_DATA *cur);
my_bool mysql_autenticate(MYSQL *mysql, const char *passwd);
void free_old_query(MYSQL *mysql); void free_old_query(MYSQL *mysql);
void end_server(MYSQL *mysql); void end_server(MYSQL *mysql);
my_bool mysql_reconnect(MYSQL *mysql); my_bool mysql_reconnect(MYSQL *mysql);
......
...@@ -2244,7 +2244,7 @@ dict_foreign_add_to_cache( ...@@ -2244,7 +2244,7 @@ dict_foreign_add_to_cache(
Scans from pointer onwards. Stops if is at the start of a copy of Scans from pointer onwards. Stops if is at the start of a copy of
'string' where characters are compared without case sensitivity. Stops 'string' where characters are compared without case sensitivity. Stops
also at '\0'. */ also at '\0'. */
static
const char* const char*
dict_scan_to( dict_scan_to(
/*=========*/ /*=========*/
......
...@@ -891,6 +891,18 @@ dict_tables_have_same_db( ...@@ -891,6 +891,18 @@ dict_tables_have_same_db(
const char* name2); /* in: table name in the form const char* name2); /* in: table name in the form
dbname '/' tablename */ dbname '/' tablename */
/*************************************************************************
Scans from pointer onwards. Stops if is at the start of a copy of
'string' where characters are compared without case sensitivity. Stops
also at '\0'. */
const char*
dict_scan_to(
/*=========*/
/* out: scanned up to this */
const char* ptr, /* in: scan from */
const char* string);/* in: look for this */
/* Buffers for storing detailed information about the latest foreign key /* Buffers for storing detailed information about the latest foreign key
and unique key errors */ and unique key errors */
extern FILE* dict_foreign_err_file; extern FILE* dict_foreign_err_file;
......
...@@ -50,6 +50,15 @@ innobase_invalidate_query_cache( ...@@ -50,6 +50,15 @@ innobase_invalidate_query_cache(
ulint full_name_len); /* in: full name length where also the null ulint full_name_len); /* in: full name length where also the null
chars count */ chars count */
/**********************************************************************
This function returns true if SQL-query in the current thread
is either REPLACE or LOAD DATA INFILE REPLACE.
NOTE that /mysql/innobase/row/row0ins.c must contain the
prototype for this function ! */
ibool
innobase_query_is_replace(void);
/*===========================*/
/************************************************************************* /*************************************************************************
Creates an insert node struct. */ Creates an insert node struct. */
...@@ -1482,9 +1491,9 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1482,9 +1491,9 @@ row_ins_scan_sec_index_for_duplicate(
ulint err = DB_SUCCESS; ulint err = DB_SUCCESS;
ibool moved; ibool moved;
mtr_t mtr; mtr_t mtr;
trx_t *trx; trx_t* trx;
ibool success; const char* ptr;
n_unique = dict_index_get_n_unique(index); n_unique = dict_index_get_n_unique(index);
/* If the secondary index is unique, but one of the fields in the /* If the secondary index is unique, but one of the fields in the
...@@ -1523,9 +1532,8 @@ row_ins_scan_sec_index_for_duplicate( ...@@ -1523,9 +1532,8 @@ row_ins_scan_sec_index_for_duplicate(
trx = thr_get_trx(thr); trx = thr_get_trx(thr);
ut_ad(trx); ut_ad(trx);
dict_accept(*trx->mysql_query_str, "REPLACE", &success);
if (success) { if (innobase_query_is_replace()) {
/* The manual defines the REPLACE semantics that it /* The manual defines the REPLACE semantics that it
is either an INSERT or DELETE(s) for duplicate key is either an INSERT or DELETE(s) for duplicate key
...@@ -1605,7 +1613,7 @@ row_ins_duplicate_error_in_clust( ...@@ -1605,7 +1613,7 @@ row_ins_duplicate_error_in_clust(
page_t* page; page_t* page;
ulint n_unique; ulint n_unique;
trx_t* trx = thr_get_trx(thr); trx_t* trx = thr_get_trx(thr);
ibool success; const char* ptr;
UT_NOT_USED(mtr); UT_NOT_USED(mtr);
...@@ -1639,10 +1647,7 @@ row_ins_duplicate_error_in_clust( ...@@ -1639,10 +1647,7 @@ row_ins_duplicate_error_in_clust(
sure that in roll-forward we get the same duplicate sure that in roll-forward we get the same duplicate
errors as in original execution */ errors as in original execution */
dict_accept(*trx->mysql_query_str, "REPLACE", if (innobase_query_is_replace()) {
&success);
if (success) {
/* The manual defines the REPLACE semantics /* The manual defines the REPLACE semantics
that it is either an INSERT or DELETE(s) that it is either an INSERT or DELETE(s)
...@@ -1683,15 +1688,9 @@ row_ins_duplicate_error_in_clust( ...@@ -1683,15 +1688,9 @@ row_ins_duplicate_error_in_clust(
/* The manual defines the REPLACE semantics that it /* The manual defines the REPLACE semantics that it
is either an INSERT or DELETE(s) for duplicate key is either an INSERT or DELETE(s) for duplicate key
+ INSERT. Therefore, we should take X-lock for + INSERT. Therefore, we should take X-lock for
duplicates. duplicates. */
*/
/* Is the first word in MySQL query REPLACE ? */
dict_accept(*trx->mysql_query_str, "REPLACE",
&success);
if (success) { if (innobase_query_is_replace()) {
err = row_ins_set_exclusive_rec_lock( err = row_ins_set_exclusive_rec_lock(
LOCK_REC_NOT_GAP, LOCK_REC_NOT_GAP,
......
...@@ -2794,7 +2794,7 @@ row_search_for_mysql( ...@@ -2794,7 +2794,7 @@ row_search_for_mysql(
rec_t* index_rec; rec_t* index_rec;
rec_t* clust_rec; rec_t* clust_rec;
rec_t* old_vers; rec_t* old_vers;
ulint err; ulint err = DB_SUCCESS;
ibool moved; ibool moved;
ibool cons_read_requires_clust_rec; ibool cons_read_requires_clust_rec;
ibool was_lock_wait; ibool was_lock_wait;
...@@ -3203,26 +3203,20 @@ row_search_for_mysql( ...@@ -3203,26 +3203,20 @@ row_search_for_mysql(
if (prebuilt->select_lock_type != LOCK_NONE if (prebuilt->select_lock_type != LOCK_NONE
&& set_also_gap_locks) { && set_also_gap_locks) {
/* Try to place a lock on the index record */ /* Try to place a lock on the index record */
/* If innodb_locks_unsafe_for_binlog option is used, /* If innodb_locks_unsafe_for_binlog option is used,
we lock only the record, i.e. next-key locking is we do not lock gaps. Supremum record is really
not used. a gap and therefore we do not set locks there. */
*/
if ( srv_locks_unsafe_for_binlog ) if ( srv_locks_unsafe_for_binlog == FALSE )
{
err = sel_set_rec_lock(rec, index,
prebuilt->select_lock_type,
LOCK_REC_NOT_GAP, thr);
}
else
{ {
err = sel_set_rec_lock(rec, index, err = sel_set_rec_lock(rec, index,
prebuilt->select_lock_type, prebuilt->select_lock_type,
LOCK_ORDINARY, thr); LOCK_ORDINARY, thr);
} }
if (err != DB_SUCCESS) { if (err != DB_SUCCESS) {
goto lock_wait_or_error; goto lock_wait_or_error;
} }
......
...@@ -237,7 +237,7 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf, ...@@ -237,7 +237,7 @@ int STDCALL mysql_manager_fetch_line(MYSQL_MANAGER* con, char* res_buf,
char* res_buf_end=res_buf+res_buf_size; char* res_buf_end=res_buf+res_buf_size;
char* net_buf=(char*) con->net.read_pos, *net_buf_end; char* net_buf=(char*) con->net.read_pos, *net_buf_end;
int res_buf_shift=RES_BUF_SHIFT; int res_buf_shift=RES_BUF_SHIFT;
uint num_bytes; ulong num_bytes;
if (res_buf_size<RES_BUF_SHIFT) if (res_buf_size<RES_BUF_SHIFT)
{ {
......
...@@ -63,6 +63,15 @@ select 'A' like 'a' collate utf8_bin; ...@@ -63,6 +63,15 @@ select 'A' like 'a' collate utf8_bin;
select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%'); select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%');
_utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%') _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%')
1 1
select convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8);
convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8)
1
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8)
1
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8)
1
SELECT 'a' = 'a '; SELECT 'a' = 'a ';
'a' = 'a ' 'a' = 'a '
1 1
......
use test; drop table if exists t1, t2;
drop table if exists test_select; CREATE TABLE t1(session_id char(9) NOT NULL);
Warnings: INSERT INTO t1 VALUES ("abc");
Note 1051 Unknown table 'test_select' SELECT * FROM t1;
CREATE TABLE test_select(session_id char(9) NOT NULL);
INSERT INTO test_select VALUES ("abc");
SELECT * FROM test_select;
session_id session_id
abc abc
prepare st_1180 from 'SELECT * FROM test_select WHERE ?="1111" and session_id = "abc"'; prepare st_1180 from 'SELECT * FROM t1 WHERE ?="1111" and session_id = "abc"';
set @arg1= 'abc'; set @arg1= 'abc';
execute st_1180 using @arg1; execute st_1180 using @arg1;
session_id session_id
...@@ -18,4 +15,104 @@ abc ...@@ -18,4 +15,104 @@ abc
set @arg1= 'abc'; set @arg1= 'abc';
execute st_1180 using @arg1; execute st_1180 using @arg1;
session_id session_id
drop table test_select; drop table t1;
create table t1 (
c_01 char(6), c_02 integer, c_03 real, c_04 int(3), c_05 varchar(20),
c_06 date, c_07 char(1), c_08 real, c_09 int(11), c_10 time,
c_11 char(6), c_12 integer, c_13 real, c_14 int(3), c_15 varchar(20),
c_16 date, c_17 char(1), c_18 real, c_19 int(11), c_20 text);
prepare st_1644 from 'insert into t1 values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)';
set @arg01= 'row_1';
set @arg02= 1;
set @arg03= 1.1;
set @arg04= 111;
set @arg05= 'row_one';
set @arg06= '2004-10-12';
set @arg07= '1';
set @arg08= 1.1;
set @arg09= '100100100';
set @arg10= '12:34:56';
set @arg11= 'row_1';
set @arg12= 1;
set @arg13= 1.1;
set @arg14= 111;
set @arg15= 'row_one';
set @arg16= '2004-10-12';
set @arg17= '1';
set @arg18= 1.1;
set @arg19= '100100100';
set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= NULL;
set @arg02= NULL;
set @arg03= NULL;
set @arg04= NULL;
set @arg05= NULL;
set @arg06= NULL;
set @arg07= NULL;
set @arg08= NULL;
set @arg09= NULL;
set @arg10= NULL;
set @arg11= NULL;
set @arg12= NULL;
set @arg13= NULL;
set @arg14= NULL;
set @arg15= NULL;
set @arg16= NULL;
set @arg17= NULL;
set @arg18= NULL;
set @arg19= NULL;
set @arg20= NULL;
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= 'row_3';
set @arg02= 3;
set @arg03= 3.3;
set @arg04= 333;
set @arg05= 'row_three';
set @arg06= '2004-10-12';
set @arg07= '3';
set @arg08= 3.3;
set @arg09= '300300300';
set @arg10= '12:34:56';
set @arg11= 'row_3';
set @arg12= 3;
set @arg13= 3.3;
set @arg14= 333;
set @arg15= 'row_three';
set @arg16= '2004-10-12';
set @arg17= '3';
set @arg18= 3.3;
set @arg19= '300300300';
set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
select * from t1;
c_01 c_02 c_03 c_04 c_05 c_06 c_07 c_08 c_09 c_10 c_11 c_12 c_13 c_14 c_15 c_16 c_17 c_18 c_19 c_20
row_1 1 1.1 111 row_one 2004-10-12 1 1.1 100100100 12:34:56 row_1 1 1.1 111 row_one 2004-10-12 1 1.1 100100100 12:34:56
NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL NULL
row_3 3 3.3 333 row_three 2004-10-12 3 3.3 300300300 12:34:56 row_3 3 3.3 333 row_three 2004-10-12 3 3.3 300300300 12:34:56
drop table t1;
create table t1(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(12) not null,
cold varchar(2) not null,
primary key (cola, colb, cold));
create table t2(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(2) not null,
cold float,
primary key (cold));
insert into t1 values ('aaaa', 'yyyy', 'yyyy-dd-mm', 'R');
insert into t2 values ('aaaa', 'yyyy', 'R', 203), ('bbbb', 'zzzz', 'C', 201);
prepare st_1676 from 'select a.cola, a.colb, a.cold from t1 a, t2 b where a.cola = ? and a.colb = ? and a.cold = ? and b.cola = a.cola and b.colb = a.colb and b.colc = a.cold';
set @arg0= "aaaa";
set @arg1= "yyyy";
set @arg2= "R";
execute st_1676 using @arg0, @arg1, @arg2;
cola colb cold
aaaa yyyy R
drop table t1, t2;
...@@ -33,6 +33,14 @@ select 'A' like 'a'; ...@@ -33,6 +33,14 @@ select 'A' like 'a';
select 'A' like 'a' collate utf8_bin; select 'A' like 'a' collate utf8_bin;
select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%'); select _utf8 0xD0B0D0B1D0B2 like concat(_utf8'%',_utf8 0xD0B1,_utf8 '%');
# Bug #6040: can't retrieve records with umlaut
# characters in case insensitive manner.
# Case insensitive search LIKE comparison
# was broken for multibyte characters:
select convert(_latin1'Gnter Andr' using utf8) like CONVERT(_latin1'GNTER%' USING utf8);
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
select CONVERT(_koi8r'' USING utf8) LIKE CONVERT(_koi8r'' USING utf8);
# #
# Check the following: # Check the following:
# "a" == "a " # "a" == "a "
...@@ -568,6 +576,7 @@ DROP TABLE t1; ...@@ -568,6 +576,7 @@ DROP TABLE t1;
# #
# Bug #5723: length(<varchar utf8 field>) returns varying results # Bug #5723: length(<varchar utf8 field>) returns varying results
# #
--disable_warnings
SET NAMES utf8; SET NAMES utf8;
--disable_warnings --disable_warnings
CREATE TABLE t1 ( CREATE TABLE t1 (
......
...@@ -3,19 +3,25 @@ ...@@ -3,19 +3,25 @@
# Prepared Statements # # Prepared Statements #
# re-testing bug DB entries # # re-testing bug DB entries #
# # # #
# The bugs are reported as "closed". #
# Command sequences taken from bug report. #
# No other test contains the bug# as comment. #
# #
# Tests drop/create tables 't1', 't2', ... #
# #
############################################### ###############################################
use test; --disable_warnings
drop table if exists t1, t2;
--enable_warnings
# bug#1180: optimized away part of WHERE clause cause incorect prepared satatement results # bug#1180: optimized away part of WHERE clause cause incorect prepared satatement results
drop table if exists test_select; CREATE TABLE t1(session_id char(9) NOT NULL);
INSERT INTO t1 VALUES ("abc");
SELECT * FROM t1;
CREATE TABLE test_select(session_id char(9) NOT NULL); prepare st_1180 from 'SELECT * FROM t1 WHERE ?="1111" and session_id = "abc"';
INSERT INTO test_select VALUES ("abc");
SELECT * FROM test_select;
prepare st_1180 from 'SELECT * FROM test_select WHERE ?="1111" and session_id = "abc"';
# Must not find a row # Must not find a row
set @arg1= 'abc'; set @arg1= 'abc';
...@@ -29,4 +35,97 @@ execute st_1180 using @arg1; ...@@ -29,4 +35,97 @@ execute st_1180 using @arg1;
set @arg1= 'abc'; set @arg1= 'abc';
execute st_1180 using @arg1; execute st_1180 using @arg1;
drop table test_select; drop table t1;
# end of bug#1180
# bug#1644: Insertion of more than 3 NULL columns with parameter binding fails
# Using prepared statements, insertion of more than three columns with NULL
# values fails to insert additional NULLS. After the third column NULLS will
# be inserted into the database as zeros.
# First insert four columns of a value (i.e. 22) to verify binding is working
# correctly. Then Bind to each columns bind parameter an is_null value of 1.
# Then insert four more columns of integers, just for sanity.
# A subsequent select on the server will result in this:
# mysql> select * from foo_dfr;
# +------+------+------+------+
# | col1 | col2 | col3 | col4 |
# +------+------+------+------+
# | 22 | 22 | 22 | 22 |
# | NULL | NULL | NULL | 0 |
# | 88 | 88 | 88 | 88 |
# +------+------+------+------+
# Test is extended to more columns - code stores bit vector in bytes.
create table t1 (
c_01 char(6), c_02 integer, c_03 real, c_04 int(3), c_05 varchar(20),
c_06 date, c_07 char(1), c_08 real, c_09 int(11), c_10 time,
c_11 char(6), c_12 integer, c_13 real, c_14 int(3), c_15 varchar(20),
c_16 date, c_17 char(1), c_18 real, c_19 int(11), c_20 text);
# Do not use "timestamp" type, because it has a non-NULL default as of 4.1.2
prepare st_1644 from 'insert into t1 values ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)';
set @arg01= 'row_1'; set @arg02= 1; set @arg03= 1.1; set @arg04= 111; set @arg05= 'row_one';
set @arg06= '2004-10-12'; set @arg07= '1'; set @arg08= 1.1; set @arg09= '100100100'; set @arg10= '12:34:56';
set @arg11= 'row_1'; set @arg12= 1; set @arg13= 1.1; set @arg14= 111; set @arg15= 'row_one';
set @arg16= '2004-10-12'; set @arg17= '1'; set @arg18= 1.1; set @arg19= '100100100'; set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= NULL; set @arg02= NULL; set @arg03= NULL; set @arg04= NULL; set @arg05= NULL;
set @arg06= NULL; set @arg07= NULL; set @arg08= NULL; set @arg09= NULL; set @arg10= NULL;
set @arg11= NULL; set @arg12= NULL; set @arg13= NULL; set @arg14= NULL; set @arg15= NULL;
set @arg16= NULL; set @arg17= NULL; set @arg18= NULL; set @arg19= NULL; set @arg20= NULL;
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
set @arg01= 'row_3'; set @arg02= 3; set @arg03= 3.3; set @arg04= 333; set @arg05= 'row_three';
set @arg06= '2004-10-12'; set @arg07= '3'; set @arg08= 3.3; set @arg09= '300300300'; set @arg10= '12:34:56';
set @arg11= 'row_3'; set @arg12= 3; set @arg13= 3.3; set @arg14= 333; set @arg15= 'row_three';
set @arg16= '2004-10-12'; set @arg17= '3'; set @arg18= 3.3; set @arg19= '300300300'; set @arg20= '12:34:56';
execute st_1644 using @arg01, @arg02, @arg03, @arg04, @arg05, @arg06, @arg07, @arg08, @arg09, @arg10,
@arg11, @arg12, @arg13, @arg14, @arg15, @arg16, @arg17, @arg18, @arg19, @arg20;
select * from t1;
drop table t1;
# end of bug#1644
# bug#1677: Prepared statement two-table join returns no rows when one is expected
create table t1(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(12) not null,
cold varchar(2) not null,
primary key (cola, colb, cold));
create table t2(
cola varchar(50) not null,
colb varchar(8) not null,
colc varchar(2) not null,
cold float,
primary key (cold));
insert into t1 values ('aaaa', 'yyyy', 'yyyy-dd-mm', 'R');
insert into t2 values ('aaaa', 'yyyy', 'R', 203), ('bbbb', 'zzzz', 'C', 201);
prepare st_1676 from 'select a.cola, a.colb, a.cold from t1 a, t2 b where a.cola = ? and a.colb = ? and a.cold = ? and b.cola = a.cola and b.colb = a.colb and b.colc = a.cold';
set @arg0= "aaaa";
set @arg1= "yyyy";
set @arg2= "R";
execute st_1676 using @arg0, @arg1, @arg2;
drop table t1, t2;
# end of bug#1676
...@@ -110,7 +110,7 @@ ...@@ -110,7 +110,7 @@
*/ */
#define MAX_TTREE_NODE_SIZE 64 // total words in node #define MAX_TTREE_NODE_SIZE 64 // total words in node
#define MAX_TTREE_PREF_SIZE 4 // words in min prefix #define MAX_TTREE_PREF_SIZE 4 // words in min prefix
#define MAX_TTREE_NODE_SLACK 3 // diff between max and min occupancy #define MAX_TTREE_NODE_SLACK 2 // diff between max and min occupancy
/* /*
* Blobs. * Blobs.
......
...@@ -32,7 +32,6 @@ ...@@ -32,7 +32,6 @@
// signal classes // signal classes
#include <signaldata/DictTabInfo.hpp> #include <signaldata/DictTabInfo.hpp>
#include <signaldata/TuxContinueB.hpp> #include <signaldata/TuxContinueB.hpp>
#include <signaldata/BuildIndx.hpp>
#include <signaldata/TupFrag.hpp> #include <signaldata/TupFrag.hpp>
#include <signaldata/AlterIndx.hpp> #include <signaldata/AlterIndx.hpp>
#include <signaldata/DropTab.hpp> #include <signaldata/DropTab.hpp>
...@@ -478,7 +477,7 @@ private: ...@@ -478,7 +477,7 @@ private:
Uint16 m_numAttrs; Uint16 m_numAttrs;
bool m_storeNullKey; bool m_storeNullKey;
TreeHead m_tree; TreeHead m_tree;
TupLoc m_freeLoc; // one node pre-allocated for insert TupLoc m_freeLoc; // list of free index nodes
DLList<ScanOp> m_scanList; // current scans on this fragment DLList<ScanOp> m_scanList; // current scans on this fragment
Uint32 m_tupIndexFragPtrI; Uint32 m_tupIndexFragPtrI;
Uint32 m_tupTableFragPtrI[2]; Uint32 m_tupTableFragPtrI[2];
...@@ -582,17 +581,24 @@ private: ...@@ -582,17 +581,24 @@ private:
* DbtuxNode.cpp * DbtuxNode.cpp
*/ */
int allocNode(Signal* signal, NodeHandle& node); int allocNode(Signal* signal, NodeHandle& node);
void selectNode(Signal* signal, NodeHandle& node, TupLoc loc); void selectNode(NodeHandle& node, TupLoc loc);
void insertNode(Signal* signal, NodeHandle& node); void insertNode(NodeHandle& node);
void deleteNode(Signal* signal, NodeHandle& node); void deleteNode(NodeHandle& node);
void setNodePref(Signal* signal, NodeHandle& node); void setNodePref(NodeHandle& node);
// node operations // node operations
void nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent); void nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList);
void nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePushUpScans(NodeHandle& node, unsigned pos);
void nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& en, Uint32* scanList);
void nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent); void nodePopDownScans(NodeHandle& node, unsigned pos);
void nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i); void nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList);
void nodePushDownScans(NodeHandle& node, unsigned pos);
void nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList);
void nodePopUpScans(NodeHandle& node, unsigned pos);
void nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i);
// scans linked to node // scans linked to node
void addScanList(NodeHandle& node, unsigned pos, Uint32 scanList);
void removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList);
void moveScanList(NodeHandle& node, unsigned pos);
void linkScan(NodeHandle& node, ScanOpPtr scanPtr); void linkScan(NodeHandle& node, ScanOpPtr scanPtr);
void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr); void unlinkScan(NodeHandle& node, ScanOpPtr scanPtr);
bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr); bool islinkScan(NodeHandle& node, ScanOpPtr scanPtr);
...@@ -600,10 +606,21 @@ private: ...@@ -600,10 +606,21 @@ private:
/* /*
* DbtuxTree.cpp * DbtuxTree.cpp
*/ */
void treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent); // add entry
void treeRemove(Signal* signal, Frag& frag, TreePos treePos); void treeAdd(Frag& frag, TreePos treePos, TreeEnt ent);
void treeRotateSingle(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent);
void treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i); void treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i);
void treeAddRebalance(Frag& frag, NodeHandle node, unsigned i);
// remove entry
void treeRemove(Frag& frag, TreePos treePos);
void treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos);
void treeRemoveSemi(Frag& frag, NodeHandle node, unsigned i);
void treeRemoveLeaf(Frag& frag, NodeHandle node);
void treeRemoveNode(Frag& frag, NodeHandle node);
void treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i);
// rotate
void treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i);
void treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i);
/* /*
* DbtuxScan.cpp * DbtuxScan.cpp
...@@ -615,9 +632,9 @@ private: ...@@ -615,9 +632,9 @@ private:
void execACCKEYCONF(Signal* signal); void execACCKEYCONF(Signal* signal);
void execACCKEYREF(Signal* signal); void execACCKEYREF(Signal* signal);
void execACC_ABORTCONF(Signal* signal); void execACC_ABORTCONF(Signal* signal);
void scanFirst(Signal* signal, ScanOpPtr scanPtr); void scanFirst(ScanOpPtr scanPtr);
void scanNext(Signal* signal, ScanOpPtr scanPtr); void scanNext(ScanOpPtr scanPtr);
bool scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent); bool scanVisible(ScanOpPtr scanPtr, TreeEnt ent);
void scanClose(Signal* signal, ScanOpPtr scanPtr); void scanClose(Signal* signal, ScanOpPtr scanPtr);
void addAccLockOp(ScanOp& scan, Uint32 accLockOp); void addAccLockOp(ScanOp& scan, Uint32 accLockOp);
void removeAccLockOp(ScanOp& scan, Uint32 accLockOp); void removeAccLockOp(ScanOp& scan, Uint32 accLockOp);
...@@ -626,9 +643,9 @@ private: ...@@ -626,9 +643,9 @@ private:
/* /*
* DbtuxSearch.cpp * DbtuxSearch.cpp
*/ */
void searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos); void searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos);
void searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos); void searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos);
/* /*
* DbtuxCmp.cpp * DbtuxCmp.cpp
...@@ -652,7 +669,7 @@ private: ...@@ -652,7 +669,7 @@ private:
PrintPar(); PrintPar();
}; };
void printTree(Signal* signal, Frag& frag, NdbOut& out); void printTree(Signal* signal, Frag& frag, NdbOut& out);
void printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par); void printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par);
friend class NdbOut& operator<<(NdbOut&, const TupLoc&); friend class NdbOut& operator<<(NdbOut&, const TupLoc&);
friend class NdbOut& operator<<(NdbOut&, const TreeEnt&); friend class NdbOut& operator<<(NdbOut&, const TreeEnt&);
friend class NdbOut& operator<<(NdbOut&, const TreeNode&); friend class NdbOut& operator<<(NdbOut&, const TreeNode&);
......
...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -98,7 +98,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
strcpy(par.m_path, "."); strcpy(par.m_path, ".");
par.m_side = 2; par.m_side = 2;
par.m_parent = NullTupLoc; par.m_parent = NullTupLoc;
printNode(signal, frag, out, tree.m_root, par); printNode(frag, out, tree.m_root, par);
out.m_out->flush(); out.m_out->flush();
if (! par.m_ok) { if (! par.m_ok) {
if (debugFile == 0) { if (debugFile == 0) {
...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out) ...@@ -114,7 +114,7 @@ Dbtux::printTree(Signal* signal, Frag& frag, NdbOut& out)
} }
void void
Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par) Dbtux::printNode(Frag& frag, NdbOut& out, TupLoc loc, PrintPar& par)
{ {
if (loc == NullTupLoc) { if (loc == NullTupLoc) {
par.m_depth = 0; par.m_depth = 0;
...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -122,7 +122,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
} }
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
out << par.m_path << " " << node << endl; out << par.m_path << " " << node << endl;
// check children // check children
PrintPar cpar[2]; PrintPar cpar[2];
...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -132,7 +132,7 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
cpar[i].m_side = i; cpar[i].m_side = i;
cpar[i].m_depth = 0; cpar[i].m_depth = 0;
cpar[i].m_parent = loc; cpar[i].m_parent = loc;
printNode(signal, frag, out, node.getLink(i), cpar[i]); printNode(frag, out, node.getLink(i), cpar[i]);
if (! cpar[i].m_ok) { if (! cpar[i].m_ok) {
par.m_ok = false; par.m_ok = false;
} }
...@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar& ...@@ -178,16 +178,19 @@ Dbtux::printNode(Signal* signal, Frag& frag, NdbOut& out, TupLoc loc, PrintPar&
out << "occupancy " << node.getOccup() << " of interior node"; out << "occupancy " << node.getOccup() << " of interior node";
out << " less than min " << tree.m_minOccup << endl; out << " less than min " << tree.m_minOccup << endl;
} }
// check missed half-leaf/leaf merge #ifdef dbtux_totally_groks_t_trees
// check missed semi-leaf/leaf merge
for (unsigned i = 0; i <= 1; i++) { for (unsigned i = 0; i <= 1; i++) {
if (node.getLink(i) != NullTupLoc && if (node.getLink(i) != NullTupLoc &&
node.getLink(1 - i) == NullTupLoc && node.getLink(1 - i) == NullTupLoc &&
node.getOccup() + cpar[i].m_occup <= tree.m_maxOccup) { // our semi-leaf seems to satify interior minOccup condition
node.getOccup() < tree.m_minOccup) {
par.m_ok = false; par.m_ok = false;
out << par.m_path << sep; out << par.m_path << sep;
out << "missed merge with child " << i << endl; out << "missed merge with child " << i << endl;
} }
} }
#endif
// check inline prefix // check inline prefix
{ ConstData data1 = node.getPref(); { ConstData data1 = node.getPref();
Uint32 data2[MaxPrefSize]; Uint32 data2[MaxPrefSize];
......
...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -117,7 +117,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
switch (opCode) { switch (opCode) {
case TuxMaintReq::OpAdd: case TuxMaintReq::OpAdd:
jam(); jam();
searchToAdd(signal, frag, c_searchKey, ent, treePos); searchToAdd(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (treePos.m_match ? " - error" : "") << endl;
...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -133,8 +133,8 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
break; break;
} }
/* /*
* At most one new node is inserted in the operation. We keep one * At most one new node is inserted in the operation. Pre-allocate
* free node pre-allocated so the operation cannot fail. * it so that the operation cannot fail.
*/ */
if (frag.m_freeLoc == NullTupLoc) { if (frag.m_freeLoc == NullTupLoc) {
jam(); jam();
...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -144,14 +144,16 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
jam(); jam();
break; break;
} }
// link to freelist
node.setLink(0, frag.m_freeLoc);
frag.m_freeLoc = node.m_loc; frag.m_freeLoc = node.m_loc;
ndbrequire(frag.m_freeLoc != NullTupLoc); ndbrequire(frag.m_freeLoc != NullTupLoc);
} }
treeAdd(signal, frag, treePos, ent); treeAdd(frag, treePos, ent);
break; break;
case TuxMaintReq::OpRemove: case TuxMaintReq::OpRemove:
jam(); jam();
searchToRemove(signal, frag, c_searchKey, ent, treePos); searchToRemove(frag, c_searchKey, ent, treePos);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugMaint) { if (debugFlags & DebugMaint) {
debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl; debugOut << treePos << (! treePos.m_match ? " - error" : "") << endl;
...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal) ...@@ -166,7 +168,7 @@ Dbtux::execTUX_MAINT_REQ(Signal* signal)
} }
break; break;
} }
treeRemove(signal, frag, treePos); treeRemove(frag, treePos);
break; break;
default: default:
ndbrequire(false); ndbrequire(false);
......
...@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal) ...@@ -211,11 +211,7 @@ Dbtux::execTUX_ADD_ATTRREQ(Signal* signal)
// make these configurable later // make these configurable later
tree.m_nodeSize = MAX_TTREE_NODE_SIZE; tree.m_nodeSize = MAX_TTREE_NODE_SIZE;
tree.m_prefSize = MAX_TTREE_PREF_SIZE; tree.m_prefSize = MAX_TTREE_PREF_SIZE;
#ifdef dbtux_min_occup_less_max_occup
const unsigned maxSlack = MAX_TTREE_NODE_SLACK; const unsigned maxSlack = MAX_TTREE_NODE_SLACK;
#else
const unsigned maxSlack = 0;
#endif
// size up to and including first 2 entries // size up to and including first 2 entries
const unsigned pref = tree.getSize(AccPref); const unsigned pref = tree.getSize(AccPref);
if (! (pref <= tree.m_nodeSize)) { if (! (pref <= tree.m_nodeSize)) {
......
...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node) ...@@ -42,7 +42,7 @@ Dbtux::allocNode(Signal* signal, NodeHandle& node)
* Set handle to point to existing node. * Set handle to point to existing node.
*/ */
void void
Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) Dbtux::selectNode(NodeHandle& node, TupLoc loc)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(loc != NullTupLoc); ndbrequire(loc != NullTupLoc);
...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc) ...@@ -57,15 +57,15 @@ Dbtux::selectNode(Signal* signal, NodeHandle& node, TupLoc loc)
} }
/* /*
* Set handle to point to new node. Uses the pre-allocated node. * Set handle to point to new node. Uses a pre-allocated node.
*/ */
void void
Dbtux::insertNode(Signal* signal, NodeHandle& node) Dbtux::insertNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TupLoc loc = frag.m_freeLoc; // unlink from freelist
frag.m_freeLoc = NullTupLoc; selectNode(node, frag.m_freeLoc);
selectNode(signal, node, loc); frag.m_freeLoc = node.getLink(0);
new (node.m_node) TreeNode(); new (node.m_node) TreeNode();
#ifdef VM_TRACE #ifdef VM_TRACE
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node) ...@@ -76,20 +76,17 @@ Dbtux::insertNode(Signal* signal, NodeHandle& node)
} }
/* /*
* Delete existing node. * Delete existing node. Simply put it on the freelist.
*/ */
void void
Dbtux::deleteNode(Signal* signal, NodeHandle& node) Dbtux::deleteNode(NodeHandle& node)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
ndbrequire(node.getOccup() == 0); ndbrequire(node.getOccup() == 0);
TupLoc loc = node.m_loc; // link to freelist
Uint32 pageId = loc.getPageId(); node.setLink(0, frag.m_freeLoc);
Uint32 pageOffset = loc.getPageOffset(); frag.m_freeLoc = node.m_loc;
Uint32* node32 = reinterpret_cast<Uint32*>(node.m_node); // invalidate the handle
c_tup->tuxFreeNode(signal, frag.m_tupIndexFragPtrI, pageId, pageOffset, node32);
jamEntry();
// invalidate handle and storage
node.m_loc = NullTupLoc; node.m_loc = NullTupLoc;
node.m_node = 0; node.m_node = 0;
} }
...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node) ...@@ -99,7 +96,7 @@ Dbtux::deleteNode(Signal* signal, NodeHandle& node)
* attribute headers for now. XXX use null mask instead * attribute headers for now. XXX use null mask instead
*/ */
void void
Dbtux::setNodePref(Signal* signal, NodeHandle& node) Dbtux::setNodePref(NodeHandle& node)
{ {
const Frag& frag = node.m_frag; const Frag& frag = node.m_frag;
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
...@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node) ...@@ -117,18 +114,45 @@ Dbtux::setNodePref(Signal* signal, NodeHandle& node)
* v * v
* A B C D E _ _ => A B C X D E _ * A B C D E _ _ => A B C X D E _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Add list of scans at the new entry.
*/ */
void void
Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ent) Dbtux::nodePushUp(NodeHandle& node, unsigned pos, const TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup < tree.m_maxOccup && pos <= occup); ndbrequire(occup < tree.m_maxOccup && pos <= occup);
// fix scans // fix old scans
if (node.getNodeScan() != RNIL)
nodePushUpScans(node, pos);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// add new scans
if (scanList != RNIL)
addScanList(node, pos, scanList);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(node);
}
void
Dbtux::nodePushUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr; ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -144,21 +168,7 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
for (unsigned i = occup; i > pos; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[pos] = ent;
entList[0] = entList[occup + 1];
node.setOccup(occup + 1);
// fix prefix
if (occup == 0 || pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt& ...@@ -169,42 +179,55 @@ Dbtux::nodePushUp(Signal* signal, NodeHandle& node, unsigned pos, const TreeEnt&
* ^ ^ * ^ ^
* A B C D E F _ => A B C E F _ _ * A B C D E F _ => A B C E F _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Scans at removed entry are returned if non-zero location is passed or
* else moved forward.
*/ */
void void
Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32* scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove or move scans at this position
scanPtr.i = node.getNodeScan(); if (scanList == 0)
while (scanPtr.i != RNIL) { moveScanList(node, pos);
else
removeScanList(node, pos, *scanList);
// fix other scans
if (node.getNodeScan() != RNIL)
nodePopDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i + 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popDown pos=" << pos << " " << node << endl;
}
#endif
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(node);
}
void
Dbtux::nodePopDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != pos); ndbrequire(scanPos.m_pos != pos);
if (scanPos.m_pos > pos) { if (scanPos.m_pos > pos) {
jam(); jam();
...@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -217,21 +240,7 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
ent = tmpList[pos];
for (unsigned i = pos; i < occup - 1; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
entList[0] = entList[occup - 1];
node.setOccup(occup - 1);
// fix prefix
if (occup != 1 && pos == 0)
setNodePref(signal, node);
} }
/* /*
...@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -242,43 +251,52 @@ Dbtux::nodePopDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
* ^ v ^ * ^ v ^
* A B C D E _ _ => B C D X E _ _ * A B C D E _ _ => B C D X E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Return list of scans at the removed position 0.
*/ */
void void
Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePushDown(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32& scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // remove scans at 0
scanPtr.i = node.getNodeScan(); removeScanList(node, 0, scanList);
while (scanPtr.i != RNIL) { // fix other scans
if (node.getNodeScan() != RNIL)
nodePushDownScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i + 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == 0) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pushDown pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePushDownScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup); ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
// handled before
ndbrequire(scanPos.m_pos != 0); ndbrequire(scanPos.m_pos != 0);
if (scanPos.m_pos <= pos) { if (scanPos.m_pos <= pos) {
jam(); jam();
...@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -291,22 +309,7 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
scanPos.m_pos--; scanPos.m_pos--;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt oldMin = tmpList[0];
for (unsigned i = 0; i < pos; i++) {
jam();
tmpList[i] = tmpList[i + 1];
}
tmpList[pos] = ent;
ent = oldMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
...@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent ...@@ -318,39 +321,50 @@ Dbtux::nodePushDown(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent
* v ^ ^ * v ^ ^
* A B C D E _ _ => X A B C E _ _ * A B C D E _ _ => X A B C E _ _
* 0 1 2 3 4 5 6 0 1 2 3 4 5 6 * 0 1 2 3 4 5 6 0 1 2 3 4 5 6
*
* Move scans at removed entry and add scans at the new entry.
*/ */
void void
Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) Dbtux::nodePopUp(NodeHandle& node, unsigned pos, TreeEnt& ent, Uint32 scanList)
{ {
Frag& frag = node.m_frag; Frag& frag = node.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
const unsigned occup = node.getOccup(); const unsigned occup = node.getOccup();
ndbrequire(occup <= tree.m_maxOccup && pos < occup); ndbrequire(occup <= tree.m_maxOccup && pos < occup);
ScanOpPtr scanPtr; if (node.getNodeScan() != RNIL) {
// move scans whose entry disappears // move scans whose entry disappears
scanPtr.i = node.getNodeScan(); moveScanList(node, pos);
while (scanPtr.i != RNIL) { // fix other scans
if (node.getNodeScan() != RNIL)
nodePopUpScans(node, pos);
}
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); tmpList[i] = tmpList[i - 1];
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc && scanPos.m_pos < occup);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At popUp pos=" << pos << " " << node << endl;
}
#endif
// here we may miss a valid entry "X" XXX known bug
scanNext(signal, scanPtr);
}
scanPtr.i = nextPtrI;
} }
// fix other scans tmpList[0] = newMin;
entList[0] = entList[occup];
// add scans
if (scanList != RNIL)
addScanList(node, 0, scanList);
// fix prefix
if (true)
setNodePref(node);
}
void
Dbtux::nodePopUpScans(NodeHandle& node, unsigned pos)
{
const unsigned occup = node.getOccup();
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan(); scanPtr.i = node.getNodeScan();
while (scanPtr.i != RNIL) { do {
jam(); jam();
c_scanOpPool.getPtr(scanPtr); c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos; TreePos& scanPos = scanPtr.p->m_scanPos;
...@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent) ...@@ -367,41 +381,123 @@ Dbtux::nodePopUp(Signal* signal, NodeHandle& node, unsigned pos, TreeEnt& ent)
scanPos.m_pos++; scanPos.m_pos++;
} }
scanPtr.i = scanPtr.p->m_nodeScan; scanPtr.i = scanPtr.p->m_nodeScan;
} } while (scanPtr.i != RNIL);
// fix node
TreeEnt* const entList = tree.getEntList(node.m_node);
entList[occup] = entList[0];
TreeEnt* const tmpList = entList + 1;
TreeEnt newMin = ent;
ent = tmpList[pos];
for (unsigned i = pos; i > 0; i--) {
jam();
tmpList[i] = tmpList[i - 1];
}
tmpList[0] = newMin;
entList[0] = entList[occup];
// fix prefix
if (true)
setNodePref(signal, node);
} }
/* /*
* Move all possible entries from another node before the min (i=0) or * Move number of entries from another node to this node before the min
* after the max (i=1). XXX can be optimized * (i=0) or after the max (i=1). Expensive but not often used.
*/ */
void void
Dbtux::nodeSlide(Signal* signal, NodeHandle& dstNode, NodeHandle& srcNode, unsigned i) Dbtux::nodeSlide(NodeHandle& dstNode, NodeHandle& srcNode, unsigned cnt, unsigned i)
{ {
Frag& frag = dstNode.m_frag; Frag& frag = dstNode.m_frag;
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
ndbrequire(i <= 1); ndbrequire(i <= 1);
while (dstNode.getOccup() < tree.m_maxOccup && srcNode.getOccup() != 0) { while (cnt != 0) {
TreeEnt ent; TreeEnt ent;
nodePopDown(signal, srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent); Uint32 scanList = RNIL;
nodePushUp(signal, dstNode, i == 0 ? 0 : dstNode.getOccup(), ent); nodePopDown(srcNode, i == 0 ? srcNode.getOccup() - 1 : 0, ent, &scanList);
nodePushUp(dstNode, i == 0 ? 0 : dstNode.getOccup(), ent, scanList);
cnt--;
} }
} }
// scans linked to node
/*
* Add list of scans to node at given position.
*/
void
Dbtux::addScanList(NodeHandle& node, unsigned pos, Uint32 scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = scanList;
do {
jam();
c_scanOpPool.getPtr(scanPtr);
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Add scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "To pos=" << pos << " " << node << endl;
}
#endif
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
scanPtr.p->m_nodeScan = RNIL;
linkScan(node, scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
// set position but leave direction alone
scanPos.m_loc = node.m_loc;
scanPos.m_pos = pos;
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Remove list of scans from node at given position. The return
* location must point to existing list (in fact RNIL always).
*/
void
Dbtux::removeScanList(NodeHandle& node, unsigned pos, Uint32& scanList)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
TreePos& scanPos = scanPtr.p->m_scanPos;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Remove scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "Fron pos=" << pos << " " << node << endl;
}
#endif
unlinkScan(node, scanPtr);
scanPtr.p->m_nodeScan = scanList;
scanList = scanPtr.i;
// unset position but leave direction alone
scanPos.m_loc = NullTupLoc;
scanPos.m_pos = ZNIL;
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/*
* Move list of scans away from entry about to be removed. Uses scan
* method scanNext().
*/
void
Dbtux::moveScanList(NodeHandle& node, unsigned pos)
{
ScanOpPtr scanPtr;
scanPtr.i = node.getNodeScan();
do {
jam();
c_scanOpPool.getPtr(scanPtr);
TreePos& scanPos = scanPtr.p->m_scanPos;
const Uint32 nextPtrI = scanPtr.p->m_nodeScan;
ndbrequire(scanPos.m_loc == node.m_loc);
if (scanPos.m_pos == pos) {
jam();
#ifdef VM_TRACE
if (debugFlags & DebugScan) {
debugOut << "Move scan " << scanPtr.i << " " << *scanPtr.p << endl;
debugOut << "At pos=" << pos << " " << node << endl;
}
#endif
scanNext(scanPtr);
ndbrequire(! (scanPos.m_loc == node.m_loc && scanPos.m_pos == pos));
}
scanPtr.i = nextPtrI;
} while (scanPtr.i != RNIL);
}
/* /*
* Link scan to the list under the node. The list is single-linked and * Link scan to the list under the node. The list is single-linked and
* ordering does not matter. * ordering does not matter.
......
...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal) ...@@ -275,7 +275,7 @@ Dbtux::execNEXT_SCANREQ(Signal* signal)
jam(); jam();
const TupLoc loc = scan.m_scanPos.m_loc; const TupLoc loc = scan.m_scanPos.m_loc;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, loc); selectNode(node, loc);
unlinkScan(node, scanPtr); unlinkScan(node, scanPtr);
scan.m_scanPos.m_loc = NullTupLoc; scan.m_scanPos.m_loc = NullTupLoc;
} }
...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -364,7 +364,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::First) { if (scan.m_state == ScanOp::First) {
jam(); jam();
// search is done only once in single range scan // search is done only once in single range scan
scanFirst(signal, scanPtr); scanFirst(scanPtr);
#ifdef VM_TRACE #ifdef VM_TRACE
if (debugFlags & DebugScan) { if (debugFlags & DebugScan) {
debugOut << "First scan " << scanPtr.i << " " << scan << endl; debugOut << "First scan " << scanPtr.i << " " << scan << endl;
...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal) ...@@ -374,7 +374,7 @@ Dbtux::execACC_CHECK_SCAN(Signal* signal)
if (scan.m_state == ScanOp::Next) { if (scan.m_state == ScanOp::Next) {
jam(); jam();
// look for next // look for next
scanNext(signal, scanPtr); scanNext(scanPtr);
} }
// for reading tuple key in Current or Locked state // for reading tuple key in Current or Locked state
Data pkData = c_dataBuffer; Data pkData = c_dataBuffer;
...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal) ...@@ -680,7 +680,7 @@ Dbtux::execACC_ABORTCONF(Signal* signal)
* by scanNext. * by scanNext.
*/ */
void void
Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanFirst(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -698,7 +698,7 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
} }
// search for scan start position // search for scan start position
TreePos treePos; TreePos treePos;
searchToScan(signal, frag, c_dataBuffer, scan.m_boundCnt[0], treePos); searchToScan(frag, c_dataBuffer, scan.m_boundCnt[0], treePos);
if (treePos.m_loc == NullTupLoc) { if (treePos.m_loc == NullTupLoc) {
// empty tree // empty tree
jam(); jam();
...@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -710,13 +710,13 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
scan.m_state = ScanOp::Next; scan.m_state = ScanOp::Next;
// link the scan to node found // link the scan to node found
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
linkScan(node, scanPtr); linkScan(node, scanPtr);
} }
/* /*
* Move to next entry. The scan is already linked to some node. When * Move to next entry. The scan is already linked to some node. When
* we leave, if any entry was found, it will be linked to a possibly * we leave, if an entry was found, it will be linked to a possibly
* different node. The scan has a position, and a direction which tells * different node. The scan has a position, and a direction which tells
* from where we came to this position. This is one of: * from where we came to this position. This is one of:
* *
...@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr) ...@@ -725,9 +725,12 @@ Dbtux::scanFirst(Signal* signal, ScanOpPtr scanPtr)
* 2 - up from root (the scan ends) * 2 - up from root (the scan ends)
* 3 - left to right within node (at end proceed to right child) * 3 - left to right within node (at end proceed to right child)
* 4 - down from parent (proceed to left child) * 4 - down from parent (proceed to left child)
*
* If an entry was found, scan direction is 3. Therefore tree
* re-organizations need not worry about scan direction.
*/ */
void void
Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) Dbtux::scanNext(ScanOpPtr scanPtr)
{ {
ScanOp& scan = *scanPtr.p; ScanOp& scan = *scanPtr.p;
Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
...@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -736,22 +739,8 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
debugOut << "Next in scan " << scanPtr.i << " " << scan << endl; debugOut << "Next in scan " << scanPtr.i << " " << scan << endl;
} }
#endif #endif
if (scan.m_state == ScanOp::Locked) { // cannot be moved away from tuple we have locked
jam(); ndbrequire(scan.m_state != ScanOp::Locked);
// version of a tuple locked by us cannot disappear (assert only)
#ifdef dbtux_wl_1942_is_done
ndbassert(false);
#endif
AccLockReq* const lockReq = (AccLockReq*)signal->getDataPtrSend();
lockReq->returnCode = RNIL;
lockReq->requestInfo = AccLockReq::Unlock;
lockReq->accOpPtr = scan.m_accLockOp;
EXECUTE_DIRECT(DBACC, GSN_ACC_LOCKREQ, signal, AccLockReq::UndoSignalLength);
jamEntry();
ndbrequire(lockReq->returnCode == AccLockReq::Success);
scan.m_accLockOp = RNIL;
scan.m_state = ScanOp::Current;
}
// set up index keys for this operation // set up index keys for this operation
setKeyAttrs(frag); setKeyAttrs(frag);
// unpack upper bound into c_dataBuffer // unpack upper bound into c_dataBuffer
...@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -767,7 +756,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
TreePos pos = scan.m_scanPos; TreePos pos = scan.m_scanPos;
// get and remember original node // get and remember original node
NodeHandle origNode(frag); NodeHandle origNode(frag);
selectNode(signal, origNode, pos.m_loc); selectNode(origNode, pos.m_loc);
ndbrequire(islinkScan(origNode, scanPtr)); ndbrequire(islinkScan(origNode, scanPtr));
// current node in loop // current node in loop
NodeHandle node = origNode; NodeHandle node = origNode;
...@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -784,7 +773,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
} }
if (node.m_loc != pos.m_loc) { if (node.m_loc != pos.m_loc) {
jam(); jam();
selectNode(signal, node, pos.m_loc); selectNode(node, pos.m_loc);
} }
if (pos.m_dir == 4) { if (pos.m_dir == 4) {
// coming down from parent proceed to left child // coming down from parent proceed to left child
...@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -832,7 +821,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
break; break;
} }
// can we see it // can we see it
if (! scanVisible(signal, scanPtr, ent)) { if (! scanVisible(scanPtr, ent)) {
jam(); jam();
continue; continue;
} }
...@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -864,6 +853,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
scan.m_scanPos = pos; scan.m_scanPos = pos;
// relink // relink
if (scan.m_state == ScanOp::Current) { if (scan.m_state == ScanOp::Current) {
ndbrequire(pos.m_match == true && pos.m_dir == 3);
ndbrequire(pos.m_loc == node.m_loc); ndbrequire(pos.m_loc == node.m_loc);
if (origNode.m_loc != node.m_loc) { if (origNode.m_loc != node.m_loc) {
jam(); jam();
...@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr) ...@@ -894,7 +884,7 @@ Dbtux::scanNext(Signal* signal, ScanOpPtr scanPtr)
* which are not analyzed or handled yet. * which are not analyzed or handled yet.
*/ */
bool bool
Dbtux::scanVisible(Signal* signal, ScanOpPtr scanPtr, TreeEnt ent) Dbtux::scanVisible(ScanOpPtr scanPtr, TreeEnt ent)
{ {
const ScanOp& scan = *scanPtr.p; const ScanOp& scan = *scanPtr.p;
const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI); const Frag& frag = *c_fragPool.getPtr(scan.m_fragPtrI);
......
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
* TODO optimize for initial equal attrs in node min/max * TODO optimize for initial equal attrs in node min/max
*/ */
void void
Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToAdd(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -46,7 +46,7 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear ...@@ -159,12 +159,12 @@ Dbtux::searchToAdd(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt sear
* *
* Compares search key to each node min. A move to right subtree can * Compares search key to each node min. A move to right subtree can
* overshoot target node. The last such node is saved. The final node * overshoot target node. The last such node is saved. The final node
* is a half-leaf or leaf. If search key is less than final node min * is a semi-leaf or leaf. If search key is less than final node min
* then the saved node is the g.l.b of the final node and we move back * then the saved node is the g.l.b of the final node and we move back
* to it. * to it.
*/ */
void void
Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos) Dbtux::searchToRemove(Frag& frag, ConstData searchKey, TreeEnt searchEnt, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
const unsigned numAttrs = frag.m_numAttrs; const unsigned numAttrs = frag.m_numAttrs;
...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -182,7 +182,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
NodeHandle glbNode(frag); // potential g.l.b of final node NodeHandle glbNode(frag); // potential g.l.b of final node
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
unsigned start = 0; unsigned start = 0;
...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s ...@@ -256,7 +256,7 @@ Dbtux::searchToRemove(Signal* signal, Frag& frag, ConstData searchKey, TreeEnt s
* Similar to searchToAdd. * Similar to searchToAdd.
*/ */
void void
Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos) Dbtux::searchToScan(Frag& frag, ConstData boundInfo, unsigned boundCount, TreePos& treePos)
{ {
const TreeHead& tree = frag.m_tree; const TreeHead& tree = frag.m_tree;
NodeHandle currNode(frag); NodeHandle currNode(frag);
...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo ...@@ -271,7 +271,7 @@ Dbtux::searchToScan(Signal* signal, Frag& frag, ConstData boundInfo, unsigned bo
NodeHandle bottomNode(frag); NodeHandle bottomNode(frag);
while (true) { while (true) {
jam(); jam();
selectNode(signal, currNode, currNode.m_loc); selectNode(currNode, currNode.m_loc);
int ret; int ret;
// compare prefix // compare prefix
ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize); ret = cmpScanBound(frag, 0, boundInfo, boundCount, currNode.getPref(), tree.m_prefSize);
......
...@@ -18,71 +18,105 @@ ...@@ -18,71 +18,105 @@
#include "Dbtux.hpp" #include "Dbtux.hpp"
/* /*
* Add entry. * Add entry. Handle the case when there is room for one more. This
* is the common case given slack in nodes.
*/ */
void void
Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) Dbtux::treeAdd(Frag& frag, TreePos treePos, TreeEnt ent)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos;
NodeHandle node(frag); NodeHandle node(frag);
// check for empty tree if (treePos.m_loc != NullTupLoc) {
if (treePos.m_loc == NullTupLoc) { // non-empty tree
jam();
insertNode(signal, node);
nodePushUp(signal, node, 0, ent);
node.setSide(2);
tree.m_root = node.m_loc;
return;
}
selectNode(signal, node, treePos.m_loc);
// check if it is bounding node
if (pos != 0 && pos != node.getOccup()) {
jam(); jam();
// check if room for one more selectNode(node, treePos.m_loc);
unsigned pos = treePos.m_pos;
if (node.getOccup() < tree.m_maxOccup) { if (node.getOccup() < tree.m_maxOccup) {
// node has room
jam(); jam();
nodePushUp(signal, node, pos, ent); nodePushUp(node, pos, ent, RNIL);
return; return;
} }
// returns min entry treeAddFull(frag, node, pos, ent);
nodePushDown(signal, node, pos - 1, ent); return;
// find position to add the removed min entry }
TupLoc childLoc = node.getLink(0); jam();
if (childLoc == NullTupLoc) { insertNode(node);
nodePushUp(node, 0, ent, RNIL);
node.setSide(2);
tree.m_root = node.m_loc;
}
/*
* Add entry when node is full. Handle the case when there is g.l.b
* node in left subtree with room for one more. It will receive the min
* entry of this node. The min entry could be the entry to add.
*/
void
Dbtux::treeAddFull(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent)
{
TreeHead& tree = frag.m_tree;
TupLoc loc = lubNode.getLink(0);
if (loc != NullTupLoc) {
// find g.l.b node
NodeHandle glbNode(frag);
do {
jam(); jam();
// left child will be added selectNode(glbNode, loc);
pos = 0; loc = glbNode.getLink(1);
} else { } while (loc != NullTupLoc);
if (glbNode.getOccup() < tree.m_maxOccup) {
// g.l.b node has room
jam(); jam();
// find glb node Uint32 scanList = RNIL;
while (childLoc != NullTupLoc) { if (pos != 0) {
jam(); jam();
selectNode(signal, node, childLoc); // add the new entry and return min entry
childLoc = node.getLink(1); nodePushDown(lubNode, pos - 1, ent, scanList);
} }
pos = node.getOccup(); // g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, glbNode.getOccup(), ent, scanList);
return;
} }
// fall thru to next case treeAddNode(frag, lubNode, pos, ent, glbNode, 1);
}
// adding new min or max
unsigned i = (pos == 0 ? 0 : 1);
ndbrequire(node.getLink(i) == NullTupLoc);
// check if the half-leaf/leaf has room for one more
if (node.getOccup() < tree.m_maxOccup) {
jam();
nodePushUp(signal, node, pos, ent);
return; return;
} }
// add a new node treeAddNode(frag, lubNode, pos, ent, lubNode, 0);
NodeHandle childNode(frag); }
insertNode(signal, childNode);
nodePushUp(signal, childNode, 0, ent); /*
* Add entry when there is no g.l.b node in left subtree or the g.l.b
* node is full. We must add a new left or right child node which
* becomes the new g.l.b node.
*/
void
Dbtux::treeAddNode(Frag& frag, NodeHandle lubNode, unsigned pos, TreeEnt ent, NodeHandle parentNode, unsigned i)
{
NodeHandle glbNode(frag);
insertNode(glbNode);
// connect parent and child // connect parent and child
node.setLink(i, childNode.m_loc); parentNode.setLink(i, glbNode.m_loc);
childNode.setLink(2, node.m_loc); glbNode.setLink(2, parentNode.m_loc);
childNode.setSide(i); glbNode.setSide(i);
// re-balance tree at each node Uint32 scanList = RNIL;
if (pos != 0) {
jam();
// add the new entry and return min entry
nodePushDown(lubNode, pos - 1, ent, scanList);
}
// g.l.b node receives min entry from l.u.b node
nodePushUp(glbNode, 0, ent, scanList);
// re-balance the tree
treeAddRebalance(frag, parentNode, i);
}
/*
* Re-balance tree after adding a node. The process starts with the
* parent of the added node.
*/
void
Dbtux::treeAddRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) { while (true) {
// height of subtree i has increased by 1 // height of subtree i has increased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
...@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -102,14 +136,14 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
// height of longer subtree increased // height of longer subtree increased
jam(); jam();
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(i)); selectNode(childNode, node.getLink(i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, i); treeRotateSingle(frag, node, i);
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, i); treeRotateDouble(frag, node, i);
} else { } else {
// height of subtree increased so it cannot be perfectly balanced // height of subtree increased so it cannot be perfectly balanced
ndbrequire(false); ndbrequire(false);
...@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent) ...@@ -126,115 +160,169 @@ Dbtux::treeAdd(Signal* signal, Frag& frag, TreePos treePos, TreeEnt ent)
break; break;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
/* /*
* Remove entry. * Remove entry. Optimize for nodes with slack. Handle the case when
* there is no underflow i.e. occupancy remains at least minOccup. For
* interior nodes this is a requirement. For others it means that we do
* not need to consider merge of semi-leaf and leaf.
*/ */
void void
Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) Dbtux::treeRemove(Frag& frag, TreePos treePos)
{ {
TreeHead& tree = frag.m_tree; TreeHead& tree = frag.m_tree;
unsigned pos = treePos.m_pos; unsigned pos = treePos.m_pos;
NodeHandle node(frag); NodeHandle node(frag);
selectNode(signal, node, treePos.m_loc); selectNode(node, treePos.m_loc);
TreeEnt ent; TreeEnt ent;
// check interior node first if (node.getOccup() > tree.m_minOccup) {
if (node.getChilds() == 2) { // no underflow in any node type
jam(); jam();
ndbrequire(node.getOccup() >= tree.m_minOccup); nodePopDown(node, pos, ent, 0);
// check if no underflow return;
if (node.getOccup() > tree.m_minOccup) {
jam();
nodePopDown(signal, node, pos, ent);
return;
}
// save current handle
NodeHandle parentNode = node;
// find glb node
TupLoc childLoc = node.getLink(0);
while (childLoc != NullTupLoc) {
jam();
selectNode(signal, node, childLoc);
childLoc = node.getLink(1);
}
// use glb max as new parent min
ent = node.getEnt(node.getOccup() - 1);
nodePopUp(signal, parentNode, pos, ent);
// set up to remove glb max
pos = node.getOccup() - 1;
// fall thru to next case
} }
// remove the element if (node.getChilds() == 2) {
nodePopDown(signal, node, pos, ent); // underflow in interior node
ndbrequire(node.getChilds() <= 1);
// handle half-leaf
unsigned i;
for (i = 0; i <= 1; i++) {
jam(); jam();
TupLoc childLoc = node.getLink(i); treeRemoveInner(frag, node, pos);
if (childLoc != NullTupLoc) { return;
// move to child
selectNode(signal, node, childLoc);
// balance of half-leaf parent requires child to be leaf
break;
}
} }
ndbrequire(node.getChilds() == 0); // remove entry in semi/leaf
// get parent if any nodePopDown(node, pos, ent, 0);
TupLoc parentLoc = node.getLink(2); if (node.getLink(0) != NullTupLoc) {
NodeHandle parentNode(frag);
i = node.getSide();
// move all that fits into parent
if (parentLoc != NullTupLoc) {
jam(); jam();
selectNode(signal, parentNode, node.getLink(2)); treeRemoveSemi(frag, node, 0);
nodeSlide(signal, parentNode, node, i); return;
// fall thru to next case
} }
// non-empty leaf if (node.getLink(1) != NullTupLoc) {
if (node.getOccup() >= 1) {
jam(); jam();
treeRemoveSemi(frag, node, 1);
return; return;
} }
// remove empty leaf treeRemoveLeaf(frag, node);
deleteNode(signal, node); }
if (parentLoc == NullTupLoc) {
/*
* Remove entry when interior node underflows. There is g.l.b node in
* left subtree to borrow an entry from. The max entry of the g.l.b
* node becomes the min entry of this node.
*/
void
Dbtux::treeRemoveInner(Frag& frag, NodeHandle lubNode, unsigned pos)
{
TreeHead& tree = frag.m_tree;
TreeEnt ent;
// find g.l.b node
NodeHandle glbNode(frag);
TupLoc loc = lubNode.getLink(0);
do {
jam();
selectNode(glbNode, loc);
loc = glbNode.getLink(1);
} while (loc != NullTupLoc);
// borrow max entry from semi/leaf
Uint32 scanList = RNIL;
nodePopDown(glbNode, glbNode.getOccup() - 1, ent, &scanList);
nodePopUp(lubNode, pos, ent, scanList);
if (glbNode.getLink(0) != NullTupLoc) {
jam(); jam();
// tree is now empty treeRemoveSemi(frag, glbNode, 0);
tree.m_root = NullTupLoc;
return; return;
} }
node = parentNode; treeRemoveLeaf(frag, glbNode);
node.setLink(i, NullTupLoc); }
#ifdef dbtux_min_occup_less_max_occup
// check if we created a half-leaf /*
if (node.getBalance() == 0) { * Handle semi-leaf after removing an entry. Move entries from leaf to
* semi-leaf to bring semi-leaf occupancy above minOccup, if possible.
* The leaf may become empty.
*/
void
Dbtux::treeRemoveSemi(Frag& frag, NodeHandle semiNode, unsigned i)
{
TreeHead& tree = frag.m_tree;
ndbrequire(semiNode.getChilds() < 2);
TupLoc leafLoc = semiNode.getLink(i);
NodeHandle leafNode(frag);
selectNode(leafNode, leafLoc);
if (semiNode.getOccup() < tree.m_minOccup) {
jam();
unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - semiNode.getOccup());
nodeSlide(semiNode, leafNode, cnt, i);
if (leafNode.getOccup() == 0) {
// remove empty leaf
jam();
treeRemoveNode(frag, leafNode);
}
}
}
/*
* Handle leaf after removing an entry. If parent is semi-leaf, move
* entries to it as in the semi-leaf case. If parent is interior node,
* do nothing.
*/
void
Dbtux::treeRemoveLeaf(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
TupLoc parentLoc = leafNode.getLink(2);
if (parentLoc != NullTupLoc) {
jam(); jam();
// move entries from the other child NodeHandle parentNode(frag);
TupLoc childLoc = node.getLink(1 - i); selectNode(parentNode, parentLoc);
NodeHandle childNode(frag); unsigned i = leafNode.getSide();
selectNode(signal, childNode, childLoc); if (parentNode.getLink(1 - i) == NullTupLoc) {
nodeSlide(signal, node, childNode, 1 - i); // parent is semi-leaf
if (childNode.getOccup() == 0) {
jam(); jam();
deleteNode(signal, childNode); if (parentNode.getOccup() < tree.m_minOccup) {
node.setLink(1 - i, NullTupLoc);
// we are balanced again but our parent balance changes by -1
parentLoc = node.getLink(2);
if (parentLoc == NullTupLoc) {
jam(); jam();
return; unsigned cnt = min(leafNode.getOccup(), tree.m_minOccup - parentNode.getOccup());
nodeSlide(parentNode, leafNode, cnt, i);
} }
// fix side and become parent
i = node.getSide();
selectNode(signal, node, parentLoc);
} }
} }
#endif if (leafNode.getOccup() == 0) {
// re-balance tree at each node jam();
// remove empty leaf
treeRemoveNode(frag, leafNode);
}
}
/*
* Remove empty leaf.
*/
void
Dbtux::treeRemoveNode(Frag& frag, NodeHandle leafNode)
{
TreeHead& tree = frag.m_tree;
ndbrequire(leafNode.getChilds() == 0);
TupLoc parentLoc = leafNode.getLink(2);
unsigned i = leafNode.getSide();
deleteNode(leafNode);
if (parentLoc != NullTupLoc) {
jam();
NodeHandle parentNode(frag);
selectNode(parentNode, parentLoc);
parentNode.setLink(i, NullTupLoc);
// re-balance the tree
treeRemoveRebalance(frag, parentNode, i);
return;
}
// tree is now empty
tree.m_root = NullTupLoc;
}
/*
* Re-balance tree after removing a node. The process starts with the
* parent of the removed node.
*/
void
Dbtux::treeRemoveRebalance(Frag& frag, NodeHandle node, unsigned i)
{
while (true) { while (true) {
// height of subtree i has decreased by 1 // height of subtree i has decreased by 1
int j = (i == 0 ? -1 : +1); int j = (i == 0 ? -1 : +1);
...@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -255,19 +343,19 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
jam(); jam();
// child on the other side // child on the other side
NodeHandle childNode(frag); NodeHandle childNode(frag);
selectNode(signal, childNode, node.getLink(1 - i)); selectNode(childNode, node.getLink(1 - i));
int b2 = childNode.getBalance(); int b2 = childNode.getBalance();
if (b2 == b) { if (b2 == b) {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else if (b2 == -b) { } else if (b2 == -b) {
jam(); jam();
treeRotateDouble(signal, frag, node, 1 - i); treeRotateDouble(frag, node, 1 - i);
// height of tree decreased and propagates up // height of tree decreased and propagates up
} else { } else {
jam(); jam();
treeRotateSingle(signal, frag, node, 1 - i); treeRotateSingle(frag, node, 1 - i);
// height of tree did not change - done // height of tree did not change - done
return; return;
} }
...@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -281,7 +369,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
return; return;
} }
i = node.getSide(); i = node.getSide();
selectNode(signal, node, parentLoc); selectNode(node, parentLoc);
} }
} }
...@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos) ...@@ -302,10 +390,7 @@ Dbtux::treeRemove(Signal* signal, Frag& frag, TreePos treePos)
* all optional. If 4 are there it changes side. * all optional. If 4 are there it changes side.
*/ */
void void
Dbtux::treeRotateSingle(Signal* signal, Dbtux::treeRotateSingle(Frag& frag, NodeHandle& node, unsigned i)
Frag& frag,
NodeHandle& node,
unsigned i)
{ {
ndbrequire(i <= 1); ndbrequire(i <= 1);
/* /*
...@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -325,7 +410,7 @@ Dbtux::treeRotateSingle(Signal* signal,
*/ */
TupLoc loc3 = node5.getLink(i); TupLoc loc3 = node5.getLink(i);
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
const int bal3 = node3.getBalance(); const int bal3 = node3.getBalance();
/* /*
2 must always be there but is not changed. Thus we mereley check that it 2 must always be there but is not changed. Thus we mereley check that it
...@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -342,7 +427,7 @@ Dbtux::treeRotateSingle(Signal* signal,
NodeHandle node4(frag); NodeHandle node4(frag);
if (loc4 != NullTupLoc) { if (loc4 != NullTupLoc) {
jam(); jam();
selectNode(signal, node4, loc4); selectNode(node4, loc4);
ndbrequire(node4.getSide() == (1 - i) && ndbrequire(node4.getSide() == (1 - i) &&
node4.getLink(2) == loc3); node4.getLink(2) == loc3);
node4.setSide(i); node4.setSide(i);
...@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -377,7 +462,7 @@ Dbtux::treeRotateSingle(Signal* signal,
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
NodeHandle node0(frag); NodeHandle node0(frag);
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side5, loc3); node0.setLink(side5, loc3);
} else { } else {
jam(); jam();
...@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal, ...@@ -514,8 +599,10 @@ Dbtux::treeRotateSingle(Signal* signal,
* *
*/ */
void void
Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i) Dbtux::treeRotateDouble(Frag& frag, NodeHandle& node, unsigned i)
{ {
TreeHead& tree = frag.m_tree;
// old top node // old top node
NodeHandle node6 = node; NodeHandle node6 = node;
const TupLoc loc6 = node6.m_loc; const TupLoc loc6 = node6.m_loc;
...@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -526,13 +613,13 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// level 1 // level 1
TupLoc loc2 = node6.getLink(i); TupLoc loc2 = node6.getLink(i);
NodeHandle node2(frag); NodeHandle node2(frag);
selectNode(signal, node2, loc2); selectNode(node2, loc2);
const int bal2 = node2.getBalance(); const int bal2 = node2.getBalance();
// level 2 // level 2
TupLoc loc4 = node2.getLink(1 - i); TupLoc loc4 = node2.getLink(1 - i);
NodeHandle node4(frag); NodeHandle node4(frag);
selectNode(signal, node4, loc4); selectNode(node4, loc4);
const int bal4 = node4.getBalance(); const int bal4 = node4.getBalance();
ndbrequire(i <= 1); ndbrequire(i <= 1);
...@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -549,23 +636,26 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
// fill up leaf before it becomes internal // fill up leaf before it becomes internal
if (loc3 == NullTupLoc && loc5 == NullTupLoc) { if (loc3 == NullTupLoc && loc5 == NullTupLoc) {
jam(); jam();
TreeHead& tree = frag.m_tree; if (node4.getOccup() < tree.m_minOccup) {
nodeSlide(signal, node4, node2, i); jam();
// implied by rule of merging half-leaves with leaves unsigned cnt = tree.m_minOccup - node4.getOccup();
ndbrequire(node4.getOccup() >= tree.m_minOccup); ndbrequire(cnt < node2.getOccup());
ndbrequire(node2.getOccup() != 0); nodeSlide(node4, node2, cnt, i);
ndbrequire(node4.getOccup() >= tree.m_minOccup);
ndbrequire(node2.getOccup() != 0);
}
} else { } else {
if (loc3 != NullTupLoc) { if (loc3 != NullTupLoc) {
jam(); jam();
NodeHandle node3(frag); NodeHandle node3(frag);
selectNode(signal, node3, loc3); selectNode(node3, loc3);
node3.setLink(2, loc2); node3.setLink(2, loc2);
node3.setSide(1 - i); node3.setSide(1 - i);
} }
if (loc5 != NullTupLoc) { if (loc5 != NullTupLoc) {
jam(); jam();
NodeHandle node5(frag); NodeHandle node5(frag);
selectNode(signal, node5, loc5); selectNode(node5, loc5);
node5.setLink(2, node6.m_loc); node5.setLink(2, node6.m_loc);
node5.setSide(i); node5.setSide(i);
} }
...@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i ...@@ -588,7 +678,7 @@ Dbtux::treeRotateDouble(Signal* signal, Frag& frag, NodeHandle& node, unsigned i
if (loc0 != NullTupLoc) { if (loc0 != NullTupLoc) {
jam(); jam();
selectNode(signal, node0, loc0); selectNode(node0, loc0);
node0.setLink(side6, loc4); node0.setLink(side6, loc4);
} else { } else {
jam(); jam();
......
...@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead ...@@ -29,6 +29,7 @@ shows ms / 1000 rows for each and index time overhead
samples 10% of all PKs (100,000 pk reads, 100,000 scans) samples 10% of all PKs (100,000 pk reads, 100,000 scans)
the "pct" values are from more accurate total times (not shown) the "pct" values are from more accurate total times (not shown)
comments [ ... ] are after the case
040616 mc02/a 40 ms 87 ms 114 pct 040616 mc02/a 40 ms 87 ms 114 pct
mc02/b 51 ms 128 ms 148 pct mc02/b 51 ms 128 ms 148 pct
...@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct ...@@ -76,13 +77,12 @@ optim 13 mc02/a 40 ms 57 ms 42 pct
mc02/c 9 ms 13 ms 50 pct mc02/c 9 ms 13 ms 50 pct
mc02/d 170 ms 256 ms 50 pct mc02/d 170 ms 256 ms 50 pct
after wl-1884 store all-NULL keys (the tests have pctnull=10 per column)
optim 13 mc02/a 39 ms 59 ms 50 pct optim 13 mc02/a 39 ms 59 ms 50 pct
mc02/b 47 ms 77 ms 61 pct mc02/b 47 ms 77 ms 61 pct
mc02/c 9 ms 12 ms 44 pct mc02/c 9 ms 12 ms 44 pct
mc02/d 246 ms 289 ms 17 pct mc02/d 246 ms 289 ms 17 pct
[ after wl-1884 store all-NULL keys (the tests have pctnull=10 per column) ]
[ case d: bug in testOIBasic killed PK read performance ] [ case d: bug in testOIBasic killed PK read performance ]
optim 14 mc02/a 41 ms 60 ms 44 pct optim 14 mc02/a 41 ms 60 ms 44 pct
...@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct ...@@ -98,8 +98,7 @@ none mc02/a 35 ms 60 ms 71 pct
mc02/c 5 ms 12 ms 106 pct mc02/c 5 ms 12 ms 106 pct
mc02/d 165 ms 238 ms 44 pct mc02/d 165 ms 238 ms 44 pct
[ johan re-installed mc02 as fedora gcc-3.3.2 ] [ johan re-installed mc02 as fedora gcc-3.3.2, tux uses more C++ stuff than tup]
[ case c: table scan has improved... ]
charsets mc02/a 35 ms 60 ms 71 pct charsets mc02/a 35 ms 60 ms 71 pct
mc02/b 42 ms 84 ms 97 pct mc02/b 42 ms 84 ms 97 pct
...@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct ...@@ -118,6 +117,19 @@ optim 15 mc02/a 34 ms 60 ms 72 pct
optim 16 mc02/a 34 ms 53 ms 53 pct optim 16 mc02/a 34 ms 53 ms 53 pct
mc02/b 42 ms 75 ms 75 pct mc02/b 42 ms 75 ms 75 pct
[ case a, b: binary search of bounding node when adding entry ] [ binary search of bounding node when adding entry ]
none mc02/a 35 ms 53 ms 51 pct
mc02/b 42 ms 75 ms 76 pct
[ rewrote treeAdd / treeRemove ]
optim 17 mc02/a 35 ms 52 ms 49 pct
mc02/b 43 ms 75 ms 75 pct
[ allow slack (2) in interior nodes - almost no effect?? ]
wl-1942 mc02/a 35 ms 52 ms 49 pct
mc02/b 42 ms 75 ms 76 pct
vim: set et: vim: set et:
...@@ -42,7 +42,7 @@ struct Opt { ...@@ -42,7 +42,7 @@ struct Opt {
CHARSET_INFO* m_cs; CHARSET_INFO* m_cs;
bool m_dups; bool m_dups;
NdbDictionary::Object::FragmentType m_fragtype; NdbDictionary::Object::FragmentType m_fragtype;
unsigned m_idxloop; unsigned m_subsubloop;
const char* m_index; const char* m_index;
unsigned m_loop; unsigned m_loop;
bool m_nologging; bool m_nologging;
...@@ -66,7 +66,7 @@ struct Opt { ...@@ -66,7 +66,7 @@ struct Opt {
m_cs(0), m_cs(0),
m_dups(false), m_dups(false),
m_fragtype(NdbDictionary::Object::FragUndefined), m_fragtype(NdbDictionary::Object::FragUndefined),
m_idxloop(4), m_subsubloop(4),
m_index(0), m_index(0),
m_loop(1), m_loop(1),
m_nologging(false), m_nologging(false),
...@@ -79,7 +79,7 @@ struct Opt { ...@@ -79,7 +79,7 @@ struct Opt {
m_seed(0), m_seed(0),
m_subloop(4), m_subloop(4),
m_table(0), m_table(0),
m_threads(4), m_threads(6), // table + 5 indexes
m_v(1) { m_v(1) {
} }
}; };
...@@ -208,6 +208,8 @@ struct Par : public Opt { ...@@ -208,6 +208,8 @@ struct Par : public Opt {
Set& set() const { assert(m_set != 0); return *m_set; } Set& set() const { assert(m_set != 0); return *m_set; }
Tmr* m_tmr; Tmr* m_tmr;
Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; } Tmr& tmr() const { assert(m_tmr != 0); return *m_tmr; }
unsigned m_lno;
unsigned m_slno;
unsigned m_totrows; unsigned m_totrows;
// value calculation // value calculation
unsigned m_range; unsigned m_range;
...@@ -226,6 +228,8 @@ struct Par : public Opt { ...@@ -226,6 +228,8 @@ struct Par : public Opt {
m_tab(0), m_tab(0),
m_set(0), m_set(0),
m_tmr(0), m_tmr(0),
m_lno(0),
m_slno(0),
m_totrows(m_threads * m_rows), m_totrows(m_threads * m_rows),
m_range(m_rows), m_range(m_rows),
m_pctrange(0), m_pctrange(0),
...@@ -2069,7 +2073,8 @@ pkinsert(Par par) ...@@ -2069,7 +2073,8 @@ pkinsert(Par par)
CHK(con.startTransaction() == 0); CHK(con.startTransaction() == 0);
Lst lst; Lst lst;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (set.exist(i) || set.pending(i)) { if (set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2174,7 +2179,8 @@ pkdelete(Par par) ...@@ -2174,7 +2179,8 @@ pkdelete(Par par)
Lst lst; Lst lst;
bool deadlock = false; bool deadlock = false;
for (unsigned j = 0; j < par.m_rows; j++) { for (unsigned j = 0; j < par.m_rows; j++) {
unsigned i = thrrow(par, j); unsigned j2 = ! par.m_randomkey ? j : urandom(par.m_rows);
unsigned i = thrrow(par, j2);
set.lock(); set.lock();
if (! set.exist(i) || set.pending(i)) { if (! set.exist(i) || set.pending(i)) {
set.unlock(); set.unlock();
...@@ -2398,7 +2404,7 @@ static int ...@@ -2398,7 +2404,7 @@ static int
scanreadindex(Par par, const ITab& itab) scanreadindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanreadindex(par, itab, bset) == 0); CHK(scanreadindex(par, itab, bset) == 0);
...@@ -2645,7 +2651,7 @@ static int ...@@ -2645,7 +2651,7 @@ static int
scanupdateindex(Par par, const ITab& itab) scanupdateindex(Par par, const ITab& itab)
{ {
const Tab& tab = par.tab(); const Tab& tab = par.tab();
for (unsigned i = 0; i < par.m_idxloop; i++) { for (unsigned i = 0; i < par.m_subsubloop; i++) {
BSet bset(tab, itab, par.m_rows); BSet bset(tab, itab, par.m_rows);
bset.calc(par); bset.calc(par);
CHK(scanupdateindex(par, itab, bset) == 0); CHK(scanupdateindex(par, itab, bset) == 0);
...@@ -2685,6 +2691,53 @@ readverify(Par par) ...@@ -2685,6 +2691,53 @@ readverify(Par par)
return 0; return 0;
} }
static int
readverifyfull(Par par)
{
par.m_verify = true;
if (par.m_no == 0)
CHK(scanreadtable(par) == 0);
else {
const Tab& tab = par.tab();
unsigned i = par.m_no;
if (i <= tab.m_itabs && useindex(i)) {
const ITab& itab = tab.m_itab[i - 1];
BSet bset(tab, itab, par.m_rows);
CHK(scanreadindex(par, itab, bset) == 0);
}
}
return 0;
}
static int
pkops(Par par)
{
par.m_randomkey = true;
for (unsigned i = 0; i < par.m_subsubloop; i++) {
unsigned sel = urandom(10);
if (par.m_slno % 2 == 0) {
// favor insert
if (sel < 8) {
CHK(pkinsert(par) == 0);
} else if (sel < 9) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
} else {
// favor delete
if (sel < 1) {
CHK(pkinsert(par) == 0);
} else if (sel < 2) {
CHK(pkupdate(par) == 0);
} else {
CHK(pkdelete(par) == 0);
}
}
}
return 0;
}
static int static int
pkupdatescanread(Par par) pkupdatescanread(Par par)
{ {
...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode) ...@@ -2930,6 +2983,8 @@ runstep(Par par, const char* fname, TFunc func, unsigned mode)
thr.m_par.m_tab = par.m_tab; thr.m_par.m_tab = par.m_tab;
thr.m_par.m_set = par.m_set; thr.m_par.m_set = par.m_set;
thr.m_par.m_tmr = par.m_tmr; thr.m_par.m_tmr = par.m_tmr;
thr.m_par.m_lno = par.m_lno;
thr.m_par.m_slno = par.m_slno;
thr.m_func = func; thr.m_func = func;
thr.start(); thr.start();
} }
...@@ -2953,8 +3008,8 @@ tbuild(Par par) ...@@ -2953,8 +3008,8 @@ tbuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
if (i % 2 == 0) { if (par.m_slno % 2 == 0) {
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
...@@ -2963,9 +3018,10 @@ tbuild(Par par) ...@@ -2963,9 +3018,10 @@ tbuild(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
} }
RUNSTEP(par, readverify, MT); RUNSTEP(par, pkupdate, MT);
RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, pkdelete, MT); RUNSTEP(par, pkdelete, MT);
RUNSTEP(par, readverify, MT); RUNSTEP(par, readverifyfull, MT);
RUNSTEP(par, dropindex, ST); RUNSTEP(par, dropindex, ST);
} }
return 0; return 0;
...@@ -2973,6 +3029,22 @@ tbuild(Par par) ...@@ -2973,6 +3029,22 @@ tbuild(Par par)
static int static int
tpkops(Par par) tpkops(Par par)
{
RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT);
for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkops, MT);
LL2("rows=" << par.set().count());
RUNSTEP(par, readverifyfull, MT);
}
return 0;
}
static int
tpkopsread(Par par)
{ {
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
...@@ -2981,7 +3053,7 @@ tpkops(Par par) ...@@ -2981,7 +3053,7 @@ tpkops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdatescanread, MT); RUNSTEP(par, pkupdatescanread, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3000,7 +3072,7 @@ tmixedops(Par par) ...@@ -3000,7 +3072,7 @@ tmixedops(Par par)
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, mixedoperations, MT); RUNSTEP(par, mixedoperations, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
} }
...@@ -3014,7 +3086,7 @@ tbusybuild(Par par) ...@@ -3014,7 +3086,7 @@ tbusybuild(Par par)
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkupdateindexbuild, MT); RUNSTEP(par, pkupdateindexbuild, MT);
RUNSTEP(par, invalidateindex, MT); RUNSTEP(par, invalidateindex, MT);
RUNSTEP(par, readverify, ST); RUNSTEP(par, readverify, ST);
...@@ -3030,7 +3102,7 @@ ttimebuild(Par par) ...@@ -3030,7 +3102,7 @@ ttimebuild(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
...@@ -3049,7 +3121,7 @@ ttimemaint(Par par) ...@@ -3049,7 +3121,7 @@ ttimemaint(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
t1.on(); t1.on();
RUNSTEP(par, pkupdate, MT); RUNSTEP(par, pkupdate, MT);
...@@ -3074,7 +3146,7 @@ ttimescan(Par par) ...@@ -3074,7 +3146,7 @@ ttimescan(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3096,7 +3168,7 @@ ttimepkread(Par par) ...@@ -3096,7 +3168,7 @@ ttimepkread(Par par)
RUNSTEP(par, droptable, ST); RUNSTEP(par, droptable, ST);
RUNSTEP(par, createtable, ST); RUNSTEP(par, createtable, ST);
RUNSTEP(par, invalidatetable, MT); RUNSTEP(par, invalidatetable, MT);
for (unsigned i = 0; i < par.m_subloop; i++) { for (par.m_slno = 0; par.m_slno < par.m_subloop; par.m_slno++) {
RUNSTEP(par, pkinsert, MT); RUNSTEP(par, pkinsert, MT);
RUNSTEP(par, createindex, ST); RUNSTEP(par, createindex, ST);
par.m_tmr = &t1; par.m_tmr = &t1;
...@@ -3132,9 +3204,10 @@ struct TCase { ...@@ -3132,9 +3204,10 @@ struct TCase {
static const TCase static const TCase
tcaselist[] = { tcaselist[] = {
TCase("a", tbuild, "index build"), TCase("a", tbuild, "index build"),
TCase("b", tpkops, "pk operations and scan reads"), TCase("b", tpkops, "pk operations"),
TCase("c", tmixedops, "pk operations and scan operations"), TCase("c", tpkopsread, "pk operations and scan reads"),
TCase("d", tbusybuild, "pk operations and index build"), TCase("d", tmixedops, "pk operations and scan operations"),
TCase("e", tbusybuild, "pk operations and index build"),
TCase("t", ttimebuild, "time index build"), TCase("t", ttimebuild, "time index build"),
TCase("u", ttimemaint, "time index maintenance"), TCase("u", ttimemaint, "time index maintenance"),
TCase("v", ttimescan, "time full scan table vs index on pk"), TCase("v", ttimescan, "time full scan table vs index on pk"),
...@@ -3192,10 +3265,10 @@ runtest(Par par) ...@@ -3192,10 +3265,10 @@ runtest(Par par)
Thr& thr = *g_thrlist[n]; Thr& thr = *g_thrlist[n];
assert(thr.m_thread != 0); assert(thr.m_thread != 0);
} }
for (unsigned l = 0; par.m_loop == 0 || l < par.m_loop; l++) { for (par.m_lno = 0; par.m_loop == 0 || par.m_lno < par.m_loop; par.m_lno++) {
LL1("loop " << l); LL1("loop " << par.m_lno);
if (par.m_seed == 0) if (par.m_seed == 0)
srandom(l); srandom(par.m_lno);
for (unsigned i = 0; i < tcasecount; i++) { for (unsigned i = 0; i < tcasecount; i++) {
const TCase& tcase = tcaselist[i]; const TCase& tcase = tcaselist[i];
if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0) if (par.m_case != 0 && strchr(par.m_case, tcase.m_name[0]) == 0)
......
...@@ -263,8 +263,8 @@ int ha_example::write_row(byte * buf) ...@@ -263,8 +263,8 @@ int ha_example::write_row(byte * buf)
clause was used. Consecutive ordering is not guarenteed. clause was used. Consecutive ordering is not guarenteed.
Currently new_data will not have an updated auto_increament record, or Currently new_data will not have an updated auto_increament record, or
and updated timestamp field. You can do these for example by doing these: and updated timestamp field. You can do these for example by doing these:
if (table->timestamp_on_update_now) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
update_timestamp(new_row+table->timestamp_on_update_now-1); table->timestamp_field->set_time();
if (table->next_number_field && record == table->record[0]) if (table->next_number_field && record == table->record[0])
update_auto_increment(); update_auto_increment();
......
...@@ -428,8 +428,8 @@ int ha_tina::write_row(byte * buf) ...@@ -428,8 +428,8 @@ int ha_tina::write_row(byte * buf)
statistic_increment(ha_write_count,&LOCK_status); statistic_increment(ha_write_count,&LOCK_status);
if (table->timestamp_default_now) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
update_timestamp(buf+table->timestamp_default_now-1); table->timestamp_field->set_time();
size= encode_quote(buf); size= encode_quote(buf);
...@@ -464,8 +464,8 @@ int ha_tina::update_row(const byte * old_data, byte * new_data) ...@@ -464,8 +464,8 @@ int ha_tina::update_row(const byte * old_data, byte * new_data)
statistic_increment(ha_update_count,&LOCK_status); statistic_increment(ha_update_count,&LOCK_status);
if (table->timestamp_default_now) if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_UPDATE)
update_timestamp(new_data+table->timestamp_default_now-1); table->timestamp_field->set_time();
size= encode_quote(new_data); size= encode_quote(new_data);
......
...@@ -5468,4 +5468,29 @@ innobase_get_at_most_n_mbchars( ...@@ -5468,4 +5468,29 @@ innobase_get_at_most_n_mbchars(
} }
} }
extern "C" {
/**********************************************************************
This function returns true if SQL-query in the current thread
is either REPLACE or LOAD DATA INFILE REPLACE.
NOTE that /mysql/innobase/row/row0ins.c must contain the
prototype for this function ! */
ibool
innobase_query_is_replace(void)
/*===========================*/
{
THD* thd;
thd = (THD *)innobase_current_thd();
if ( thd->lex->sql_command == SQLCOM_REPLACE ||
( thd->lex->sql_command == SQLCOM_LOAD &&
thd->lex->duplicates == DUP_REPLACE )) {
return true;
} else {
return false;
}
}
}
#endif /* HAVE_INNOBASE_DB */ #endif /* HAVE_INNOBASE_DB */
...@@ -35,7 +35,7 @@ ...@@ -35,7 +35,7 @@
update user set password=PASSWORD("hello") where user="test" update user set password=PASSWORD("hello") where user="test"
This saves a hashed number as a string in the password field. This saves a hashed number as a string in the password field.
The new autentication is performed in following manner: The new authentication is performed in following manner:
SERVER: public_seed=create_random_string() SERVER: public_seed=create_random_string()
send(public_seed) send(public_seed)
......
CHARSET_INFO
============
A structure containing data for charset+collation pair implementation.
Virtual functions which use this data are collected
into separate structures MY_CHARSET_HANDLER and
MY_COLLATION_HANDLER.
typedef struct charset_info_st
{
uint number;
uint primary_number;
uint binary_number;
uint state;
const char *csname;
const char *name;
const char *comment;
uchar *ctype;
uchar *to_lower;
uchar *to_upper;
uchar *sort_order;
uint16 *tab_to_uni;
MY_UNI_IDX *tab_from_uni;
uchar state_map[256];
uchar ident_map[256];
uint strxfrm_multiply;
uint mbminlen;
uint mbmaxlen;
char max_sort_char; /* For LIKE optimization */
MY_CHARSET_HANDLER *cset;
MY_COLLATION_HANDLER *coll;
} CHARSET_INFO;
CHARSET_INFO fields description:
===============================
Numbers (identifiers)
---------------------
number - an ID uniquely identifying this charset+collation pair.
primary_number - ID of a charset+collation pair, which consists
of the same character set and the default collation of this
character set. Not really used now. Intended to optimize some
parts of the code where we need to find the default collation
using its non-default counterpart for the given character set.
binary_numner - ID of a charset+collation pair, which consists
of the same character set and the binary collation of this
character set. Not really used now. Intended to optimize
"SELECT BINARY x" in the future.
Names
-----
csname - name of the character set for this charset+collation pair.
name - name of the collation for this charset+collation pair.
comment - a text comment, dysplayed in "Description" column of
SHOW CHARACTER SET output.
Conversion tables
-----------------
ctype - pointer to array[257] of "type of characters"
bit mask for each chatacter, e.g. if a
character is a digit or a letter or a separator, etc.
to_lower - pointer to arrat[256] used in LCASE()
to_upper - pointer to array[256] used in UCASE()
sort_order - pointer to array[256] used for strings comparison
Unicode conversion data
-----------------------
For 8bit character sets:
tab_to_uni : array[256] of charset->Unicode translation
tab_from_uni: a structure for Unicode->charset translation
Non-8 bit charsets have their own structures per charset
hidden in correspondent ctype-xxx.c file and don't use
tab_to_uni and tab_from_uni tables.
Parser maps
-----------
state_map[]
ident_map[]
These maps are to quickly identify if a character is
an identificator part, a digit, a special character,
or a part of other SQL language lexical item.
Probably can be combined with ctype array in the future.
But for some reasons these two arrays are used in the parser,
while a separate ctype[] array is used in the other part of the
code, like fulltext, etc.
Misc fields
-----------
strxfrm_multiply - how many times a sort key (i.e. a string
which can be passed into memcmp() for comparison)
can be longer than the original string.
Usually it is 1. For some complex
collations it can be bigger. For example
in latin1_german2_ci, a sort key is up to
twice longer than the original string.
e.g. Letter 'A' with two dots above is
substituted with 'AE'.
mbminlen - mininum multibyte sequence length.
Now always 1 accept ucs2. For ucs2
it is 2.
mbmaxlen - maximum multibyte sequence length.
1 for 8bit charsets. Can be also 2 or 3.
MY_CHARSET_HANDLER
==================
MY_CHARSET_HANDLER is a collection of character-set
related routines. Defined in m_ctype.h. Have the
following set of functions:
Multibyte routines
------------------
ismbchar() - detects if the given string is a multibyte sequence
mbcharlen() - retuturns length of multibyte sequence starting with
the given character
numchars() - returns number of characters in the given string, e.g.
in SQL function CHAR_LENGTH().
charpos() - calculates the offset of the given position in the string.
Used in SQL functions LEFT(), RIGHT(), SUBSTRING(),
INSERT()
well_formed_length()
- finds the length of correctly formed multybyte beginning.
Used in INSERTs to cut a beginning of the given string
which is
a) "well formed" according to the given character set.
b) can fit into the given data type
Terminates the string in the good position, taking in account
multibyte character boundaries.
lengthsp() - returns the length of the given string without traling spaces.
Unicode conversion routines
---------------------------
mb_wc - converts the left multibyte sequence into it Unicode code.
mc_mb - converts the given Unicode code into multibyte sequence.
Case and sort convertion
------------------------
caseup_str - converts the given 0-terminated string into the upper case
casedn_str - converts the given 0-terminated string into the lower case
caseup - converts the given string into the lower case using length
casedn - converts the given string into the lower case using length
Number-to-string conversion routines
------------------------------------
snprintf()
long10_to_str()
longlong10_to_str()
The names are pretty self-descripting.
String padding routines
-----------------------
fill() - writes the given Unicode value into the given string
with the given length. Used to pad the string, usually
with space character, according to the given charset.
String-to-numner conversion routines
------------------------------------
strntol()
strntoul()
strntoll()
strntoull()
strntod()
These functions are almost for the same thing with their
STDLIB counterparts, but also:
- accept length instead of 0-terminator
- and are character set dependant
Simple scanner routines
-----------------------
scan() - to skip leading spaces in the given string.
Used when a string value is inserted into a numeric field.
MY_COLLATION_HANDLER
====================
strnncoll() - compares two strings according to the given collation
strnncollsp() - like the above but ignores trailing spaces
strnxfrm() - makes a sort key suitable for memcmp() corresponding
to the given string
like_range() - creates a LIKE range, for optimizer
wildcmp() - wildcard comparison, for LIKE
strcasecmp() - 0-terminated string comparison
instr() - finds the first substring appearence in the string
hash_sort() - calculates hash value taking in account
the collation rules, e.g. case-insensitivity,
accent sensitivity, etc.
\ No newline at end of file
...@@ -123,8 +123,7 @@ int my_strcasecmp_mb(CHARSET_INFO * cs,const char *s, const char *t) ...@@ -123,8 +123,7 @@ int my_strcasecmp_mb(CHARSET_INFO * cs,const char *s, const char *t)
** 1 if matched with wildcard ** 1 if matched with wildcard
*/ */
#define INC_PTR(cs,A,B) A+=((use_mb_flag && \ #define INC_PTR(cs,A,B) A+=(my_ismbchar(cs,A,B) ? my_ismbchar(cs,A,B) : 1)
my_ismbchar(cs,A,B)) ? my_ismbchar(cs,A,B) : 1)
#define likeconv(s,A) (uchar) (s)->sort_order[(uchar) (A)] #define likeconv(s,A) (uchar) (s)->sort_order[(uchar) (A)]
...@@ -135,8 +134,6 @@ int my_wildcmp_mb(CHARSET_INFO *cs, ...@@ -135,8 +134,6 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
{ {
int result= -1; /* Not found, using wildcards */ int result= -1; /* Not found, using wildcards */
bool use_mb_flag=use_mb(cs);
while (wildstr != wildend) while (wildstr != wildend)
{ {
while (*wildstr != w_many && *wildstr != w_one) while (*wildstr != w_many && *wildstr != w_one)
...@@ -144,8 +141,7 @@ int my_wildcmp_mb(CHARSET_INFO *cs, ...@@ -144,8 +141,7 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
int l; int l;
if (*wildstr == escape && wildstr+1 != wildend) if (*wildstr == escape && wildstr+1 != wildend)
wildstr++; wildstr++;
if (use_mb_flag && if ((l = my_ismbchar(cs, wildstr, wildend)))
(l = my_ismbchar(cs, wildstr, wildend)))
{ {
if (str+l > str_end || memcmp(str, wildstr, l) != 0) if (str+l > str_end || memcmp(str, wildstr, l) != 0)
return 1; return 1;
...@@ -200,41 +196,30 @@ int my_wildcmp_mb(CHARSET_INFO *cs, ...@@ -200,41 +196,30 @@ int my_wildcmp_mb(CHARSET_INFO *cs,
cmp= *++wildstr; cmp= *++wildstr;
mb=wildstr; mb=wildstr;
LINT_INIT(mblen); mblen= my_ismbchar(cs, wildstr, wildend);
if (use_mb_flag)
mblen = my_ismbchar(cs, wildstr, wildend);
INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */ INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */
cmp=likeconv(cs,cmp); cmp=likeconv(cs,cmp);
do do
{ {
if (use_mb_flag) for (;;)
{ {
for (;;) if (str >= str_end)
return -1;
if (mblen)
{ {
if (str >= str_end) if (str+mblen <= str_end && memcmp(str, mb, mblen) == 0)
return -1;
if (mblen)
{
if (str+mblen <= str_end && memcmp(str, mb, mblen) == 0)
{
str += mblen;
break;
}
}
else if (!my_ismbchar(cs, str, str_end) &&
likeconv(cs,*str) == cmp)
{ {
str++; str += mblen;
break; break;
} }
INC_PTR(cs,str, str_end);
} }
} else if (!my_ismbchar(cs, str, str_end) &&
else likeconv(cs,*str) == cmp)
{ {
while (str != str_end && likeconv(cs,*str) != cmp)
str++; str++;
if (str++ == str_end) return (-1); break;
}
INC_PTR(cs,str, str_end);
} }
{ {
int tmp=my_wildcmp_mb(cs,str,str_end,wildstr,wildend,escape,w_one, int tmp=my_wildcmp_mb(cs,str,str_end,wildstr,wildend,escape,w_one,
...@@ -555,8 +540,6 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs, ...@@ -555,8 +540,6 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
{ {
int result= -1; /* Not found, using wildcards */ int result= -1; /* Not found, using wildcards */
bool use_mb_flag=use_mb(cs);
while (wildstr != wildend) while (wildstr != wildend)
{ {
while (*wildstr != w_many && *wildstr != w_one) while (*wildstr != w_many && *wildstr != w_one)
...@@ -564,8 +547,7 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs, ...@@ -564,8 +547,7 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
int l; int l;
if (*wildstr == escape && wildstr+1 != wildend) if (*wildstr == escape && wildstr+1 != wildend)
wildstr++; wildstr++;
if (use_mb_flag && if ((l = my_ismbchar(cs, wildstr, wildend)))
(l = my_ismbchar(cs, wildstr, wildend)))
{ {
if (str+l > str_end || memcmp(str, wildstr, l) != 0) if (str+l > str_end || memcmp(str, wildstr, l) != 0)
return 1; return 1;
...@@ -620,39 +602,28 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs, ...@@ -620,39 +602,28 @@ static int my_wildcmp_mb_bin(CHARSET_INFO *cs,
cmp= *++wildstr; cmp= *++wildstr;
mb=wildstr; mb=wildstr;
LINT_INIT(mblen); mblen= my_ismbchar(cs, wildstr, wildend);
if (use_mb_flag)
mblen = my_ismbchar(cs, wildstr, wildend);
INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */ INC_PTR(cs,wildstr,wildend); /* This is compared trough cmp */
do do
{ {
if (use_mb_flag) for (;;)
{ {
for (;;) if (str >= str_end)
return -1;
if (mblen)
{ {
if (str >= str_end) if (str+mblen <= str_end && memcmp(str, mb, mblen) == 0)
return -1;
if (mblen)
{
if (str+mblen <= str_end && memcmp(str, mb, mblen) == 0)
{
str += mblen;
break;
}
}
else if (!my_ismbchar(cs, str, str_end) && *str == cmp)
{ {
str++; str += mblen;
break; break;
} }
INC_PTR(cs,str, str_end);
} }
} else if (!my_ismbchar(cs, str, str_end) && *str == cmp)
else {
{
while (str != str_end && *str != cmp)
str++; str++;
if (str++ == str_end) return (-1); break;
}
INC_PTR(cs,str, str_end);
} }
{ {
int tmp=my_wildcmp_mb_bin(cs,str,str_end,wildstr,wildend,escape,w_one,w_many); int tmp=my_wildcmp_mb_bin(cs,str,str_end,wildstr,wildend,escape,w_one,w_many);
......
...@@ -1231,172 +1231,14 @@ uint my_lengthsp_ucs2(CHARSET_INFO *cs __attribute__((unused)), ...@@ -1231,172 +1231,14 @@ uint my_lengthsp_ucs2(CHARSET_INFO *cs __attribute__((unused)),
} }
/*
** Compare string against string with wildcard
** 0 if matched
** -1 if not matched with wildcard
** 1 if matched with wildcard
*/
static
int my_wildcmp_ucs2(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights)
{
int result= -1; /* Not found, using wildcards */
my_wc_t s_wc, w_wc;
int scan, plane;
while (wildstr != wildend)
{
while (1)
{
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
}
if (w_wc == (my_wc_t)w_many)
{
result= 1; /* Found an anchor char */
break;
}
wildstr+= scan;
scan= my_ucs2_uni(cs, &s_wc, (const uchar*)str, (const uchar*)str_end);
if (scan <=0)
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}
else
{
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc != w_wc)
return 1; /* No match */
}
if (wildstr == wildend)
return (str != str_end); /* Match if both are at end */
}
if (w_wc == (my_wc_t)w_many)
{ /* Found w_many */
/* Remove any '%' and '_' from the wild search string */
for ( ; wildstr != wildend ; )
{
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)w_many)
{
wildstr+= scan;
continue;
}
if (w_wc == (my_wc_t)w_one)
{
wildstr+= scan;
scan= my_ucs2_uni(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end);
if (scan <=0)
return 1;
str+= scan;
continue;
}
break; /* Not a wild character */
}
if (wildstr == wildend)
return 0; /* Ok if w_many is last */
if (str == str_end)
return -1;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
scan= my_ucs2_uni(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend);
if (scan <= 0)
return 1;
}
while (1)
{
/* Skip until the first character from wildstr is found */
while (str != str_end)
{
scan= my_ucs2_uni(cs,&s_wc, (const uchar*)str,
(const uchar*)str_end);
if (scan <= 0)
return 1;
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc == w_wc)
break;
str+= scan;
}
if (str == str_end)
return -1;
result= my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend,escape,
w_one,w_many,weights);
if (result <= 0)
return result;
str+= scan;
}
}
}
return (str != str_end ? 1 : 0);
}
static static
int my_wildcmp_ucs2_ci(CHARSET_INFO *cs, int my_wildcmp_ucs2_ci(CHARSET_INFO *cs,
const char *str,const char *str_end, const char *str,const char *str_end,
const char *wildstr,const char *wildend, const char *wildstr,const char *wildend,
int escape, int w_one, int w_many) int escape, int w_one, int w_many)
{ {
return my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend, return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,uni_plane); escape,w_one,w_many,uni_plane);
} }
...@@ -1406,8 +1248,8 @@ int my_wildcmp_ucs2_bin(CHARSET_INFO *cs, ...@@ -1406,8 +1248,8 @@ int my_wildcmp_ucs2_bin(CHARSET_INFO *cs,
const char *wildstr,const char *wildend, const char *wildstr,const char *wildend,
int escape, int w_one, int w_many) int escape, int w_one, int w_many)
{ {
return my_wildcmp_ucs2(cs,str,str_end,wildstr,wildend, return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,NULL); escape,w_one,w_many,NULL);
} }
......
...@@ -1518,6 +1518,161 @@ MY_UNICASE_INFO *uni_plane[256]={ ...@@ -1518,6 +1518,161 @@ MY_UNICASE_INFO *uni_plane[256]={
}; };
/*
** Compare string against string with wildcard
** This function is used in UTF8 and UCS2
**
** 0 if matched
** -1 if not matched with wildcard
** 1 if matched with wildcard
*/
int my_wildcmp_unicode(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many,
MY_UNICASE_INFO **weights)
{
int result= -1; /* Not found, using wildcards */
my_wc_t s_wc, w_wc;
int scan, plane;
int (*mb_wc)(struct charset_info_st *cs, my_wc_t *wc,
const unsigned char *s,const unsigned char *e);
mb_wc= cs->cset->mb_wc;
while (wildstr != wildend)
{
while (1)
{
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
if ((scan= mb_wc(cs,&w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
}
if (w_wc == (my_wc_t)w_many)
{
result= 1; /* Found an anchor char */
break;
}
wildstr+= scan;
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
str+= scan;
if (w_wc == (my_wc_t)w_one)
{
result= 1; /* Found an anchor char */
}
else
{
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc != w_wc)
return 1; /* No match */
}
if (wildstr == wildend)
return (str != str_end); /* Match if both are at end */
}
if (w_wc == (my_wc_t)w_many)
{ /* Found w_many */
/* Remove any '%' and '_' from the wild search string */
for ( ; wildstr != wildend ; )
{
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <= 0)
return 1;
if (w_wc == (my_wc_t)w_many)
{
wildstr+= scan;
continue;
}
if (w_wc == (my_wc_t)w_one)
{
wildstr+= scan;
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
str+= scan;
continue;
}
break; /* Not a wild character */
}
if (wildstr == wildend)
return 0; /* Ok if w_many is last */
if (str == str_end)
return -1;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <=0)
return 1;
if (w_wc == (my_wc_t)escape)
{
wildstr+= scan;
if ((scan= mb_wc(cs, &w_wc, (const uchar*)wildstr,
(const uchar*)wildend)) <=0)
return 1;
}
while (1)
{
/* Skip until the first character from wildstr is found */
while (str != str_end)
{
if ((scan= mb_wc(cs, &s_wc, (const uchar*)str,
(const uchar*)str_end)) <=0)
return 1;
if (weights)
{
plane=(s_wc>>8) & 0xFF;
s_wc = weights[plane] ? weights[plane][s_wc & 0xFF].sort : s_wc;
plane=(w_wc>>8) & 0xFF;
w_wc = weights[plane] ? weights[plane][w_wc & 0xFF].sort : w_wc;
}
if (s_wc == w_wc)
break;
str+= scan;
}
if (str == str_end)
return -1;
result= my_wildcmp_unicode(cs, str, str_end, wildstr, wildend,
escape, w_one, w_many,
weights);
if (result <= 0)
return result;
str+= scan;
}
}
}
return (str != str_end ? 1 : 0);
}
#endif #endif
...@@ -1992,6 +2147,17 @@ static int my_strcasecmp_utf8(CHARSET_INFO *cs, const char *s, const char *t) ...@@ -1992,6 +2147,17 @@ static int my_strcasecmp_utf8(CHARSET_INFO *cs, const char *s, const char *t)
return my_strncasecmp_utf8(cs, s, t, len); return my_strncasecmp_utf8(cs, s, t, len);
} }
static
int my_wildcmp_utf8(CHARSET_INFO *cs,
const char *str,const char *str_end,
const char *wildstr,const char *wildend,
int escape, int w_one, int w_many)
{
return my_wildcmp_unicode(cs,str,str_end,wildstr,wildend,
escape,w_one,w_many,uni_plane);
}
static int my_strnxfrm_utf8(CHARSET_INFO *cs, static int my_strnxfrm_utf8(CHARSET_INFO *cs,
uchar *dst, uint dstlen, uchar *dst, uint dstlen,
const uchar *src, uint srclen) const uchar *src, uint srclen)
...@@ -2060,7 +2226,7 @@ static MY_COLLATION_HANDLER my_collation_ci_handler = ...@@ -2060,7 +2226,7 @@ static MY_COLLATION_HANDLER my_collation_ci_handler =
my_strnncollsp_utf8, my_strnncollsp_utf8,
my_strnxfrm_utf8, my_strnxfrm_utf8,
my_like_range_mb, my_like_range_mb,
my_wildcmp_mb, my_wildcmp_utf8,
my_strcasecmp_utf8, my_strcasecmp_utf8,
my_instr_mb, my_instr_mb,
my_hash_sort_utf8 my_hash_sort_utf8
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment