Commit 1a8a63d0 authored by marko

branches/zip: Reimplement merge sort in fast index creation.

The creation of the primary key does not work.  We will have to flag
externally stored columns and copy the externally stored part from
the old table.

row_build_index_for_mysql(): Rename to row_merge_build_indexes().
Move from row0mysql.c to row0merge.c.

Remove private declarations from row0merge.h.  Make many functions static
in row0merge.c.

cmp_rec_rec_simple(): A new comparison function.

dict_index_get_min_size(): New function.

OS_FILE_FROM_FD(fd): A macro for converting from int to os_file_t.

rec_convert_dtuple_to_rec_comp(): Make the interface lower-level.

rec_get_converted_size_comp(): Return also extra_size.

UT_SORT_FUNCTION_BODY(): Remove reference to an obsolete test program.

row_rec_to_index_entry_low(): New function.

row0merge.c: Implement merge sort based on file streams instead of
fixed-size blocks.  Sort the small blocks as arrays of dfield_t*,
because it is faster than invoking rec_get_offsets() for every
comparison.
parent 673f836f
......@@ -8283,7 +8283,7 @@ err_exit:
/* Read clustered index of the table and build indexes
based on this information using temporary files and merge
sort.*/
error = row_build_index_for_mysql(
error = row_merge_build_indexes(
trx, innodb_table, indexed_table, index,
num_of_idx);
......
......@@ -656,6 +656,14 @@ dict_table_get_sys_col_no(
const dict_table_t* table, /* in: table */
ulint sys); /* in: DATA_ROW_ID, ... */
/************************************************************************
Returns the minimum data size of an index record. */
UNIV_INLINE
ulint
dict_index_get_min_size(
/*====================*/
/* out: minimum data size in bytes */
const dict_index_t* index); /* in: index */
/************************************************************************
Check whether the table uses the compact page format. */
UNIV_INLINE
ibool
......
......@@ -507,6 +507,26 @@ dict_index_get_nth_col_no(
return(dict_col_get_no(dict_index_get_nth_col(index, pos)));
}
/************************************************************************
Computes the minimum data size of an index record by adding up
dict_col_get_min_size() of the column behind every field of the index. */
UNIV_INLINE
ulint
dict_index_get_min_size(
/*====================*/
					/* out: minimum data size
					in bytes */
	const dict_index_t*	index)	/* in: index */
{
	const ulint	n_fields = dict_index_get_n_fields(index);
	ulint		total = 0;
	ulint		pos;

	/* The order of summation is irrelevant; walk the fields
	from first to last. */
	for (pos = 0; pos < n_fields; pos++) {
		total += dict_col_get_min_size(
			dict_index_get_nth_col(index, pos));
	}

	return(total);
}
/*************************************************************************
Gets the space id of the root of the index tree. */
UNIV_INLINE
......
......@@ -43,8 +43,10 @@ extern ulint os_n_pending_writes;
#ifdef __WIN__
#define os_file_t HANDLE
#define OS_FILE_FROM_FD(fd) _get_osfhandle(fd)
#else
typedef int os_file_t;
#define OS_FILE_FROM_FD(fd) fd
#endif
extern ulint os_innodb_umask;
......
......@@ -125,6 +125,22 @@ cmp_dtuple_is_prefix_of_rec(
const dtuple_t* dtuple, /* in: data tuple */
const rec_t* rec, /* in: physical record */
const ulint* offsets);/* in: array returned by rec_get_offsets() */
#ifndef UNIV_HOTBACKUP
/*****************************************************************
Compare two physical records that contain the same number of columns,
none of which are stored externally. */
int
cmp_rec_rec_simple(
/*===============*/
/* out: 1, 0, -1 if rec1 is greater, equal,
less, respectively, than rec2 */
const rec_t* rec1, /* in: physical record */
const rec_t* rec2, /* in: physical record */
const ulint* offsets1,/* in: rec_get_offsets(rec1, index) */
const ulint* offsets2,/* in: rec_get_offsets(rec2, index) */
dict_index_t* index); /* in: data dictionary index */
#endif /* !UNIV_HOTBACKUP */
/*****************************************************************
This function is used to compare two physical records. Only the common
first fields are compared, and if an externally stored field is
......
......@@ -607,16 +607,17 @@ rec_fold(
/*************************************************************
Builds a ROW_FORMAT=COMPACT record out of a data tuple. */
byte*
void
rec_convert_dtuple_to_rec_comp(
/*===========================*/
/* out: pointer to the start of data payload */
byte* buf, /* in: start address of the data area */
rec_t* rec, /* in: origin of record */
ulint extra, /* in: number of bytes to reserve between
the record header and the data payload
(usually REC_N_NEW_EXTRA_BYTES) */
dict_index_t* index, /* in: record descriptor */
const dtuple_t* dtuple, /* in: data tuple */
ulint status, /* in: status bits of the record */
const dfield_t* fields, /* in: array of data fields */
ulint n_fields,/* in: number of data fields */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext); /* in: number of elements in ext */
......@@ -657,9 +658,12 @@ rec_get_converted_size_comp(
/* out: size */
dict_index_t* index, /* in: record descriptor;
dict_table_is_comp() is assumed to hold */
const dtuple_t* dtuple, /* in: data tuple */
ulint status, /* in: status bits of the record */
const dfield_t* fields, /* in: array of data fields */
ulint n_fields,/* in: number of data fields */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext); /* in: number of elements in ext */
ulint n_ext, /* in: number of elements in ext */
ulint* extra); /* out: extra size */
/**************************************************************
The following function returns the size of a data tuple when converted to
a physical record. */
......
......@@ -1538,7 +1538,12 @@ rec_get_converted_size(
: dict_index_get_n_fields(index)));
if (dict_table_is_comp(index->table)) {
return(rec_get_converted_size_comp(index, dtuple, ext, n_ext));
return(rec_get_converted_size_comp(index,
dtuple_get_info_bits(dtuple)
& REC_NEW_STATUS_MASK,
dtuple->fields,
dtuple->n_fields,
ext, n_ext, NULL));
}
data_size = dtuple_get_data_size(dtuple);
......
......@@ -21,17 +21,6 @@ Created 13/06/2005 Jan Lindstrom
#include "btr0types.h"
#include "row0mysql.h"
/* Information about temporary files used in merge sort are stored
to this structure */
struct merge_file_struct {
os_file_t file; /* File descriptor */
ulint offset; /* File offset */
ulint num_of_blocks; /* Number of blocks */
};
typedef struct merge_file_struct merge_file_t;
/* This structure holds index field definitions */
struct merge_index_field_struct {
......@@ -53,48 +42,6 @@ struct merge_index_def_struct {
typedef struct merge_index_def_struct merge_index_def_t;
/************************************************************************
Reads clustered index of the table and create temporary files
containing index entries for indexes to be built. */
ulint
row_merge_read_clustered_index(
/*===========================*/
/* out: DB_SUCCESS if successfull,
or ERROR code */
trx_t* trx, /* in: transaction */
dict_table_t* table, /* in: table where index is created */
dict_index_t** index, /* in: indexes to be created */
merge_file_t* files, /* in: Files where to write index
entries */
ulint num_of_idx); /* in: number of indexes to be
created */
/************************************************************************
Read sorted file containing index data tuples and insert these data
data tuples to the index */
ulint
row_merge_insert_index_tuples(
/*==========================*/
/* out: 0 or error number */
trx_t* trx, /* in: transaction */
dict_index_t* index, /* in: index */
dict_table_t* table, /* in: table */
os_file_t file, /* in: file handle */
ulint offset); /* in: offset where to start
reading */
/*****************************************************************
Merge sort for linked list in the disk. */
ulint
row_merge_sort_linked_list_in_disk(
/*===============================*/
/* out: offset to first block in
the list or ULINT_UNDEFINED in
case of error */
dict_index_t* index, /* in: index to be created */
os_file_t file, /* in: File handle */
int* error); /* out: 0 or error */
/*************************************************************************
Drop an index from the InnoDB system tables. */
......@@ -116,13 +63,6 @@ row_merge_drop_indexes(
dict_table_t* table, /* in: table containing the indexes */
dict_index_t** index, /* in: indexes to drop */
ulint num_created); /* in: number of elements in index[] */
/*************************************************************************
Initialize memory for a merge file structure */
void
row_merge_file_create(
/*==================*/
merge_file_t* merge_file); /* out: merge file structure */
/*************************************************************************
Create a temporary table using a definition of the old table. You must
......@@ -136,16 +76,7 @@ row_merge_create_temporary_table(
dict_table_t* table, /* in: old table definition */
trx_t* trx); /* in/out: trx (sets error_state) */
/*************************************************************************
Update all prebuilts for this table */
void
row_merge_prebuilts_update(
/*=======================*/
trx_t* trx, /* in: trx */
dict_table_t* old_table); /* in: old table */
/*************************************************************************
Rename the indexes in the dicitionary. */
Rename the indexes in the dictionary. */
ulint
row_merge_rename_index(
......@@ -155,7 +86,7 @@ row_merge_rename_index(
dict_table_t* table, /* in: Table for index */
dict_index_t* index); /* in: Index to rename */
/*************************************************************************
Create the index and load in to the dicitionary. */
Create the index and load in to the dictionary. */
dict_index_t*
row_merge_create_index(
......@@ -166,7 +97,7 @@ row_merge_create_index(
const merge_index_def_t* /* in: the index definition */
index_def);
/*************************************************************************
Check if a transaction can use an index.*/
Check if a transaction can use an index. */
ibool
row_merge_is_index_usable(
......@@ -177,13 +108,31 @@ row_merge_is_index_usable(
const dict_index_t* index); /* in: index to check */
/*************************************************************************
If there are views that refer to the old table name then we "attach" to
the new instance of the table else we drop it immediately.*/
the new instance of the table else we drop it immediately. */
ulint
row_merge_drop_table(
/*=================*/
/* out: DB_SUCCESS if all OK else
error code.*/
/* out: DB_SUCCESS or error code */
trx_t* trx, /* in: transaction */
dict_table_t* table); /* in: table instance to drop */
/*************************************************************************
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */
ulint
row_merge_build_indexes(
/*====================*/
/* out: DB_SUCCESS or error code */
trx_t* trx, /* in: transaction */
dict_table_t* old_table, /* in: Table where rows are
read from */
dict_table_t* new_table, /* in: Table where indexes are
created. Note that old_table ==
new_table if we are creating
secondary keys. */
dict_index_t** indexes, /* in: indexes to be created */
ulint n_indexes); /* in: size of indexes[] */
#endif /* row0merge.h */
......@@ -503,25 +503,6 @@ row_check_table_for_mysql(
handle */
#endif /* !UNIV_HOTBACKUP */
/*************************************************************************
Build new indexes to a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */
ulint
row_build_index_for_mysql(
/*======================*/
/* out: 0 or error code */
trx_t* trx, /* in: transaction */
dict_table_t* old_table, /* in: Table where rows are
read from */
dict_table_t* new_table, /* in: Table where indexes are
created. Note that old_table ==
new_table if we are creating a
secondary keys. */
dict_index_t** index, /* in: Indexes to be created */
ulint num_of_keys); /* in: Number of indexes to be
created */
/*************************************************************************
Create query graph for a index creation */
ulint
......
......@@ -68,7 +68,7 @@ row_build_index_entry(
mem_heap_t* heap); /* in: memory heap from which the memory for
the index entry is allocated */
/***********************************************************************
An inverse function to dict_row_build_index_entry. Builds a row from a
An inverse function to row_build_index_entry. Builds a row from a
record in a clustered index. */
dtuple_t*
......@@ -98,6 +98,21 @@ row_build(
/***********************************************************************
Converts an index record to a typed data tuple. */
dtuple_t*
row_rec_to_index_entry_low(
/*=======================*/
/* out: index entry built; does not
set info_bits, and the data fields in
the entry will point directly to rec */
const rec_t* rec, /* in: record in the index */
dict_index_t* index, /* in: index */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
mem_heap_t* heap); /* in: memory heap from which the memory
needed is allocated */
/***********************************************************************
Converts an index record to a typed data tuple. NOTE that externally
stored (often big) fields are NOT copied to heap. */
dtuple_t*
row_rec_to_index_entry(
/*===================*/
......
......@@ -30,8 +30,7 @@ and the low (LOW), inclusive, and high (HIGH), noninclusive,
limits for the sort interval as arguments.
CMP_FUN is the comparison function name. It takes as arguments
two elements from the array and returns 1, if the first is bigger,
0 if equal, and -1 if the second bigger. For an eaxmaple of use
see test program in tsut.c. */
0 if equal, and -1 if the second bigger. */
#define UT_SORT_FUNCTION_BODY(SORT_FUN, ARR, AUX_ARR, LOW, HIGH, CMP_FUN)\
{\
......
......@@ -481,7 +481,7 @@ engine = innodb default charset=utf8;
insert into t1 values (1,1,'ab','ab'),(2,2,'ac','ac'),(3,2,'ad','ad'),(4,4,'afe','afe');
commit;
alter table t1 add unique index (b);
ERROR 23000: Duplicate entry '0' for key 'b'
ERROR 23000: Duplicate entry '' for key 'b'
insert into t1 values(8,9,'fff','fff');
select * from t1;
a b c d
......@@ -650,7 +650,7 @@ engine = innodb default charset=ucs2;
insert into t1 values (1,1,'ab','ab'),(2,2,'ac','ac'),(3,2,'ad','ad'),(4,4,'afe','afe');
commit;
alter table t1 add unique index (b);
ERROR 23000: Duplicate entry '0' for key 'b'
ERROR 23000: Duplicate entry '' for key 'b'
show create table t1;
Table Create Table
t1 CREATE TABLE `t1` (
......
......@@ -132,6 +132,7 @@ create table t1(a int not null, b int, c char(10), d varchar(20), primary key (a
engine = innodb default charset=utf8;
insert into t1 values (1,1,'ab','ab'),(2,2,'ac','ac'),(3,2,'ad','ad'),(4,4,'afe','afe');
commit;
--replace_regex /Duplicate entry '[0-9]*'/Duplicate entry ''/
--error 1582
alter table t1 add unique index (b);
insert into t1 values(8,9,'fff','fff');
......@@ -170,6 +171,7 @@ create table t1(a int not null, b int, c char(10), d varchar(20), primary key (a
engine = innodb default charset=ucs2;
insert into t1 values (1,1,'ab','ab'),(2,2,'ac','ac'),(3,2,'ad','ad'),(4,4,'afe','afe');
commit;
--replace_regex /Duplicate entry '[0-9]*'/Duplicate entry ''/
--error 1582
alter table t1 add unique index (b);
show create table t1;
......
......@@ -1995,7 +1995,7 @@ explain select count(*) from t1 where v between 'a' and 'a ' and v between 'a '
id select_type table type possible_keys key key_len ref rows Extra
1 SIMPLE t1 ref v v 13 const # Using where; Using index
alter table t1 add unique(v);
ERROR 23000: Duplicate entry '{ ' for key 'v_2'
ERROR 23000: Duplicate entry '' for key 'v_2'
alter table t1 add key(v);
select concat('*',v,'*',c,'*',t,'*') as qq from t1 where v='a';
qq
......
......@@ -704,6 +704,154 @@ cmp_dtuple_is_prefix_of_rec(
return(FALSE);
}
#ifndef UNIV_HOTBACKUP
/*****************************************************************
Compare two physical records that contain the same number of columns,
none of which are stored externally. The fields are compared one by one
in index order, and the result for the first field that differs is
returned. The column types used for the comparison are looked up from
the index. */
int
cmp_rec_rec_simple(
/*===============*/
/* out: 1, 0, -1 if rec1 is greater, equal,
less, respectively, than rec2 */
const rec_t* rec1, /* in: physical record */
const rec_t* rec2, /* in: physical record */
const ulint* offsets1,/* in: rec_get_offsets(rec1, index) */
const ulint* offsets2,/* in: rec_get_offsets(rec2, index) */
dict_index_t* index) /* in: data dictionary index */
{
ulint rec1_f_len; /* length of current field in rec1 */
const byte* rec1_b_ptr; /* pointer to the current byte
in rec1 field */
ulint rec1_byte; /* value of current byte to be
compared in rec1 */
ulint rec2_f_len; /* length of current field in rec2 */
const byte* rec2_b_ptr; /* pointer to the current byte
in rec2 field */
ulint rec2_byte; /* value of current byte to be
compared in rec2 */
ulint cur_field; /* current field number */
/* Assert the preconditions stated above: neither record has
externally stored columns, both have the same rec_offs_comp()
flag, and both have the same number of fields. */
ut_ad(!rec_offs_any_extern(offsets1));
ut_ad(!rec_offs_any_extern(offsets2));
ut_ad(rec_offs_comp(offsets1) == rec_offs_comp(offsets2));
ut_ad(rec_offs_n_fields(offsets1) == rec_offs_n_fields(offsets2));
for (cur_field = 0; cur_field < rec_offs_n_fields(offsets1);
cur_field++) {
ulint cur_bytes;
ulint mtype;
ulint prtype;
/* Look up the mtype and prtype of the column behind this
field; they select the comparison method below. */
{
const dict_col_t* col
= dict_index_get_nth_col(index, cur_field);
mtype = col->mtype;
prtype = col->prtype;
}
rec1_b_ptr = rec_get_nth_field(rec1, offsets1,
cur_field, &rec1_f_len);
rec2_b_ptr = rec_get_nth_field(rec2, offsets2,
cur_field, &rec2_f_len);
/* Handle SQL NULL first: two NULLs compare equal, and a NULL
compares less than any non-NULL value. */
if (rec1_f_len == UNIV_SQL_NULL
|| rec2_f_len == UNIV_SQL_NULL) {
if (rec1_f_len == rec2_f_len) {
goto next_field;
} else if (rec2_f_len == UNIV_SQL_NULL) {
/* We define the SQL null to be the
smallest possible value of a field
in the alphabetical order */
return(1);
} else {
return(-1);
}
}
/* Types at or above DATA_FLOAT, and non-binary BLOBs whose
collation is not DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL, are
not compared byte by byte here; the whole field is handed to
cmp_whole_field(). */
if (mtype >= DATA_FLOAT
|| (mtype == DATA_BLOB
&& 0 == (prtype & DATA_BINARY_TYPE)
&& dtype_get_charset_coll(prtype)
!= DATA_MYSQL_LATIN1_SWEDISH_CHARSET_COLL)) {
int ret = cmp_whole_field(mtype, prtype,
rec1_b_ptr,
(unsigned) rec1_f_len,
rec2_b_ptr,
(unsigned) rec2_f_len);
if (ret) {
return(ret);
}
goto next_field;
}
/* Compare the fields */
for (cur_bytes = 0;; cur_bytes++, rec1_b_ptr++, rec2_b_ptr++) {
/* If the field of rec2 is exhausted, either both fields
have ended (they are equal), or the shorter field is
extended with the pad character of the type. When
dtype_get_pad_char() returns ULINT_UNDEFINED, there is no
padding and the longer field (rec1) is the greater one. */
if (rec2_f_len <= cur_bytes) {
if (rec1_f_len <= cur_bytes) {
goto next_field;
}
rec2_byte = dtype_get_pad_char(mtype, prtype);
if (rec2_byte == ULINT_UNDEFINED) {
return(1);
}
} else {
rec2_byte = *rec2_b_ptr;
}
/* Symmetric case: the field of rec1 is exhausted while
rec2 still has bytes left. */
if (rec1_f_len <= cur_bytes) {
rec1_byte = dtype_get_pad_char(mtype, prtype);
if (rec1_byte == ULINT_UNDEFINED) {
return(-1);
}
} else {
rec1_byte = *rec1_b_ptr;
}
if (rec1_byte == rec2_byte) {
/* If the bytes are equal, they will remain
such even after the collation transformation
below */
continue;
}
/* For mtype <= DATA_CHAR and for non-binary BLOBs, map
both bytes through cmp_collate() before ordering them. */
if (mtype <= DATA_CHAR
|| (mtype == DATA_BLOB
&& !(prtype & DATA_BINARY_TYPE))) {
rec1_byte = cmp_collate(rec1_byte);
rec2_byte = cmp_collate(rec2_byte);
}
/* Bytes that differed before cmp_collate() may be equal
after it; in that case neither branch returns and the
loop moves on to the next byte. */
if (rec1_byte < rec2_byte) {
return(-1);
} else if (rec1_byte > rec2_byte) {
return(1);
}
}
next_field:
continue;
}
/* If we ran out of fields, rec1 was equal to rec2. */
return(0);
}
#endif /* !UNIV_HOTBACKUP */
/*****************************************************************
This function is used to compare two physical records. Only the common
first fields are compared, and if an externally stored field is
......
......@@ -236,6 +236,14 @@ rec_init_offsets_comp_ordinary(
dict_field_t* field;
ulint null_mask = 1;
#ifdef UNIV_DEBUG
/* We cannot invoke rec_offs_make_valid() here, because extra may
differ from REC_N_NEW_EXTRA_BYTES. Similarly, rec_offs_validate()
would fail in that case, because it invokes rec_get_status(). */
offsets[2] = (ulint) rec;
offsets[3] = (ulint) index;
#endif /* UNIV_DEBUG */
/* read the lengths of fields 0..n */
do {
ulint len;
......@@ -713,41 +721,50 @@ Determines the size of a data tuple in ROW_FORMAT=COMPACT. */
ulint
rec_get_converted_size_comp(
/*========================*/
/* out: size */
/* out: total size */
dict_index_t* index, /* in: record descriptor;
dict_table_is_comp() is assumed to hold */
const dtuple_t* dtuple, /* in: data tuple */
ulint status, /* in: status bits of the record */
const dfield_t* fields, /* in: array of data fields */
ulint n_fields,/* in: number of data fields */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext) /* in: number of elements in ext */
ulint n_ext, /* in: number of elements in ext */
ulint* extra) /* out: extra size */
{
ulint size = REC_N_NEW_EXTRA_BYTES
+ UT_BITS_IN_BYTES(index->n_nullable);
ulint extra_size;
ulint data_size;
ulint i;
ulint j;
ulint n_fields;
ut_ad(index && dtuple);
ut_ad(dtuple_validate(dtuple));
ut_ad(index);
ut_ad(fields);
ut_ad(n_fields > 0);
switch (dtuple_get_info_bits(dtuple) & REC_NEW_STATUS_MASK) {
switch (UNIV_EXPECT(status, REC_STATUS_ORDINARY)) {
case REC_STATUS_ORDINARY:
n_fields = dict_index_get_n_fields(index);
ut_ad(n_fields == dtuple_get_n_fields(dtuple));
ut_ad(n_fields == dict_index_get_n_fields(index));
data_size = 0;
break;
case REC_STATUS_NODE_PTR:
n_fields = dict_index_get_n_unique_in_tree(index);
ut_ad(n_fields + 1 == dtuple_get_n_fields(dtuple));
ut_ad(dtuple_get_nth_field(dtuple, n_fields)->len == 4);
size += 4; /* child page number */
n_fields--;
ut_ad(n_fields == dict_index_get_n_unique_in_tree(index));
ut_ad(fields[n_fields].len == 4);
ut_ad(!n_ext);
data_size = 4; /* child page number */
break;
case REC_STATUS_INFIMUM:
case REC_STATUS_SUPREMUM:
/* infimum or supremum record, 8 data bytes */
return(REC_N_NEW_EXTRA_BYTES + 8);
extra_size = REC_N_NEW_EXTRA_BYTES;
data_size = 8;
goto func_exit;
default:
ut_error;
return(ULINT_UNDEFINED);
}
extra_size = REC_N_NEW_EXTRA_BYTES
+ UT_BITS_IN_BYTES(index->n_nullable);
/* read the lengths of fields 0..n */
for (i = j = 0; i < n_fields; i++) {
dict_field_t* field;
......@@ -755,12 +772,11 @@ rec_get_converted_size_comp(
const dict_col_t* col;
field = dict_index_get_nth_field(index, i);
len = dtuple_get_nth_field(dtuple, i)->len;
len = fields[i].len;
col = dict_field_get_col(field);
ut_ad(dict_col_type_assert_equal(
col, dfield_get_type(dtuple_get_nth_field(
dtuple, i))));
ut_ad(dict_col_type_assert_equal(col,
dfield_get_type(&fields[i])));
if (len == UNIV_SQL_NULL) {
/* No length is stored for NULL fields. */
......@@ -777,23 +793,28 @@ rec_get_converted_size_comp(
|| field->fixed_len == field->prefix_len);
} else if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
size += 2;
extra_size += 2;
} else if (len < 128
|| (col->len < 256 && col->mtype != DATA_BLOB)) {
size++;
extra_size++;
} else {
/* For variable-length columns, we look up the
maximum length from the column itself. If this
is a prefix index column shorter than 256 bytes,
this will waste one byte. */
size += 2;
extra_size += 2;
}
size += len;
data_size += len;
}
ut_ad(j == n_ext);
return(size);
func_exit:
if (UNIV_LIKELY_NULL(extra)) {
*extra = extra_size;
}
return(extra_size + data_size);
}
/***************************************************************
......@@ -980,23 +1001,23 @@ rec_convert_dtuple_to_rec_old(
/*************************************************************
Builds a ROW_FORMAT=COMPACT record out of a data tuple. */
byte*
void
rec_convert_dtuple_to_rec_comp(
/*===========================*/
/* out: pointer to the start of data payload */
byte* buf, /* in: start address of the data area */
rec_t* rec, /* in: origin of record */
ulint extra, /* in: number of bytes to reserve between
the record header and the data payload
(usually REC_N_NEW_EXTRA_BYTES) */
(normally REC_N_NEW_EXTRA_BYTES) */
dict_index_t* index, /* in: record descriptor */
const dtuple_t* dtuple, /* in: data tuple */
ulint status, /* in: status bits of the record */
const dfield_t* fields, /* in: array of data fields */
ulint n_fields,/* in: number of data fields */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext) /* in: number of elements in ext */
{
const dfield_t* field;
const dtype_t* type;
rec_t* rec = buf + extra;
byte* end;
byte* nulls;
byte* lens;
......@@ -1006,18 +1027,10 @@ rec_convert_dtuple_to_rec_comp(
ulint n_node_ptr_field;
ulint fixed_len;
ulint null_mask = 1;
const ulint n_fields = dtuple_get_n_fields(dtuple);
ut_ad(dict_table_is_comp(index->table));
ut_ad(n_fields > 0);
/* Try to ensure that the memset() between the for() loops
completes fast. The address is not exact, but UNIV_PREFETCH
should never generate a memory fault. */
UNIV_PREFETCH_RW(buf - n_fields);
UNIV_PREFETCH_RW(rec);
switch (UNIV_EXPECT(dtuple_get_info_bits(dtuple) & REC_NEW_STATUS_MASK,
REC_STATUS_ORDINARY)) {
switch (UNIV_EXPECT(status, REC_STATUS_ORDINARY)) {
case REC_STATUS_ORDINARY:
ut_ad(n_fields <= dict_index_get_n_fields(index));
n_node_ptr_field = ULINT_UNDEFINED;
......@@ -1030,62 +1043,12 @@ rec_convert_dtuple_to_rec_comp(
case REC_STATUS_SUPREMUM:
ut_ad(n_fields == 1);
n_node_ptr_field = ULINT_UNDEFINED;
ut_d(j = 0);
goto init;
break;
default:
ut_error;
return(0);
}
/* Calculate the offset of the origin in the physical record.
We must loop over all fields to do this. */
rec += UT_BITS_IN_BYTES(index->n_nullable);
for (i = j = 0; i < n_fields; i++) {
if (UNIV_UNLIKELY(i == n_node_ptr_field)) {
#ifdef UNIV_DEBUG
field = dtuple_get_nth_field(dtuple, i);
type = dfield_get_type(field);
ut_ad(dtype_get_prtype(type) & DATA_NOT_NULL);
ut_ad(dfield_get_len(field) == 4);
#endif /* UNIV_DEBUG */
goto init;
}
field = dtuple_get_nth_field(dtuple, i);
type = dfield_get_type(field);
len = dfield_get_len(field);
fixed_len = dict_index_get_nth_field(index, i)->fixed_len;
ut_ad(dict_col_type_assert_equal(
dict_field_get_col(dict_index_get_nth_field(
index, i)),
dfield_get_type(field)));
if (!(dtype_get_prtype(type) & DATA_NOT_NULL)) {
if (len == UNIV_SQL_NULL)
continue;
}
/* only nullable fields can be null */
ut_ad(len != UNIV_SQL_NULL);
if (fixed_len) {
ut_ad(len == fixed_len);
} else {
ut_ad(len <= dtype_get_len(type)
|| dtype_get_mtype(type) == DATA_BLOB);
rec++;
if (len >= 128
&& (dtype_get_len(type) >= 256
|| dtype_get_mtype(type) == DATA_BLOB)) {
rec++;
} else if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
rec++;
}
}
return;
}
init:
ut_ad(j == n_ext);
end = rec;
nulls = rec - (extra + 1);
lens = nulls - UT_BITS_IN_BYTES(index->n_nullable);
......@@ -1094,8 +1057,7 @@ init:
/* Store the data and the offsets */
for (i = j = 0; i < n_fields; i++) {
field = dtuple_get_nth_field(dtuple, i);
for (i = j = 0, field = fields; i < n_fields; i++, field++) {
type = dfield_get_type(field);
len = dfield_get_len(field);
......@@ -1106,7 +1068,6 @@ init:
end += 4;
break;
}
fixed_len = dict_index_get_nth_field(index, i)->fixed_len;
if (!(dtype_get_prtype(type) & DATA_NOT_NULL)) {
/* nullable field */
......@@ -1130,6 +1091,9 @@ init:
}
/* only nullable fields can be null */
ut_ad(len != UNIV_SQL_NULL);
fixed_len = dict_index_get_nth_field(index, i)->fixed_len;
if (fixed_len) {
ut_ad(len == fixed_len);
} else {
......@@ -1157,8 +1121,6 @@ init:
}
ut_ad(j == n_ext);
return(rec);
}
/*************************************************************
......@@ -1177,8 +1139,19 @@ rec_convert_dtuple_to_rec_new(
in ascending order */
ulint n_ext) /* in: number of elements in ext */
{
rec_t* rec = rec_convert_dtuple_to_rec_comp(
buf, REC_N_NEW_EXTRA_BYTES, index, dtuple, ext, n_ext);
ulint extra_size;
ulint status;
rec_t* rec;
status = dtuple_get_info_bits(dtuple) & REC_NEW_STATUS_MASK;
rec_get_converted_size_comp(index, status,
dtuple->fields, dtuple->n_fields,
ext, n_ext, &extra_size);
rec = buf + extra_size;
rec_convert_dtuple_to_rec_comp(
rec, REC_N_NEW_EXTRA_BYTES, index, status,
dtuple->fields, dtuple->n_fields, ext, n_ext);
/* Set the info bits of the record */
rec_set_info_and_status_bits(rec, dtuple_get_info_bits(dtuple));
......
/******************************************************
New index creation routines using a merge sort
(c) 2005 Innobase Oy
(c) 2005,2007 Innobase Oy
Created 12/4/2005 Jan Lindstrom
Completed by Sunny Bains and Marko Makela
*******************************************************/
/******************************************************
......@@ -14,15 +15,7 @@ TODO:
2. Add more test cases and fix bugs found.
3. If we are using variable length keys, then in
some cases these keys do not fit into two empty blocks
in a different order. Therefore, some empty space is
left in every block. However, it has not been shown
that this empty space is enough for all cases. Therefore,
in the above case these overloaded records should be put
on another block.
4. Run benchmarks.
3. Run benchmarks.
*******************************************************/
#include "row0merge.h"
......@@ -55,1406 +48,796 @@ TODO:
#include "pars0pars.h"
#include "mem0mem.h"
#include "log0log.h"
/* Records are stored in the memory for main memory linked list
to this structure */
struct merge_rec_struct {
struct merge_rec_struct *next; /* Pointer to next record
in the list */
rec_t* rec; /* Record */
};
typedef struct merge_rec_struct merge_rec_t;
/* This structure is head element for main memory linked list
used for main memory linked list merge sort */
struct merge_rec_list_struct {
merge_rec_t* head; /* Pointer to head of the
list */
merge_rec_t* tail; /* Pointer to tail of the
list */
#ifdef UNIV_DEBUG
ulint n_records; /* Number of records in
the list */
#endif /* UNIV_DEBUG */
ulint total_size; /* Total size of all records in
the list */
mem_heap_t* heap; /* Heap where memory for this
list is allocated */
};
typedef struct merge_rec_list_struct merge_rec_list_t;
#include "ut0sort.h"
/* Block size for I/O operations in merge sort */
#define MERGE_BLOCK_SIZE 1048576 /* 1M */
/* Intentional free space on every block */
#define MERGE_BLOCK_SAFETY_MARGIN 128
/* Enable faster index creation debug code */
/* #define UNIV_DEBUG_INDEX_CREATE 1 */
/* This block header structure is used to create linked list of the
blocks to the disk. Every block contains one header.*/
struct merge_block_header_struct {
ulint n_records; /* Number of records in the block. */
ulint offset; /* Offset of this block */
ulint next; /* Offset of next block */
typedef byte row_merge_block_t[1048576];
/* Secondary buffer for I/O operations of merge records */
typedef byte mrec_buf_t[UNIV_PAGE_SIZE / 2];
/* Merge record in row_merge_block_t. The format is the same as a
record in ROW_FORMAT=COMPACT with the exception that the
REC_N_NEW_EXTRA_BYTES are omitted. */
typedef byte mrec_t;
/* Buffer for sorting in main memory. */
struct row_merge_buf_struct {
mem_heap_t* heap; /* memory heap where allocated */
dict_index_t* index; /* the index the tuples belong to */
ulint total_size; /* total amount of data bytes */
ulint n_tuples; /* number of data tuples */
ulint max_tuples; /* maximum number of data tuples */
const dfield_t**tuples; /* array of pointers to
arrays of fields that form
the data tuples */
const dfield_t**tmp_tuples; /* temporary copy of tuples,
for sorting */
};
typedef struct merge_block_header_struct merge_block_header_t;
typedef struct row_merge_buf_struct row_merge_buf_t;
/* This block structure is used to hold index records in the disk
and the memory */
/* Information about temporary files used in merge sort is stored
in this structure */
struct merge_block_struct {
merge_block_header_t header; /* Block header information */
char data[MERGE_BLOCK_SIZE - sizeof(merge_block_header_t)];/* Data area i.e. heap */
struct merge_file_struct {
int fd; /* File descriptor */
ulint offset; /* File offset */
};
typedef struct merge_block_struct merge_block_t;
/**************************************************************************
Look up an index of a table by its name and the names of its columns.
The lookup is delegated to dict_table_get_index_by_max_id(); if several
indexes match, the one with the max id is returned. */
static
dict_index_t*
row_merge_dict_table_get_index(
/*===========================*/
						/* out: matching index,
						NULL if not found */
	dict_table_t*		table,		/* in: table */
	const merge_index_def_t*index_def)	/* in: index definition */
{
	const char**	names;
	dict_index_t*	found;
	ulint		pos = 0;

	/* Collect the field names of the definition into a temporary
	array, which is the form the dictionary lookup expects. */
	names = mem_alloc(index_def->n_fields * sizeof *names);

	while (pos < index_def->n_fields) {
		names[pos] = index_def->fields[pos].field_name;
		pos++;
	}

	found = dict_table_get_index_by_max_id(
		table, index_def->name, names, index_def->n_fields);

	/* The name array was needed only for the lookup. */
	mem_free(names);

	return(found);
}
/************************************************************************
Creates and initializes a merge block. The block is obtained with
mem_alloc() and only its header is zero-filled; the data area is not
cleared here. */
static
merge_block_t*
row_merge_block_create(void)
/*========================*/
/* out: pointer to the newly allocated block */
{
merge_block_t* mblock;
mblock = mem_alloc(sizeof *mblock);
/* NOTE(review): this block-scope typedef is not used anywhere in
this function; it looks like a file-scope declaration that ended up
here -- confirm where it belongs. */
typedef struct merge_file_struct merge_file_t;
/* Clear the header fields only. */
memset(&mblock->header, 0, sizeof mblock->header);
return(mblock);
}
/************************************************************************
Read a merge block from the file system. */
/**********************************************************
Allocate a sort buffer. */
static
ibool
row_merge_read(
/*===========*/
/* out: TRUE if request was
successful, FALSE if fail */
os_file_t file, /* in: file handle */
ulint offset, /* in: offset where to read */
void* buf, /* out: data */
ulint size) /* in: number of bytes to read */
row_merge_buf_t*
row_merge_buf_create_low(
/*=====================*/
/* out,own: sort buffer */
mem_heap_t* heap, /* in: heap where allocated */
dict_index_t* index, /* in: secondary index */
ulint buf_size, /* in: size of the buffer, in bytes */
ulint max_tuples) /* in: maximum number of data tuples */
{
ib_uint64_t ofs = ((ib_uint64_t) offset) * MERGE_BLOCK_SIZE;
ut_ad(size <= MERGE_BLOCK_SIZE);
return(UNIV_LIKELY(os_file_read(file, buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
size)));
row_merge_buf_t* buf;
buf = mem_heap_alloc(heap, buf_size);
memset(buf, 0, buf_size);
buf->heap = heap;
buf->index = index;
buf->max_tuples = max_tuples;
buf->tuples = mem_heap_alloc(heap,
2 * max_tuples * sizeof *buf->tuples);
buf->tmp_tuples = buf->tuples + max_tuples;
return(buf);
}
/************************************************************************
Read a merge block from the file system. */
/**********************************************************
Allocate a sort buffer. */
static
ibool
row_merge_block_read(
row_merge_buf_t*
row_merge_buf_create(
/*=================*/
/* out: TRUE if request was
successful, FALSE if fail */
os_file_t file, /* in: file handle */
ulint offset, /* in: offset where to read */
merge_block_t* block) /* out: merge block */
{
return(row_merge_read(file, offset, block, sizeof *block));
}
/************************************************************************
Fetch only the header portion of a merge block from a file. */
static
ibool
row_merge_block_header_read(
/*========================*/
					/* out: TRUE if the read
					succeeded, FALSE otherwise */
	os_file_t		file,	/* in: handle to a file */
	ulint			offset,	/* in: offset where to read */
	merge_block_header_t*	header)	/* out: merge block header */
{
	ibool	ok;

	/* Only sizeof(merge_block_header_t) bytes are requested, so the
	data area of the block is not transferred. */
	ok = row_merge_read(file, offset, header,
			    sizeof(merge_block_header_t));

	return(ok);
}
/************************************************************************
Write a merge block to the file system. */
static
ibool
row_merge_write(
/*============*/
/* out: TRUE if request was
successful, FALSE if fail */
os_file_t file, /* in: file handle */
ulint offset, /* in: offset where to write */
const void* buf, /* in: data */
ulint size) /* in: number of bytes to write */
/* out,own: sort buffer */
dict_index_t* index) /* in: secondary index */
{
ib_uint64_t ofs = ((ib_uint64_t) offset) * MERGE_BLOCK_SIZE;
row_merge_buf_t* buf;
ulint max_tuples;
ulint buf_size;
mem_heap_t* heap;
ut_ad(size <= MERGE_BLOCK_SIZE);
max_tuples = sizeof(row_merge_block_t)
/ ut_max(1, dict_index_get_min_size(index));
return(UNIV_LIKELY(os_file_write("(merge)", file, buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
size)));
}
buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
/************************************************************************
Store only the header portion of a merge block in a file. The target
position is taken from the offset field of the header itself. */
static
ibool
row_merge_block_header_write(
/*=========================*/
						/* out: TRUE if the write
						succeeded, FALSE otherwise */
	os_file_t			file,	/* in: handle to a file */
	const merge_block_header_t*	header)	/* in: block header */
{
	ibool	ok;

	/* The header records where its block lives in the file. */
	ok = row_merge_write(file, header->offset, header,
			     sizeof(merge_block_header_t));

	return(ok);
}
heap = mem_heap_create(buf_size + sizeof(row_merge_block_t));
/************************************************************************
Write a merge block to the disk. The whole block (sizeof *block bytes)
is passed to row_merge_write(). */
static
ibool
row_merge_block_write(
/*==================*/
/* out: TRUE if request was
successful, FALSE if fail */
os_file_t file, /* in: handle to a file */
ulint offset, /* in: file offset; must equal
block->header.offset (debug assertion) */
const merge_block_t* block) /* in: block to be written */
{
ut_ad(offset == block->header.offset);
/* NOTE(review): the next statement uses buf, heap, index, max_tuples
and buf_size, none of which are declared in this function; it looks
like it belongs to row_merge_buf_create() -- confirm. It also passes
(max_tuples, buf_size) in that order, while the parameter list of
row_merge_buf_create_low() visible in this file is
(buf_size, max_tuples) -- confirm which order is intended. */
buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
return(row_merge_write(file, offset, block, sizeof *block));
/* NOTE(review): unreachable after the return above, and buf does not
match the ibool return type of this function -- confirm. */
return(buf);
}
/**************************************************************
Create a merge record and copy an index data tuple to the merge
record */
/**********************************************************
Empty a sort buffer. */
static
merge_rec_t*
row_merge_rec_create(
/*=================*/
/* out: merge record */
const dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in ext */
dict_index_t* index, /* in: index record descriptor */
mem_heap_t* heap) /* in: heap where memory is allocated */
void
row_merge_buf_empty(
/*================*/
row_merge_buf_t* buf) /* in/out: sort buffer */
{
merge_rec_t* m_rec;
ulint rec_size;
byte* buf;
ulint buf_size;
ulint max_tuples = buf->max_tuples;
mem_heap_t* heap = buf->heap;
dict_index_t* index = buf->index;
ut_ad(dtuple && index && heap);
ut_ad(dtuple_validate(dtuple));
buf_size = (sizeof *buf) + (max_tuples - 1) * sizeof *buf->tuples;
m_rec = (merge_rec_t*) mem_heap_alloc(heap, sizeof(merge_rec_t));
mem_heap_empty(heap);
rec_size = rec_get_converted_size(index, dtuple, ext, n_ext);
buf = mem_heap_alloc(heap, rec_size);
m_rec->rec = rec_convert_dtuple_to_rec(buf, index, dtuple,
ext, n_ext);
m_rec->next = NULL;
return(m_rec);
buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
}
/************************************************************************
Checks that a record fits to a block */
/**********************************************************
Deallocate a sort buffer. */
static
ibool
row_merge_rec_fits_to_block(
/*========================*/
/* out: TRUE if record fits to merge block,
FALSE if record does not fit to block */
const ulint* offsets,/* in: record offsets */
ulint offset) /* in: offset where to store in the block */
void
row_merge_buf_free(
/*===============*/
row_merge_buf_t* buf) /* in,own: sort buffer, to be freed */
{
ulint rec_len;
ut_ad(offsets);
rec_len = mach_get_compressed_size(rec_offs_extra_size(offsets))
+ rec_offs_size(offsets);
/* Note that we intentionally leave free space on
every block. This free space might be later needed when two
blocks are merged and variable length keys are used. Variable
length keys on two blocks might be interleaved in such a manner
that they do not fit on two blocks if blocks are too full */
return((offset + rec_len) < (MERGE_BLOCK_SIZE
- MERGE_BLOCK_SAFETY_MARGIN
- sizeof(merge_block_header_t)));
mem_heap_free(buf->heap);
}
/************************************************************************
Store a record to a merge file block. Note that this function does
not check that the record fits to the block. */
/**********************************************************
Insert a data tuple into a sort buffer. */
static
ulint
row_merge_store_rec_to_block(
/*=========================*/
/* out: offset for next data tuple */
const rec_t* rec, /* in: record to be stored in the memory */
const ulint* offsets,/* in: record offsets */
merge_block_t* mblock, /* in: block where data tuple is stored */
ulint offset) /* in: offset where to store */
ibool
row_merge_buf_add(
/*==============*/
/* out: TRUE if added,
FALSE if out of space */
row_merge_buf_t* buf, /* in/out: sort buffer */
const dtuple_t* row, /* in: row in clustered index */
row_ext_t* ext) /* in/out: cache of externally stored
column prefixes, or NULL */
{
char* dest_data;
ulint rec_len;
ulint extra_len;
ulint storage_size;
ut_ad(rec && mblock && offsets);
ut_ad(rec_validate(rec, offsets));
/* Find the position in the block where this data tuple is stored.
If we are at the start of the block, remember to add size of header
to the offset */
ulint i;
ulint n_fields;
ulint data_size;
ulint extra_size;
dfield_t* entry;
dfield_t* field;
if (offset == 0) {
dest_data = mblock->data;
} else {
dest_data = ((char *)mblock + offset);
if (buf->n_tuples >= buf->max_tuples) {
return(FALSE);
}
ut_ad(dest_data < (char*) &mblock[1]);
extra_len = rec_offs_extra_size(offsets);
rec_len = rec_offs_size(offsets);
/* 1. Store the extra_len */
storage_size = mach_write_compressed((byte *)dest_data, extra_len);
dest_data+=storage_size;
ut_ad(dest_data < (char*) &mblock[1]);
/* 2. Store the record */
memcpy(dest_data, rec - extra_len, rec_len);
dest_data+=rec_len;
ut_ad(dest_data < (char*) &mblock[1]);
n_fields = dict_index_get_n_fields(buf->index);
entry = mem_heap_alloc(buf->heap, n_fields * sizeof *entry);
buf->tuples[buf->n_tuples] = entry;
field = entry;
data_size = 0;
extra_size = UT_BITS_IN_BYTES(buf->index->n_nullable);
for (i = 0; i < n_fields; i++, field++) {
dict_field_t* ifield;
const dict_col_t* col;
ulint col_no;
const dfield_t* row_field;
ifield = dict_index_get_nth_field(buf->index, i);
col = ifield->col;
col_no = dict_col_get_no(col);
row_field = dtuple_get_nth_field(row, col_no);
dfield_copy(field, row_field);
if (UNIV_LIKELY_NULL(ext)
&& dfield_get_len(row_field) != UNIV_SQL_NULL) {
/* See if the column is stored externally. */
byte* buf = row_ext_lookup(ext, col_no,
row_field->data,
row_field->len,
&field->len);
if (UNIV_LIKELY_NULL(buf)) {
field->data = buf;
}
}
mblock->header.n_records++;
if (field->len == UNIV_SQL_NULL) {
ut_ad(!(col->prtype & DATA_NOT_NULL));
field->data = NULL;
continue;
}
/* Return next offset */
return((char *)dest_data - (char *)mblock);
}
/* If a column prefix index, take only the prefix */
/************************************************************************
Read a record from the block */
static
merge_rec_t*
row_merge_read_rec_from_block(
/*==========================*/
/* out: record or NULL*/
merge_block_t* mblock, /* in: memory block where to read */
ulint* offset, /* in/out: offset where to read a record */
mem_heap_t* heap, /* in: heap were this memory for this record
is allocated */
dict_index_t* index) /* in: index record desriptor */
{
merge_rec_t* mrec;
char* from_data;
ulint extra_len;
ulint data_len;
ulint tmp_offset;
ulint storage_len;
rec_t* rec;
mem_heap_t* offset_heap = NULL;
ulint sec_offsets_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs = sec_offsets_;
if (ifield->prefix_len) {
field->len = dtype_get_at_most_n_mbchars(
col->prtype,
col->mbminlen, col->mbmaxlen,
ifield->prefix_len,
field->len, field->data);
}
*sec_offsets_ = (sizeof sec_offsets_) / sizeof *sec_offsets_;
ut_ad(field->len <= col->len || col->mtype == DATA_BLOB);
ut_ad(mblock && offset && heap);
if (ifield->fixed_len) {
ut_ad(field->len == ifield->fixed_len);
} else if (field->len < 128
|| (col->len < 256 && col->mtype != DATA_BLOB)) {
extra_size++;
} else {
extra_size += 2;
}
data_size += field->len;
}
tmp_offset = *offset;
#ifdef UNIV_DEBUG
{
ulint size;
ulint extra;
/* Find the position in the block where this data tuple is stored.
If we are at the start of the block, remember to add size of header
to the offset */
size = rec_get_converted_size_comp(buf->index,
REC_STATUS_ORDINARY,
entry, n_fields, NULL, 0,
&extra);
if (tmp_offset == 0) {
from_data = mblock->data;
} else {
from_data = ((char *)mblock + tmp_offset);
ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
}
#endif /* UNIV_DEBUG */
ut_ad(from_data < (const char*) &mblock[1]);
mrec = mem_heap_alloc(heap, sizeof(merge_rec_t));
/* 1. Read the extra len and calculate its storage length */
extra_len = mach_read_compressed((byte *)from_data);
storage_len = mach_get_compressed_size(extra_len);
from_data+=storage_len;
ut_ad(from_data < (const char*) &mblock[1]);
/* Add to the total size of the record in row_merge_block_t
the encoded length of extra_size and the extra bytes (extra_size).
See row_merge_buf_write() for the variable-length encoding
of extra_size. */
data_size += extra_size + (extra_size >= 127);
/* 2. Read the record */
rec = (rec_t*)(from_data + extra_len);
mrec->rec = rec;
sec_offs = rec_get_offsets(mrec->rec, index, sec_offs, ULINT_UNDEFINED,
&offset_heap);
data_len = rec_offs_size(sec_offs);
ut_ad(rec_validate(rec, sec_offs));
/* Reserve one byte for the end marker of row_merge_block_t. */
if (buf->total_size + data_size >= sizeof(row_merge_block_t) - 1) {
return(FALSE);
}
from_data+=data_len;
ut_ad(from_data < (const char*) &mblock[1]);
buf->total_size += data_size;
buf->n_tuples++;
/* Return also start offset of the next data tuple */
*offset = ((char *)from_data - (char *)mblock);
field = entry;
if (offset_heap) {
mem_heap_free(offset_heap);
/* Copy the data fields. */
for (i = 0; i < n_fields; i++, field++) {
if (field->len != UNIV_SQL_NULL) {
field->data = mem_heap_dup(buf->heap,
field->data, field->len);
}
}
return(mrec);
return(TRUE);
}
/*****************************************************************
Compare two merge records. */
Compare two tuples. */
static
int
row_merge_cmp(
/*==========*/
/* out: 1, 0, -1 if mrec1 is
greater, equal, less,
respectively, than mrec2 */
merge_rec_t* mrec1, /* in: first merge record to be
compared */
merge_rec_t* mrec2, /* in: second merge record to be
compared */
const ulint* offsets1, /* in: first record offsets */
const ulint* offsets2, /* in: second record offsets */
dict_index_t* index) /* in: index */
row_merge_tuple_cmp(
/*================*/
/* out: 1, 0, -1 if a is greater,
equal, less, respectively, than b */
ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */
const dfield_t* a, /* in: first tuple to be compared */
const dfield_t* b) /* in: second tuple to be compared */
{
ut_ad(mrec1 && mrec2 && offsets1 && offsets2 && index);
ut_ad(rec_validate(mrec1->rec, offsets1));
ut_ad(rec_validate(mrec2->rec, offsets2));
return(cmp_rec_rec(mrec1->rec, mrec2->rec, offsets1, offsets2, index));
}
/*****************************************************************
Merge sort for linked list in memory.
Merge sort takes the input list and makes log N passes along
the list and in each pass it combines each adjacent pair of
small sorted lists into one larger sorted list. When only one
pass is needed the whole output list must have been sorted.
In each pass, two lists of size block_size are merged into lists of
size block_size*2. Initially block_size=1. Merge starts by pointing
a temporary pointer list1 at the head of the list and also preparing
an empty list list_tail where elements will be appended. Then:
1) If list1 is NULL we terminate this pass.
2) Otherwise, there is at least one element in the next
pair of block_size lists therefore, increase the number of
merges performed in this pass.
int cmp;
3) Point another temporary pointer list2 at the same
place as list1. Advance list2 by block_size elements
or until the end of the list. Let list1_size be the
number of elements that list2 was advanced by.
4) Let list2_size=block_size. Now we merge a list starting at
list1 of length list1_size with a list starting at list2 of
length at most list2_size.
5) So, as long as either list1 is non-empty (list1_size)
or list2 is non-empty (list2_size and list2 pointing to
an element):
5.1) Select which list to take the next element from.
If either lists is empty, we choose from the other one.
If both lists are non-empty, compare the first element
of each and choose the lower one.
5.2) Remove that element, tmp, from the start of its
lists, by advancing list1 or list2 to next element
and decreasing list1_size or list2_size.
5.3) Append tmp to list_tail
6) At this point, we have advanced list1 until it is where
list2 started out and we have advanced list2 until it is
pointing at the next pair of block_size lists to merge.
Thus, set list1 to the value of list2 and go back to the
start of this loop.
As soon as a pass like this is performed with only one merge, the
algorithm terminates and output list list_head is sorted. Otherwise,
double the value of block_size and go back to the beginning. */
static
ibool
row_merge_sort_linked_list(
/*=======================*/
/* out: FALSE on error */
dict_index_t* index, /* in: index to be created */
merge_rec_list_t* list) /* in: Pointer to head element */
{
ibool success;
merge_rec_t* list1;
merge_rec_t* list2;
merge_rec_t* list_head;
merge_rec_t* list_tail;
ulint block_size;
ulint list1_size;
ulint list2_size;
ulint i;
mem_heap_t* heap = NULL;
ulint offsets1_[REC_OFFS_SMALL_SIZE];
ulint* offsets1 = offsets1_;
ulint offsets2_[REC_OFFS_SMALL_SIZE];
ulint* offsets2 = offsets2_;
ut_ad(list && list->head && index);
*offsets1_ = (sizeof offsets1_) / sizeof *offsets1_;
*offsets2_ = (sizeof offsets2_) / sizeof *offsets2_;
list_head = list->head;
for (block_size = 1;; block_size *= 2) {
ibool sorted = TRUE;
list1 = list_head;
list_head = NULL;
list_tail = NULL;
for (;;) {
list2 = list1;
list1_size = 0;
list2_size = block_size;
/* Step at most block_size elements along from
list2. */
for (i = 0; i < block_size; i++) {
list1_size++;
list2 = list2->next;
if (!list2) {
list2_size = 0;
break;
}
}
/* If list2 is not NULL, we have two lists to merge.
Otherwise, we have a sorted list. */
while (list1_size || list2_size) {
merge_rec_t* tmp;
/* Merge sort two lists by deciding whether
next element of merge comes from list1 or
list2. */
if (list1_size == 0) {
/* First list is empty, next element
must come from the second list. */
goto pick2;
}
if (list2_size == 0) {
/* Second list is empty, next element
must come from the first list. */
goto pick1;
}
offsets1 = rec_get_offsets(list1->rec, index,
offsets1,
ULINT_UNDEFINED,
&heap);
offsets2 = rec_get_offsets(list2->rec, index,
offsets2,
ULINT_UNDEFINED,
&heap);
switch (row_merge_cmp(list1, list2,
offsets1, offsets2,
index)) {
case 0:
if (UNIV_UNLIKELY
(dict_index_is_unique(index))) {
success = FALSE;
goto func_exit;
}
/* fall through */
case -1:
pick1:
tmp = list1;
list1 = list1->next;
list1_size--;
break;
case 1:
pick2:
tmp = list2;
list2 = list2->next;
if (list2) {
list2_size--;
} else {
list2_size = 0;
}
break;
default:
ut_error;
}
/* Append the element to the merged list */
if (list_tail) {
list_tail->next = tmp;
} else {
list_head = tmp;
}
list_tail = tmp;
}
if (!list2) {
if (!sorted) {
break;
}
list->head = list_head;
list_tail->next = NULL;
success = TRUE;
goto func_exit;
}
sorted = FALSE;
list1 = list2;
}
do {
cmp = cmp_dfield_dfield(a++, b++);
} while (!cmp && --n_field);
list_tail->next = NULL;
if (!cmp) {
(*n_dup)++;
}
func_exit:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(success);
return(cmp);
}
/*****************************************************************
Create and initialize record list used for in-memory merge sort */
/**************************************************************************
Merge sort the tuple buffer in main memory. */
static
merge_rec_list_t*
row_merge_create_list(void)
/*=======================*/
/* out: pointer to list */
void
row_merge_tuple_sort(
/*=================*/
ulint n_field,/* in: number of fields */
ulint* n_dup, /* in/out: number of duplicates */
const dfield_t** tuples, /* in/out: tuples */
const dfield_t** aux, /* in/out: work area */
ulint low, /* in: lower bound of the
sorting area, inclusive */
ulint high) /* in: upper bound of the
sorting area, exclusive */
{
merge_rec_list_t* list_header;
mem_heap_t* heap = NULL;
#define row_merge_tuple_sort_ctx(a,b,c,d) \
row_merge_tuple_sort(n_field, n_dup, a, b, c, d)
#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, n_dup, a, b)
/* Create list header */
heap = mem_heap_create((MERGE_BLOCK_SIZE + sizeof(merge_rec_list_t)));
list_header = mem_heap_alloc(heap, sizeof(merge_rec_list_t));
list_header->head = NULL;
list_header->tail = NULL;
ut_d(list_header->n_records = 0);
list_header->total_size = sizeof(merge_rec_list_t);
list_header->heap = heap;
return(list_header);
UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}
/*****************************************************************
Add one record to the merge list */
/**********************************************************
Sort a buffer. */
static
void
row_merge_list_add(
ulint
row_merge_buf_sort(
/*===============*/
merge_rec_t* m_rec, /* in: record to be
inserted to the list */
ulint rec_len, /* in: record length */
merge_rec_list_t* list_header) /* in/out: list header */
/* out: number of duplicates
encountered */
row_merge_buf_t* buf) /* in/out: sort buffer */
{
ut_ad(m_rec && list_header);
m_rec->next = NULL;
list_header->total_size+=rec_len;
if (list_header->tail == NULL) {
ulint n_dup = 0;
list_header->tail = list_header->head = m_rec;
} else {
list_header->tail->next = m_rec;
list_header->tail = m_rec;
}
row_merge_tuple_sort(dict_index_get_n_fields(buf->index), &n_dup,
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
ut_d(list_header->n_records++);
return(n_dup);
}
/*****************************************************************
Write records from the list to the merge block */
/**********************************************************
Write a buffer to a block. */
static
merge_rec_list_t*
row_merge_write_list_to_block(
/*==========================*/
/* out: pointer to a new list
where rest of the items are stored */
merge_rec_list_t* list, /* in: Record list */
merge_block_t* output, /* in: Pointer to block */
dict_index_t* index) /* in: Record descriptor */
void
row_merge_buf_write(
/*================*/
const row_merge_buf_t* buf, /* in: sorted buffer */
row_merge_block_t* block) /* out: buffer for writing to file */
{
ulint offset = 0;
merge_rec_t* m_rec = NULL;
merge_rec_list_t* new_list = NULL;
mem_heap_t* heap = NULL;
ulint sec_offsets_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs = sec_offsets_;
ut_ad(list && output && index);
*sec_offsets_ = (sizeof sec_offsets_) / sizeof *sec_offsets_;
output->header.n_records = 0;
/* Write every record which fits to block to the block */
m_rec = list->head;
while (m_rec) {
sec_offs = rec_get_offsets(m_rec->rec, index, sec_offs,
ULINT_UNDEFINED, &heap);
if (!row_merge_rec_fits_to_block(sec_offs, offset)) {
break;
dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &(*block)[0];
ulint i;
for (i = 0; i < buf->n_tuples; i++) {
ulint size;
ulint extra_size;
const dfield_t* entry = buf->tuples[i];
size = rec_get_converted_size_comp(buf->index,
REC_STATUS_ORDINARY,
entry, n_fields, NULL, 0,
&extra_size);
ut_ad(size > extra_size);
ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
extra_size -= REC_N_NEW_EXTRA_BYTES;
size -= REC_N_NEW_EXTRA_BYTES;
/* Encode extra_size + 1 */
if (extra_size + 1 < 0x80) {
*b++ = extra_size + 1;
} else {
ut_ad(extra_size < 0x8000);
*b++ = 0x80 | ((extra_size + 1) >> 8);
*b++ = (byte) (extra_size + 1);
}
offset = row_merge_store_rec_to_block(m_rec->rec,
sec_offs, output, offset);
m_rec = m_rec->next;
ut_d(list->n_records--);
}
/* Now create a new list and store rest of the records there.
Note that records must be copied because we deallocate memory
allocated for the original list. */
new_list = row_merge_create_list();
while (m_rec) {
rec_t* rec;
merge_rec_t* n_rec;
void* buff;
*sec_offsets_ = (sizeof sec_offsets_) / sizeof *sec_offsets_;
ut_ad(b + size < block[1]);
sec_offs = rec_get_offsets(m_rec->rec, index, sec_offs,
ULINT_UNDEFINED, &heap);
rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
REC_STATUS_ORDINARY,
entry, n_fields, NULL, 0);
buff = mem_heap_alloc(new_list->heap,
rec_offs_size(sec_offs));
n_rec = mem_heap_alloc(new_list->heap, sizeof(merge_rec_t));
rec = rec_copy(buff, m_rec->rec, sec_offs);
n_rec->rec = rec;
row_merge_list_add(n_rec, rec_offs_size(sec_offs), new_list);
m_rec = m_rec->next;
}
/* We can now free original list */
mem_heap_free(list->heap);
if (heap) {
mem_heap_free(heap);
b += size;
}
return(new_list);
/* Write an "end-of-chunk" marker. */
ut_a(b < block[1]);
*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
/* The rest of the block is uninitialized. Initialize it
to avoid bogus warnings. */
memset(b, 0, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */
}
#ifdef UNIV_DEBUG
/*************************************************************************
Validate contents of the block */
/**********************************************************
Create a memory heap and allocate space for row_merge_rec_offsets(). */
static
ibool
row_merge_block_validate(
/*=====================*/
merge_block_t* block, /* in: block to be printed */
dict_index_t* index) /* in: record descriptor */
mem_heap_t*
row_merge_heap_create(
/*==================*/
/* out: memory heap */
dict_index_t* index, /* in: record descriptor */
ulint** offsets1, /* out: offsets */
ulint** offsets2) /* out: offsets */
{
merge_rec_t* mrec;
ulint offset = 0;
ulint n_recs = 0;
mem_heap_t* heap;
ulint sec_offsets1_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs1 = sec_offsets1_;
*sec_offsets1_ = (sizeof sec_offsets1_) / sizeof *sec_offsets1_;
ut_a(block && index);
heap = mem_heap_create(1024);
ulint i = REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index);
mem_heap_t* heap = mem_heap_create(2 * i * sizeof *offsets1);
fprintf(stderr,
"Block validate %lu records, "
"offset %lu, next %lu\n",
block->header.n_records,
block->header.offset, block->header.next);
*offsets1 = mem_heap_alloc(heap, i * sizeof *offsets1);
*offsets2 = mem_heap_alloc(heap, i * sizeof *offsets2);
ut_a(block->header.n_records > 0);
(*offsets1)[0] = (*offsets2)[0] = i;
(*offsets1)[1] = (*offsets2)[1] = dict_index_get_n_fields(index);
for (n_recs = 0; n_recs < block->header.n_records; n_recs++) {
mrec = row_merge_read_rec_from_block(block, &offset, heap,
index);
sec_offs1 = rec_get_offsets(mrec->rec, index, sec_offs1,
ULINT_UNDEFINED, &heap);
ut_a(rec_validate(mrec->rec, sec_offs1));
mem_heap_empty(heap);
}
mem_heap_free(heap);
return(TRUE);
return(heap);
}
#endif /* UNIV_DEBUG */
/*************************************************************************
Merge two blocks resulting a two sorted blocks. */
/**************************************************************************
Search an index object by name and column names. If several indexes match,
return the index with the max id. */
static
merge_block_t*
row_merge_block_merge(
/*==================*/
/* out: Pointer to first sorted block
or NULL in case of error */
merge_block_t* block1, /* in: First block to be merged */
merge_block_t** block2, /* in/out: Second block to be merged.
Note that contents of the second sorted
block is returned with this parameter.*/
dict_index_t* index) /* in: Index to be created */
dict_index_t*
row_merge_dict_table_get_index(
/*===========================*/
/* out: matching index,
NULL if not found */
dict_table_t* table, /* in: table */
const merge_index_def_t*index_def) /* in: index definition */
{
merge_block_t* new_block1;
merge_block_t* new_block2;
merge_block_t* tmp;
ulint nth_rec1 = 0;
ulint nth_rec2 = 0;
ulint offset1 = 0;
ulint offset2 = 0;
ulint offset3 = 0;
ulint offset4 = 0;
ibool fits_to_new = TRUE;
mem_heap_t* heap;
mem_heap_t* offset_heap = NULL;
ulint sec_offsets1_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs1 = sec_offsets1_;
ulint sec_offsets2_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs2 = sec_offsets2_;
ut_ad(block1 && block2 && *block2 && index);
ut_ad(row_merge_block_validate(block1, index));
ut_ad(row_merge_block_validate(*block2, index));
*sec_offsets1_ = (sizeof sec_offsets1_) / sizeof *sec_offsets1_;
*sec_offsets2_ = (sizeof sec_offsets2_) / sizeof *sec_offsets2_;
new_block1 = row_merge_block_create();
new_block2 = row_merge_block_create();
tmp = *block2;
heap = mem_heap_create(256);
/* Copy block offset and next block offset to new blocks */
new_block1->header = block1->header;
new_block2->header = tmp->header;
new_block1->header.n_records = 0;
new_block2->header.n_records = 0;
/* Merge all records from both blocks */
while (nth_rec1 < block1->header.n_records ||
nth_rec2 < tmp->header.n_records) {
merge_rec_t* mrec1 = NULL;
merge_rec_t* mrec2 = NULL;
const ulint* rec_offsets;
mem_heap_empty(heap);
if (nth_rec1 < block1->header.n_records &&
nth_rec2 >= tmp->header.n_records) {
/* If the second block is empty read record from
the first block */
mrec1 = row_merge_read_rec_from_block(
block1, &offset1, heap, index);
sec_offs1 = rec_get_offsets(
mrec1->rec, index, sec_offs1, ULINT_UNDEFINED,
&offset_heap);
rec_offsets = sec_offs1;
ut_ad(rec_validate(mrec1->rec, sec_offs1));
nth_rec1++;
} else if (nth_rec2 < tmp->header.n_records &&
nth_rec1 >= block1->header.n_records) {
/* If the first block is empty read data tuple from
the second block */
mrec1 = row_merge_read_rec_from_block(
tmp, &offset2, heap, index);
sec_offs1 = rec_get_offsets(
mrec1->rec, index, sec_offs1, ULINT_UNDEFINED,
&offset_heap);
rec_offsets = sec_offs1;
ut_ad(rec_validate(mrec1->rec, sec_offs1));
nth_rec2++;
} else {
ulint tmp_offset1 = offset1;
ulint tmp_offset2 = offset2;
/* Both blocks contain record and thus they must
be compared */
mrec1 = row_merge_read_rec_from_block(
block1, &offset1, heap, index);
sec_offs1 = rec_get_offsets(
mrec1->rec, index, sec_offs1, ULINT_UNDEFINED,
&offset_heap);
ut_ad(rec_validate(mrec1->rec, sec_offs1));
mrec2 = row_merge_read_rec_from_block(
tmp, &offset2, heap, index);
sec_offs2 = rec_get_offsets(
mrec2->rec, index, sec_offs2, ULINT_UNDEFINED,
&offset_heap);
ut_ad(rec_validate(mrec2->rec, sec_offs2));
switch (row_merge_cmp(mrec1, mrec2,
sec_offs1, sec_offs2, index)) {
case 0:
if (UNIV_UNLIKELY
(dict_index_is_unique(index))) {
goto error_handling;
}
/* fall through */
case -1:
rec_offsets = sec_offs1;
nth_rec1++;
offset2 = tmp_offset2;
break;
case 1:
mrec1 = mrec2;
rec_offsets = sec_offs2;
nth_rec2++;
offset1 = tmp_offset1;
break;
default:
ut_error;
}
}
ut_ad(mrec1);
ut_ad(rec_validate(mrec1->rec, rec_offsets));
/* If the first output block is not yet full test whether this
new data tuple fits to block. If not this new data tuple must
be inserted to second output block */
if (fits_to_new) {
fits_to_new = row_merge_rec_fits_to_block(
rec_offsets, offset3);
}
if (fits_to_new) {
offset3 = row_merge_store_rec_to_block(
mrec1->rec, rec_offsets, new_block1, offset3);
} else {
ut_a(row_merge_rec_fits_to_block(rec_offsets,
offset4));
offset4 = row_merge_store_rec_to_block(
mrec1->rec, rec_offsets, new_block2, offset4);
}
/* TODO: If we are using variable length keys, then in
some cases these keys do not fit into two empty blocks
in a different order. Therefore, some empty space is
left in every block. However, it has not been proven
that this empty space is enough in all cases. Therefore,
here these overloaded records should be put on another
block. */
}
ulint i;
dict_index_t* index;
const char** column_names;
/* Free memory from old blocks and return pointers to new blocks */
column_names = mem_alloc(index_def->n_fields * sizeof *column_names);
if (offset_heap) {
mem_heap_free(offset_heap);
for (i = 0; i < index_def->n_fields; ++i) {
column_names[i] = index_def->fields[i].field_name;
}
mem_heap_free(heap);
mem_free(block1);
mem_free(tmp);
ut_ad(row_merge_block_validate(new_block1, index));
ut_ad(row_merge_block_validate(new_block2, index));
*block2 = new_block2;
return(new_block1);
error_handling:
/* Duplicate key was found and unique key was requested. Free all
allocated memory and return NULL */
if (offset_heap) {
mem_heap_free(offset_heap);
}
index = dict_table_get_index_by_max_id(
table, index_def->name, column_names, index_def->n_fields);
mem_heap_free(heap);
mem_free(block1);
mem_free(tmp);
mem_free(new_block1);
mem_free(new_block2);
mem_free(column_names);
return(NULL);
return(index);
}
/*****************************************************************
Merge sort for linked list in the disk.
Merge sort takes the input list and makes log N passes along
the list and in each pass it combines each adjacent pair of
small sorted lists into one larger sorted list. When only one
pass is needed the whole output list must have been sorted.
The linked list is stored in the file system. File blocks represent
items of linked list. The list is singly linked by the next offset
stored in block header. Offset is calculated from the start of the
file. Thus whenever next item in the list is requested this item is
read from the disk. Similarly every item is written back to the disk
when we have sorted two blocks in the memory.
/************************************************************************
Read a merge block from the file system. */
static
ibool
row_merge_read(
/*===========*/
/* out: TRUE if request was
successful, FALSE if fail */
int fd, /* in: file descriptor */
ulint offset, /* in: offset where to read */
row_merge_block_t* buf) /* out: data */
{
ib_uint64_t ofs = ((ib_uint64_t) offset) * sizeof *buf;
In each pass, two lists of size block_size are merged into lists of
size block_size*2. Initially block_size=1. Merge starts by pointing
a temporary pointer list1 at the head of the list and also preparing
an empty list list_tail where elements will be appended. Then:
return(UNIV_LIKELY(os_file_read(OS_FILE_FROM_FD(fd), buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
sizeof *buf)));
}
1) If block1 is NULL we terminate this pass.
/************************************************************************
Write a merge block to the file system. */
static
ibool
row_merge_write(
/*============*/
/* out: TRUE if request was
successful, FALSE if fail */
int fd, /* in: file descriptor */
ulint offset, /* in: offset where to write */
const void* buf) /* in: data */
{
ib_uint64_t ofs = ((ib_uint64_t) offset)
* sizeof(row_merge_block_t);
2) Otherwise, there is at least one element in the next
pair of block_size lists therefore, increase the number of
merges performed in this pass.
return(UNIV_LIKELY(os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
sizeof(row_merge_block_t))));
}
3) Point another temporary pointer list2 as the same
place as list1. Iterate list2 by block_size elements
or until the end of the list. Let the list_size1 be the
number of elements in the list2.
/************************************************************************
Read a merge record. */
static
const byte*
row_merge_read_rec(
/*===============*/
/* out: pointer to next record,
or NULL on I/O error
or end of list */
row_merge_block_t* block, /* in/out: file buffer */
mrec_buf_t* buf, /* in/out: secondary buffer */
const byte* b, /* in: pointer to record */
dict_index_t* index, /* in: index of the record */
int fd, /* in: file descriptor */
ulint* foffs, /* in/out: file offset */
const mrec_t** mrec, /* out: pointer to merge record,
or NULL on end of list
(non-NULL on I/O error) */
ulint* offsets)/* out: offsets of mrec */
{
ulint extra_size;
ulint data_size;
ulint avail_size;
ut_ad(block);
ut_ad(buf);
ut_ad(b >= block[0]);
ut_ad(b < block[1]);
ut_ad(index);
ut_ad(foffs);
ut_ad(mrec);
ut_ad(offsets);
4) Let list_size1=merge_size. Now we merge list starting at
list1 of length list_size2 with a list starting at list2 of
length at most list_size1.
ut_ad(*offsets == REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index));
5) So, as long as either the list1 is non-empty (list_size1)
or the list2 is non-empty (list_size2 and list2 pointing to
an element):
extra_size = *b++;
5.1) Select which list to take the next element from.
If either list is empty, we choose from the other one.
If both lists are non-empty, compare the first element
of each and choose the lower one.
if (UNIV_UNLIKELY(!extra_size)) {
/* End of list */
*mrec = NULL;
return(NULL);
}
5.2) Remove that element, tmp, from the start of its
lists, by advancing list1 or list2 to next element
and decreasing list1_size or list2_size.
if (extra_size >= 0x80) {
/* Read another byte of extra_size. */
5.3) Append tmp to list_tail
if (UNIV_UNLIKELY(b >= block[1])) {
if (!row_merge_read(fd, ++(*foffs), block)) {
err_exit:
/* Signal I/O error. */
*mrec = b;
return(NULL);
}
6) At this point, we have advanced list1 until it is where
list2 started out and we have advanced list2 until it is
pointing at the next pair of block_size lists to merge.
Thus, set list1 to the value of list2 and go back to the
start of this loop.
/* Wrap around to the beginning of the buffer. */
b = block[0];
}
As soon as a pass like this is performed with only one merge, the
algorithm terminates. Otherwise, double the value of block_size
and go back to the beginning. */
extra_size = (extra_size & 0x7f) << 8;
extra_size |= *b++;
}
ulint
row_merge_sort_linked_list_in_disk(
/*===============================*/
/* out: offset to first block in
the list or ULINT_UNDEFINED in
case of error */
dict_index_t* index, /* in: index to be created */
os_file_t file, /* in: File handle */
int* error) /* out: 0 or error */
{
merge_block_t* block1;
merge_block_t* block2;
merge_block_t* backup1;
merge_block_t* backup2;
merge_file_t output;
ulint block_size;
ulint list_head = 0;
/* Normalize extra_size. Above, value 0 signals "end of list". */
extra_size--;
ut_ad(index);
/* Read the extra bytes. */
/* Allocate memory for blocks */
backup1 = block1 = row_merge_block_create();
backup2 = block2 = row_merge_block_create();
if (UNIV_UNLIKELY(b + extra_size >= block[1])) {
/* The record spans two blocks. Copy the entire record
to the auxiliary buffer and handle this as a special
case. */
output.file = file;
avail_size = block[1] - b;
for (block_size = 1;; block_size *= 2) {
ibool sorted = TRUE;
ibool list_is_empty = TRUE;
memcpy(*buf, b, avail_size);
block1 = backup1;
if (!row_merge_read(fd, ++(*foffs), block)) {
if (!row_merge_block_read(file, list_head, block1)) {
file_error:
*error = DB_CORRUPTION;
goto err_exit;
}
ut_ad(row_merge_block_validate(block1, index));
for (;;) {
ulint offset = block1->header.offset;
ulint list1_size = 0;
ulint list2_size = block_size;
ulint i;
/* Wrap around to the beginning of the buffer. */
b = block[0];
/* Count how many list elements we have in the list. */
/* Copy the record. */
memcpy(*buf + avail_size, b, extra_size - avail_size);
b += extra_size - avail_size;
for (i = 0; i < block_size; i++) {
merge_block_header_t header;
*mrec = *buf + extra_size;
list1_size++;
rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
/* Here read only the header to iterate the
list in the disk. */
data_size = rec_offs_data_size(offsets);
if (!row_merge_block_header_read(file, offset,
&header)) {
goto file_error;
}
/* These overflows should be impossible given that
records are much smaller than either buffer, and
the record starts near the beginning of each buffer. */
ut_a(extra_size + data_size < sizeof *buf);
ut_a(b + data_size < block[1]);
offset = header.next;
/* Copy the data bytes. */
memcpy(*buf + extra_size, b, data_size);
b += data_size;
/* If the offset is zero we have arrived to the
end of disk list */
return(b);
}
if (!offset) {
break;
}
}
*mrec = b + extra_size;
/* If offset is zero we have reached end of the list in
the disk. */
rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
if (!offset) {
block2 = NULL;
} else {
block2 = backup2;
if (!row_merge_block_read(
file, offset, block2)) {
goto file_error;
}
ut_ad(row_merge_block_validate(block2, index));
}
data_size = rec_offs_data_size(offsets);
ut_ad(extra_size + data_size < sizeof *buf);
/* If list2 is not empty, we have two lists to merge.
Otherwise, we have a sorted list. */
while (list1_size > 0 || (list2_size > 0 && block2)) {
/* Merge sort two lists by deciding whether
next element of merge comes from list1 or
list2. */
merge_block_t* tmp;
if (list1_size == 0) {
/* First list is empty, next element
must come from the second list. */
tmp = block2;
if (!block2->header.next) {
block2 = NULL;
list2_size = 0;
} else {
list2_size--;
}
} else if (list2_size == 0 || !block2) {
/* Second list is empty, next record
must come from the first list. */
tmp = block1;
list1_size--;
} else {
/* Both lists contain a block and we
need to merge records on these block */
tmp = row_merge_block_merge(
block1, &block2, index);
if (tmp == NULL) {
*error = DB_DUPLICATE_KEY;
goto err_exit;
}
block1 = backup1 = tmp;
backup2 = block2;
list1_size--;
}
b += extra_size + data_size;
/* Store the head offset of the disk
list. Note that only records in the
blocks are changed not the order of
the blocks in the disk. */
if (UNIV_LIKELY(b < block[1])) {
/* The record fits entirely in the block.
This is the normal case. */
return(b);
}
if (list_is_empty) {
list_is_empty = FALSE;
list_head = tmp->header.offset;
}
/* The record spans two blocks. Copy it to buf. */
ut_ad(row_merge_block_validate(tmp, index));
avail_size = block[1] - b;
memcpy(*buf, b, avail_size);
*mrec = *buf + extra_size;
rec_offs_make_valid(*mrec, index, offsets);
if (!row_merge_block_write(
file, tmp->header.offset, tmp)) {
goto file_error;
}
if (!row_merge_read(fd, ++(*foffs), block)) {
/* Now we can read the next record from the
selected list if it contains more records */
goto err_exit;
}
if (tmp->header.next
&& !row_merge_block_read(file,
tmp->header.next,
tmp)) {
goto file_error;
}
}
/* Wrap around to the beginning of the buffer. */
b = block[0];
/* Now we have processed block_size items from
the disk. Swap blocks using pointers. */
/* Copy the rest of the record. */
memcpy(*buf + avail_size, b, extra_size + data_size - avail_size);
b += extra_size + data_size - avail_size;
if (!block2) {
if (sorted) {
goto func_exit;
}
break;
}
return(b);
}
sorted = FALSE;
block2 = backup1;
block1 = backup2;
backup2 = block2;
backup1 = block1;
}
/************************************************************************
Write a merge record. */
static
void
row_merge_write_rec_low(
/*====================*/
byte* b, /* out: buffer */
ulint e, /* in: encoded extra_size */
const mrec_t* mrec, /* in: record to write */
const ulint* offsets)/* in: offsets of mrec */
{
if (e < 0x80) {
*b++ = e;
} else {
*b++ = 0x80 | (e >> 8);
*b++ = (byte) e;
}
err_exit:
list_head = ULINT_UNDEFINED;
func_exit:
mem_free(backup1);
mem_free(backup2);
return(list_head);
memcpy(b, mrec - rec_offs_extra_size(offsets), rec_offs_size(offsets));
}
/************************************************************************
Merge sort linked list in the memory and store part of the linked
list into a block and write this block to the disk. */
Write a merge record. */
static
ibool
row_merge_sort_and_store(
/*=====================*/
/* out: FALSE on error */
dict_index_t* index, /* in: Index */
merge_file_t* file, /* in: File where to write index
entries */
merge_block_t* block, /* in/out: Block where to store
the list */
merge_rec_list_t** list) /* in/out: Pointer to the list */
byte*
row_merge_write_rec(
/*================*/
/* out: pointer to end of block,
or NULL on error */
row_merge_block_t* block, /* in/out: file buffer */
mrec_buf_t* buf, /* in/out: secondary buffer */
byte* b, /* in: pointer to end of block */
int fd, /* in: file descriptor */
ulint* foffs, /* in/out: file offset */
const mrec_t* mrec, /* in: record to write */
const ulint* offsets)/* in: offsets of mrec */
{
ut_ad(index && file && block && list);
ulint extra_size;
ulint size;
ulint avail_size;
ut_ad(block);
ut_ad(buf);
ut_ad(b >= block[0]);
ut_ad(b < block[1]);
ut_ad(mrec);
ut_ad(foffs);
ut_ad(mrec < block[0] || mrec > block[1]);
ut_ad(mrec < buf[0] || mrec > buf[1]);
/* Normalize extra_size. Value 0 signals "end of list". */
extra_size = rec_offs_extra_size(offsets) + 1;
size = extra_size + (extra_size >= 0x80)
+ rec_offs_data_size(offsets);
if (UNIV_UNLIKELY(b + size >= block[1])) {
/* The record spans two blocks.
Copy it to the temporary buffer first. */
avail_size = block[1] - b;
row_merge_write_rec_low(buf[0], extra_size, mrec, offsets);
/* Copy the head of the temporary buffer, write
the completed block, and copy the tail of the
record to the head of the new block. */
memcpy(b, buf[0], avail_size);
if (!row_merge_write(fd, (*foffs)++, block)) {
return(NULL);
}
/* Firstly, merge sort linked list in the memory */
if (!row_merge_sort_linked_list(index, *list)) {
return(FALSE);
/* Copy the rest. */
b = block[0];
memcpy(b, buf[0] + avail_size, size - avail_size);
b += size - avail_size;
} else {
row_merge_write_rec_low(b, extra_size, mrec, offsets);
b += rec_offs_size(offsets);
}
/* Secondly, write part of the linked list to the block */
*list = row_merge_write_list_to_block(*list, block, index);
ut_ad(row_merge_block_validate(block, index));
/* Next block will be written directly behind this one. This will
create a 'linked list' of blocks to the disk. */
block->header.offset = file->offset;
block->header.next = ++file->offset;
/* Thirdly, write block to the disk */
return(row_merge_block_write(file->file, block->header.offset, block));
return(b);
}
#ifdef UNIV_DEBUG_INDEX_CREATE
/************************************************************************
Pretty print data tuple */
Write an end-of-list marker. */
static
void
row_merge_dtuple_print(
/*===================*/
FILE* f, /* in: output stream */
dtuple_t* dtuple) /* in: data tuple */
byte*
row_merge_write_eof(
/*================*/
/* out: pointer to end of block,
or NULL on error */
row_merge_block_t* block, /* in/out: file buffer */
byte* b, /* in: pointer to end of block */
int fd, /* in: file descriptor */
ulint* foffs) /* in/out: file offset */
{
ulint n_fields;
ulint i;
ut_ad(f && dtuple);
n_fields = dtuple_get_n_fields(dtuple);
fprintf(f, "DATA TUPLE: %lu fields;\n", (ulong) n_fields);
for (i = 0; i < n_fields; i++) {
dfield_t* dfield;
dfield = dtuple_get_nth_field(dtuple, i);
fprintf(f, "%lu: ", (ulong) i);
if (dfield->len != UNIV_SQL_NULL) {
dfield_print_also_hex(dfield);
} else {
fputs(" SQL NULL", f);
}
putc(';', f);
ut_ad(block);
ut_ad(b >= block[0]);
ut_ad(b < block[1]);
ut_ad(foffs);
*b++ = 0;
#ifdef UNIV_DEBUG_VALGRIND
/* The rest of the block is uninitialized. Initialize it
to avoid bogus warnings. */
memset(b, 0, block[1] - b);
#endif /* UNIV_DEBUG_VALGRIND */
if (!row_merge_write(fd, (*foffs)++, block)) {
return(NULL);
}
putc('\n', f);
ut_ad(dtuple_validate(dtuple));
return(block[0]);
}
/*****************************************************************
Compare two merge records. The comparison is delegated unchanged
to cmp_rec_rec_simple(). */
static
int
row_merge_cmp(
/*==========*/
					/* out: 1, 0, -1 if mrec1 is
					greater, equal, less,
					respectively, than mrec2 */
	const mrec_t*	mrec1,		/* in: first merge record to be
					compared */
	const mrec_t*	mrec2,		/* in: second merge record to be
					compared */
	const ulint*	offsets1,	/* in: first record offsets */
	const ulint*	offsets2,	/* in: second record offsets */
	dict_index_t*	index)		/* in: index */
{
	int	result;

	/* All arguments are passed through in the same order; this
	function adds no logic of its own. */
	result = cmp_rec_rec_simple(mrec1, mrec2,
				    offsets1, offsets2, index);

	return(result);
}
#endif /* UNIV_DEBUG_INDEX_CREATE */
/************************************************************************
Reads clustered index of the table and create temporary files
containing index entries for indexes to be built. */
static
ulint
row_merge_read_clustered_index(
/*===========================*/
/* out: DB_SUCCESS if successful,
or ERROR code */
trx_t* trx, /* in: transaction */
dict_table_t* table, /* in: table where index is created */
dict_index_t** index, /* in: indexes to be created */
merge_file_t* files, /* in: Files where to write index
entries */
ulint num_of_idx) /* in: number of indexes to be
created */
/* out: DB_SUCCESS or error */
trx_t* trx, /* in: transaction */
dict_table_t* table, /* in: table where index is created */
dict_index_t** index, /* in: indexes to be created */
merge_file_t* files, /* in: temporary files */
ulint n_index,/* in: number of indexes to create */
row_merge_block_t* block) /* in/out: file buffer */
{
dict_index_t* clust_index; /* Clustered index */
merge_rec_t* new_mrec; /* New merge record */
mem_heap_t* row_heap; /* Heap memory to create
dict_index_t* clust_index; /* Clustered index */
mem_heap_t* row_heap; /* Heap memory to create
clustered index records */
mem_heap_t* heap; /* Memory heap for
record lists and offsets */
merge_block_t* block; /* Merge block where records
are stored for memory sort and
then written to the disk */
merge_rec_list_t** merge_list; /* Temporary list for records*/
btr_pcur_t pcur; /* Persistent cursor on the
row_merge_buf_t** merge_buf; /* Temporary list for records*/
btr_pcur_t pcur; /* Persistent cursor on the
clustered index */
mtr_t mtr; /* Mini transaction */
ulint err = DB_SUCCESS; /* Return code */
ulint idx_num = 0; /* Index number */
ulint n_blocks = 0; /* Number of blocks written
to disk */
ulint sec_offsets_[REC_OFFS_NORMAL_SIZE];
ulint* sec_offs = sec_offsets_;
*sec_offsets_ = (sizeof sec_offsets_) / sizeof *sec_offsets_;
mtr_t mtr; /* Mini transaction */
ulint err = DB_SUCCESS;/* Return code */
ulint i;
trx->op_info="reading clustered index";
trx->op_info = "reading clustered index";
ut_ad(trx);
ut_ad(table);
ut_ad(index);
ut_ad(files);
/* Create block where index entries are stored */
block = row_merge_block_create();
/* Create and initialize memory for record buffers */
/* Create and initialize memory for record lists */
merge_buf = mem_alloc(n_index * sizeof *merge_buf);
heap = mem_heap_create(256);
merge_list = mem_heap_alloc(heap, num_of_idx * sizeof *merge_list);
for (idx_num = 0; idx_num < num_of_idx; idx_num++) {
merge_list[idx_num] = row_merge_create_list();
for (i = 0; i < n_index; i++) {
merge_buf[i] = row_merge_buf_create(index[i]);
}
mtr_start(&mtr);
......@@ -1467,13 +850,14 @@ row_merge_read_clustered_index(
btr_pcur_open_at_index_side(
TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
row_heap = mem_heap_create(512);
row_heap = mem_heap_create(UNIV_PAGE_SIZE);
/* Iterate all records in the clustered index */
/* Scan the clustered index. */
for (;;) {
const rec_t* rec;
dtuple_t* row;
row_ext_t* ext;
ibool has_next = TRUE;
btr_pcur_move_to_next_on_page(&pcur, &mtr);
......@@ -1486,174 +870,328 @@ row_merge_read_clustered_index(
mtr_start(&mtr);
btr_pcur_restore_position(BTR_SEARCH_LEAF,
&pcur, &mtr);
if (!btr_pcur_move_to_next_user_rec(&pcur, &mtr)) {
break;
}
has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
}
rec = btr_pcur_get_rec(&pcur);
if (UNIV_LIKELY(has_next)) {
rec = btr_pcur_get_rec(&pcur);
/* We don't count the delete marked records as "Inserted" */
if (!rec_get_deleted_flag(rec, dict_table_is_comp(table))) {
/* Skip delete marked records. */
if (rec_get_deleted_flag(rec,
dict_table_is_comp(table))) {
continue;
}
srv_n_rows_inserted++;
/* Build row based on clustered index */
row = row_build(ROW_COPY_POINTERS, clust_index,
rec, NULL, &ext, row_heap);
/* Build all entries for all the indexes to be created
in a single scan of the clustered index. */
}
/* Build row based on clustered index */
mem_heap_empty(row_heap);
for (i = 0; i < n_index; i++) {
row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i];
row = row_build(ROW_COPY_POINTERS,
clust_index, rec, NULL, &ext, row_heap);
if (UNIV_LIKELY
(has_next && row_merge_buf_add(buf, row, ext))) {
continue;
}
ut_ad(buf->n_tuples || !has_next);
/* If the user has requested the creation of several indexes
for the same table. We build all index entries in a single
pass over the clustered index. */
/* We have enough data tuples to form a block.
Sort them and write to disk. */
for (idx_num = 0; idx_num < num_of_idx; idx_num++) {
if (buf->n_tuples
&& row_merge_buf_sort(buf)
&& dict_index_is_unique(buf->index)) {
err = DB_DUPLICATE_KEY;
goto func_exit;
}
dtuple_t* index_tuple;
row_merge_buf_write(buf, block);
index_tuple = row_build_index_entry(
row, ext,
index[idx_num], merge_list[idx_num]->heap);
if (!row_merge_write(file->fd, file->offset++,
block)) {
trx->error_key_num = i;
err = DB_OUT_OF_FILE_SPACE;
goto func_exit;
}
#ifdef UNIV_DEBUG_INDEX_CREATE
row_merge_dtuple_print(stderr, index_tuple);
#endif
row_merge_buf_empty(buf);
}
new_mrec = row_merge_rec_create(
index_tuple,
ext ? ext->ext : NULL, ext ? ext->n_ext : 0,
index[idx_num], merge_list[idx_num]->heap);
mem_heap_empty(row_heap);
sec_offs = rec_get_offsets(
new_mrec->rec, index[idx_num], sec_offs,
ULINT_UNDEFINED, &heap);
if (UNIV_UNLIKELY(!has_next)) {
goto func_exit;
}
}
/* Add data tuple to linked list of data tuples */
func_exit:
btr_pcur_close(&pcur);
mtr_commit(&mtr);
mem_heap_free(row_heap);
row_merge_list_add(
new_mrec, rec_offs_size(sec_offs),
merge_list[idx_num]);
for (i = 0; i < n_index; i++) {
row_merge_buf_free(merge_buf[i]);
}
/* If we have enough data tuples to form a block
sort linked list and store it to the block and
write this block to the disk. Note that not all
data tuples in the list fit to the block.*/
mem_free(merge_buf);
if (merge_list[idx_num]->total_size >=
MERGE_BLOCK_SIZE) {
trx->op_info = "";
if (!row_merge_sort_and_store(
index[idx_num],
&files[idx_num],
block,
&(merge_list[idx_num]))) {
return(err);
}
trx->error_key_num = idx_num;
err = DB_DUPLICATE_KEY;
goto error_handling;
}
/*****************************************************************
Merge two blocks of linked lists on disk and write a bigger block. */
static
ulint
row_merge_blocks(
/*=============*/
/* out: DB_SUCCESS or error code */
dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
row_merge_block_t* block1, /* in/out: input buffer */
row_merge_block_t* block2, /* in/out: input buffer */
row_merge_block_t* block3, /* in/out: output buffer */
ulint* foffs1, /* in/out: offset of first
source list in the file */
ulint* foffs2, /* in/out: offset of second
source list in the file */
merge_file_t* of) /* in/out: output file */
{
mem_heap_t* heap; /* memory heap for offsets1, offsets2 */
mrec_buf_t buf1; /* buffer for handling split mrec1 in block1 */
mrec_buf_t buf2; /* buffer for handling split mrec2 in block2 */
mrec_buf_t buf3; /* buffer for handling split mrec in block3 */
const byte* b1; /* pointer to block1 */
const byte* b2; /* pointer to block2 */
byte* b3; /* pointer to block3 */
const mrec_t* mrec1; /* merge record, points to block1 or buf1 */
const mrec_t* mrec2; /* merge record, points to block2 or buf2 */
ulint* offsets1;/* offsets of mrec1 */
ulint* offsets2;/* offsets of mrec2 */
heap = row_merge_heap_create(index, &offsets1, &offsets2);
/* Write a record and read the next record. Split the output
file in two halves, which can be merged on the following pass. */
#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END) \
do { \
b3 = row_merge_write_rec(block3, &buf3, b3, \
of->fd, &of->offset, \
mrec##N, offsets##N); \
if (UNIV_UNLIKELY(!b3)) { \
goto corrupt; \
} \
b##N = row_merge_read_rec(block##N, &buf##N, \
b##N, index, \
file->fd, foffs##N, \
&mrec##N, offsets##N); \
if (UNIV_UNLIKELY(!b##N)) { \
if (mrec##N) { \
goto corrupt; \
} \
AT_END; \
} \
} while (0)
if (!row_merge_read(file->fd, *foffs1, block1)
|| !row_merge_read(file->fd, *foffs2, block2)) {
corrupt:
mem_heap_free(heap);
return(DB_CORRUPTION);
}
b1 = *block1;
b2 = *block2;
b3 = *block3;
b1 = row_merge_read_rec(block1, &buf1, b1, index, file->fd,
foffs1, &mrec1, offsets1);
b2 = row_merge_read_rec(block2, &buf2, b2, index, file->fd,
foffs2, &mrec2, offsets2);
if (UNIV_UNLIKELY(!b1 && mrec1)
|| UNIV_UNLIKELY(!b2 && mrec2)) {
n_blocks++;
files[idx_num].num_of_blocks++;
goto corrupt;
}
while (mrec1 && mrec2) {
switch (row_merge_cmp(mrec1, mrec2,
offsets1, offsets2, index)) {
case 0:
if (UNIV_UNLIKELY
(dict_index_is_unique(index))) {
mem_heap_free(heap);
return(DB_DUPLICATE_KEY);
}
/* fall through */
case -1:
ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
break;
case 1:
ROW_MERGE_WRITE_GET_NEXT(2, goto merged);
break;
default:
ut_error;
}
}
/* Now we have to write all remaining items in the list to
blocks and write these blocks to the disk */
merged:
if (mrec1) {
/* append all mrec1 to output */
for (;;) {
ROW_MERGE_WRITE_GET_NEXT(1, break);
}
}
for (idx_num = 0; idx_num < num_of_idx; idx_num++) {
if (mrec2) {
/* append all mrec2 to output */
for (;;) {
ROW_MERGE_WRITE_GET_NEXT(2, break);
}
}
/* While we have items in the list write them
to the block */
ut_ad(!merge_list[idx_num]->head
== !merge_list[idx_num]->tail);
ut_ad(!merge_list[idx_num]->n_records
== !merge_list[idx_num]->head);
mem_heap_free(heap);
b3 = row_merge_write_eof(block3, b3, of->fd, &of->offset);
return(b3 ? DB_SUCCESS : DB_CORRUPTION);
}
if (merge_list[idx_num]->head) {
/*****************************************************************
Merge disk files. */
static
ulint
row_merge(
/*======*/
/* out: DB_SUCCESS
or error code */
dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
row_merge_block_t* block1, /* in/out: input buffer */
row_merge_block_t* block2, /* in/out: input buffer */
row_merge_block_t* block3, /* in/out: output buffer */
int* tmpfd) /* in/out: temporary file
handle */
{
ulint foffs1; /* first input offset */
ulint foffs2; /* second input offset */
ulint half; /* upper limit of foffs1 */
ulint error; /* error code */
merge_file_t of; /* output file */
/* Next block will be written directly
behind this one. This will create a
'linked list' of blocks to the disk. */
of.fd = *tmpfd;
of.offset = 0;
block->header.offset = files[idx_num].offset;
block->header.next = files[idx_num].offset + 1;
/* Split the input file in two halves. */
half = file->offset / 2;
if (!row_merge_sort_and_store(
index[idx_num],
&files[idx_num],
block,
&(merge_list[idx_num]))) {
/* Merge blocks to the output file. */
foffs1 = 0;
foffs2 = half;
trx->error_key_num = idx_num;
err = DB_DUPLICATE_KEY;
goto error_handling;
}
for (; foffs1 < half; foffs1++, foffs2++) {
error = row_merge_blocks(index, file, block1, block2, block3,
&foffs1, &foffs2, &of);
files[idx_num].num_of_blocks++;
n_blocks++;
if (error != DB_SUCCESS) {
return(error);
}
}
/* Write the last block. */
block->header.next = 0; /* end-of-list marker */
if (!row_merge_block_header_write(
files[idx_num].file, &block->header)) {
err = DB_CORRUPTION;
goto error_handling;
/* Copy the last block, if there is one. */
while (foffs2 < file->offset) {
if (!row_merge_read(file->fd, foffs2++, block2)
|| !row_merge_write(of.fd, of.offset++, block2)) {
return(DB_CORRUPTION);
}
}
#ifdef UNIV_DEBUG_INDEX_CREATE
fprintf(stderr, "Stored %lu blocks\n", n_blocks);
#endif
/* Swap file descriptors for the next pass. */
*tmpfd = file->fd;
*file = of;
error_handling:
return(DB_SUCCESS);
}
/* Cleanup resources */
/*****************************************************************
Merge disk files. */
static
ulint
row_merge_sort(
/*===========*/
/* out: DB_SUCCESS
or error code */
dict_index_t* index, /* in: index being created */
merge_file_t* file, /* in/out: file containing
index entries */
row_merge_block_t* block1, /* in/out: input buffer */
row_merge_block_t* block2, /* in/out: input buffer */
row_merge_block_t* block3, /* in/out: output buffer */
int* tmpfd) /* in/out: temporary file
handle */
{
ulint blksz; /* block size */
btr_pcur_close(&pcur);
mtr_commit(&mtr);
mem_heap_free(row_heap);
mem_free(block);
blksz = 1;
for (idx_num = 0; idx_num < num_of_idx; idx_num++) {
mem_heap_free(merge_list[idx_num]->heap);
}
for (;; blksz *= 2) {
ulint error = row_merge(index, file,
block1, block2, block3, tmpfd);
if (error != DB_SUCCESS) {
return(error);
}
mem_heap_free(heap);
if (blksz >= file->offset) {
/* everything is in a single block */
break;
}
trx->op_info="";
/* Round up the file size to a multiple of blksz. */
file->offset = ut_2pow_round(file->offset - 1, blksz) + blksz;
}
return(err);
return(DB_SUCCESS);
}
/************************************************************************
Read sorted file containing index data tuples and insert these data
tuples to the index */
static
ulint
row_merge_insert_index_tuples(
/*==========================*/
/* out: 0 or error number */
trx_t* trx, /* in: transaction */
dict_index_t* index, /* in: index */
dict_table_t* table, /* in: table */
os_file_t file, /* in: file handle */
ulint offset) /* in: offset where to start
reading */
/* out: DB_SUCCESS or error number */
trx_t* trx, /* in: transaction */
dict_index_t* index, /* in: index */
dict_table_t* table, /* in: table */
int fd, /* in: file descriptor */
row_merge_block_t* block) /* in/out: file buffer */
{
merge_block_t* block;
que_thr_t* thr;
ins_node_t* node;
mem_heap_t* heap;
mem_heap_t* graph_heap;
ulint error = DB_SUCCESS;
mrec_buf_t buf;
const byte* b;
que_thr_t* thr;
ins_node_t* node;
mem_heap_t* tuple_heap;
mem_heap_t* graph_heap;
ulint error = DB_SUCCESS;
ulint foffs = 0;
ulint* offsets;
ut_ad(trx && index && table);
ut_ad(trx);
ut_ad(index);
ut_ad(table);
/* We use the insert query graph as the dummy graph
needed in the row module call */
......@@ -1667,70 +1205,67 @@ row_merge_insert_index_tuples(
que_thr_move_to_run_state_for_mysql(thr, trx);
block = row_merge_block_create();
heap = mem_heap_create(1000);
do {
ulint n_rec;
ulint tuple_offset = 0;
if (!row_merge_block_read(file, offset, block)) {
error = DB_CORRUPTION;
break;
}
ut_ad(row_merge_block_validate(block, index));
for (n_rec = 0; n_rec < block->header.n_records; n_rec++) {
merge_rec_t* mrec = row_merge_read_rec_from_block(
block, &tuple_offset, heap, index);
tuple_heap = mem_heap_create(1000);
if (!rec_get_deleted_flag(mrec->rec, 0)) {
{
ulint i = REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index);
offsets = mem_heap_alloc(graph_heap, i * sizeof *offsets);
offsets[0] = i;
offsets[1] = dict_index_get_n_fields(index);
}
dtuple_t* dtuple = row_rec_to_index_entry(
ROW_COPY_POINTERS,
index, mrec->rec, heap);
b = *block;
node->row = dtuple;
node->table = table;
node->trx_id = trx->id;
if (!row_merge_read(fd, foffs, block)) {
error = DB_CORRUPTION;
} else {
for (;;) {
const mrec_t* mrec;
dtuple_t* dtuple;
b = row_merge_read_rec(block, &buf, b, index,
fd, &foffs, &mrec, offsets);
if (UNIV_UNLIKELY(!b)) {
/* End of list, or I/O error */
if (mrec) {
error = DB_CORRUPTION;
}
break;
}
ut_ad(dtuple_validate(dtuple));
dtuple = row_rec_to_index_entry_low(
mrec, index, offsets, tuple_heap);
#ifdef UNIV_DEBUG_INDEX_CREATE
row_merge_dtuple_print(stderr, dtuple);
#endif
node->row = dtuple;
node->table = table;
node->trx_id = trx->id;
do {
thr->run_node = thr;
thr->prev_node = thr->common.parent;
ut_ad(dtuple_validate(dtuple));
error = row_ins_index_entry(
index, dtuple, NULL, 0, thr);
do {
thr->run_node = thr;
thr->prev_node = thr->common.parent;
if (error == DB_SUCCESS) {
goto next_rec;
}
error = row_ins_index_entry(
index, dtuple, NULL, 0, thr);
thr->lock_state = QUE_THR_LOCK_ROW;
trx->error_state = error;
que_thr_stop_for_mysql(thr);
thr->lock_state = QUE_THR_LOCK_NOLOCK;
} while (row_mysql_handle_errors(&error, trx,
thr, NULL));
if (UNIV_LIKELY(error == DB_SUCCESS)) {
goto next_rec;
}
goto err_exit;
}
thr->lock_state = QUE_THR_LOCK_ROW;
trx->error_state = error;
que_thr_stop_for_mysql(thr);
thr->lock_state = QUE_THR_LOCK_NOLOCK;
} while (row_mysql_handle_errors(&error, trx,
thr, NULL));
goto err_exit;
next_rec:
mem_heap_empty(heap);
mem_heap_empty(tuple_heap);
}
offset = block->header.next;
/* If we have reached the end of the disk list we have
inserted all of the index entries to the index. */
} while (offset);
}
que_thr_stop_for_mysql_no_error(thr, trx);
err_exit:
......@@ -1738,8 +1273,7 @@ err_exit:
trx->op_info = "";
mem_free(block);
mem_heap_free(heap);
mem_heap_free(tuple_heap);
return(error);
}
......@@ -1827,17 +1361,29 @@ row_merge_drop_indexes(
}
/*************************************************************************
Allocate and initialize memory for a merge file structure */
Create a merge file. */
static
void
row_merge_file_create(
/*==================*/
merge_file_t* merge_file) /* out: merge file structure */
{
merge_file->file = innobase_mysql_tmpfile();
merge_file->fd = innobase_mysql_tmpfile();
merge_file->offset = 0;
merge_file->num_of_blocks = 0;
}
/*************************************************************************
Close the file descriptor of a merge file, unless it is already marked
as closed. */
static
void
row_merge_file_destroy(
/*===================*/
	merge_file_t*	merge_file)	/* in/out: merge file structure;
					fd is reset to -1 once closed */
{
	if (merge_file->fd == -1) {
		/* Nothing to close: the descriptor is marked as unused. */
		return;
	}

	close(merge_file->fd);

	/* Mark the descriptor as closed, so that calling this
	function again on the same structure does nothing. */
	merge_file->fd = -1;
}
/*************************************************************************
......@@ -1858,10 +1404,7 @@ row_merge_create_temporary_table(
ulint error;
ut_ad(table_name && table && error);
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&dict_sys->mutex));
#endif /* UNIV_SYNC_DEBUG */
error = row_undo_report_create_table_dict_operation(trx, table_name);
......@@ -1901,11 +1444,12 @@ row_merge_create_temporary_table(
}
/*************************************************************************
Rename the indexes in the dicitionary. */
Rename the indexes in the dictionary. */
ulint
row_merge_rename_index(
/*===================*/
/* out: DB_SUCCESS if all OK */
trx_t* trx, /* in: Transaction */
dict_table_t* table, /* in: Table for index */
dict_index_t* index) /* in: Index to rename */
......@@ -1976,7 +1520,7 @@ row_merge_create_index(
/* Create the index prototype, using the passed in def, this is not
a persistent operation. We pass 0 as the space id, and determine at
a lower level the space id where to store the table.*/
a lower level the space id where to store the table. */
index = dict_mem_index_create(table->name, index_def->name,
0, index_def->ind_type, n_fields);
......@@ -2045,7 +1589,7 @@ row_merge_create_index(
}
/*************************************************************************
Check if a transaction can use an index.*/
Check if a transaction can use an index. */
ibool
row_merge_is_index_usable(
......@@ -2061,13 +1605,12 @@ row_merge_is_index_usable(
}
/*************************************************************************
Drop the old table.*/
Drop the old table. */
ulint
row_merge_drop_table(
/*=================*/
/* out: DB_SUCCESS if all OK else
error code.*/
/* out: DB_SUCCESS or error code */
trx_t* trx, /* in: transaction */
dict_table_t* table) /* in: table to drop */
{
......@@ -2084,7 +1627,7 @@ row_merge_drop_table(
/* Drop the table immediately iff it is not referenced by MySQL */
if (table->n_mysql_handles_opened == 0) {
/* Set the commit flag to FALSE.*/
/* Set the commit flag to FALSE. */
err = row_drop_table_for_mysql(table->name, trx, FALSE);
}
......@@ -2094,3 +1637,103 @@ row_merge_drop_table(
return(err);
}
/*************************************************************************
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */

ulint
row_merge_build_indexes(
/*====================*/
					/* out: DB_SUCCESS or error code */
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	old_table,	/* in: table where rows are
					read from */
	dict_table_t*	new_table,	/* in: table where indexes are
					created; note that old_table ==
					new_table if we are creating
					secondary keys */
	dict_index_t**	indexes,	/* in: indexes to be created */
	ulint		n_indexes)	/* in: size of indexes[] */
{
	merge_file_t*		merge_files;
	row_merge_block_t*	block1;
	row_merge_block_t*	block2;
	row_merge_block_t*	block3;
	ulint			i;
	ulint			error;
	int			tmpfd;

	ut_ad(trx);
	ut_ad(old_table);
	ut_ad(new_table);
	ut_ad(indexes);
	ut_ad(n_indexes);

	trx_start_if_not_started(trx);

	/* Allocate memory for merge file data structure and initialize
	fields */

	merge_files = mem_alloc(n_indexes * sizeof *merge_files);
	block1 = mem_alloc(sizeof *block1);
	block2 = mem_alloc(sizeof *block2);
	block3 = mem_alloc(sizeof *block3);

	for (i = 0; i < n_indexes; i++) {

		row_merge_file_create(&merge_files[i]);
	}

	tmpfd = innobase_mysql_tmpfile();

	/* Do not start reading, sorting or inserting if any of the
	temporary files could not be created.  A negative descriptor
	denotes "no file" (row_merge_file_destroy() uses -1 for that).
	NOTE(review): DB_OUT_OF_FILE_SPACE is assumed to be the most
	fitting error code for a failed temporary file creation;
	confirm against the error handling of the callers. */

	if (tmpfd < 0) {
		error = DB_OUT_OF_FILE_SPACE;

		goto func_exit;
	}

	for (i = 0; i < n_indexes; i++) {

		if (merge_files[i].fd < 0) {
			error = DB_OUT_OF_FILE_SPACE;

			goto func_exit;
		}
	}

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */

	error = row_merge_read_clustered_index(
		trx, old_table, indexes, merge_files, n_indexes, block1);

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	trx_start_if_not_started(trx);

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (i = 0; i < n_indexes; i++) {
		/* tmpfd is passed by address; whatever descriptor it
		holds afterwards is the one closed at func_exit. */
		error = row_merge_sort(indexes[i], &merge_files[i],
				       block1, block2, block3, &tmpfd);

		if (error == DB_SUCCESS) {
			error = row_merge_insert_index_tuples(
				trx, indexes[i], new_table,
				merge_files[i].fd, block1);
		}

		/* Close the temporary file to free up space. */
		row_merge_file_destroy(&merge_files[i]);

		if (error != DB_SUCCESS) {
			/* Report which of the requested indexes failed. */
			trx->error_key_num = i;

			goto func_exit;
		}
	}

func_exit:
	if (tmpfd >= 0) {
		close(tmpfd);
	}

	/* Files that were already closed in the loop above have
	fd == -1 and are skipped by row_merge_file_destroy(). */
	for (i = 0; i < n_indexes; i++) {
		row_merge_file_destroy(&merge_files[i]);
	}

	mem_free(merge_files);
	mem_free(block1);
	mem_free(block2);
	mem_free(block3);

	return(error);
}
......@@ -33,7 +33,6 @@ Created 9/17/2000 Heikki Tuuri
#include "btr0sea.h"
#include "fil0fil.h"
#include "ibuf0ibuf.h"
#include "row0merge.h"
/* A dummy variable used to fool the compiler */
ibool row_mysql_identically_false = FALSE;
......@@ -4492,93 +4491,6 @@ row_create_index_graph_for_mysql(
return(err);
}
/*************************************************************************
Build new indexes to a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes. */

ulint
row_build_index_for_mysql(
/*======================*/
					/* out: DB_SUCCESS or error code */
	trx_t*		trx,		/* in: transaction */
	dict_table_t*	old_table,	/* in: table where rows are
					read from */
	dict_table_t*	new_table,	/* in: table where indexes are
					created; note that old_table ==
					new_table if we are creating
					secondary keys */
	dict_index_t**	index,		/* in: indexes to be created */
	ulint		num_of_keys)	/* in: number of indexes to be
					created */
{
	merge_file_t*	merge_files;
	ulint		index_num;
	ulint		error;

	ut_ad(trx && old_table && new_table && index && num_of_keys);

	trx_start_if_not_started(trx);

	/* Allocate memory for merge file data structure and initialize
	fields */

	merge_files = mem_alloc(num_of_keys * sizeof *merge_files);

	for (index_num = 0; index_num < num_of_keys; index_num++) {

		row_merge_file_create(&merge_files[index_num]);
	}

	/* Read clustered index of the table and create files for
	secondary index entries for merge sort */

	error = row_merge_read_clustered_index(
		trx, old_table, index, merge_files, num_of_keys);

	if (error != DB_SUCCESS) {

		goto func_exit;
	}

	trx_start_if_not_started(trx);

	/* Now we have files containing index entries ready for
	sorting and inserting. */

	for (index_num = 0; index_num < num_of_keys; index_num++) {
		int	sort_error;

		/* Do a merge sort and insert from those files
		which we have written at least one block */

		if (merge_files[index_num].num_of_blocks == 0) {

			continue;
		}

		/* The sort routine reports its status through an int*.
		Hand it a real int instead of casting the address of
		the ulint: writing an int through such a pointer
		updates only part of the variable when ulint is wider
		than int.  error is DB_SUCCESS at this point, so the
		status seen by the sort routine is unchanged. */

		sort_error = (int) error;

		/* Merge sort file using linked list merge
		sort for files. */

		row_merge_sort_linked_list_in_disk(
			index[index_num],
			merge_files[index_num].file,
			&sort_error);

		error = (ulint) sort_error;

		if (error == DB_SUCCESS) {
			error = row_merge_insert_index_tuples(
				trx, index[index_num], new_table,
				merge_files[index_num].file, 0);
		}

		if (error != DB_SUCCESS) {
			/* Report which of the requested indexes failed. */
			trx->error_key_num = index_num;

			goto func_exit;
		}
	}

func_exit:
	/* NOTE(review): only the array is released here; the files
	obtained by row_merge_file_create() are not closed in this
	function -- confirm that they are released elsewhere. */
	mem_free(merge_files);

	return(error);
}
#endif /* !UNIV_HOTBACKUP */
/*************************************************************************
......
......@@ -141,7 +141,7 @@ row_build_index_entry(
}
/***********************************************************************
An inverse function to dict_row_build_index_entry. Builds a row from a
An inverse function to row_build_index_entry. Builds a row from a
record in a clustered index. */
dtuple_t*
......@@ -256,6 +256,53 @@ row_build(
return(row);
}
/***********************************************************************
Builds a typed data tuple from an index record, using offsets that the
caller has already computed. */

dtuple_t*
row_rec_to_index_entry_low(
/*=======================*/
				/* out: index entry built; info_bits
				are not set, and the data fields of
				the entry point directly to rec */
	const rec_t*	rec,	/* in: record in the index */
	dict_index_t*	index,	/* in: index */
	const ulint*	offsets,/* in: rec_get_offsets(rec, index) */
	mem_heap_t*	heap)	/* in: memory heap from which the memory
				needed is allocated */
{
	dtuple_t*	tuple;
	ulint		n_fields;
	ulint		pos;

	ut_ad(rec && heap && index);

	/* The tuple gets exactly as many fields as the record has. */
	n_fields = rec_offs_n_fields(offsets);

	tuple = dtuple_create(heap, n_fields);

	dtuple_set_n_fields_cmp(tuple,
				dict_index_get_n_unique_in_tree(index));
	ut_ad(n_fields == dict_index_get_n_fields(index));

	dict_index_copy_types(tuple, index, n_fields);

	for (pos = 0; pos != n_fields; pos++) {
		ulint		field_len;
		const byte*	field_data;

		field_data = rec_get_nth_field(rec, offsets, pos,
					       &field_len);

		/* Only the pointer and the length are stored in the
		tuple field; the field data itself stays in rec. */
		dfield_set_data(dtuple_get_nth_field(tuple, pos),
				field_data, field_len);
	}

	ut_ad(dtuple_check_typed(tuple));

	return(tuple);
}
/***********************************************************************
Converts an index record to a typed data tuple. NOTE that externally
stored (often big) fields are NOT copied to heap. */
......@@ -281,11 +328,6 @@ row_rec_to_index_entry(
needed is allocated */
{
dtuple_t* entry;
dfield_t* dfield;
ulint i;
const byte* field;
ulint len;
ulint rec_len;
byte* buf;
mem_heap_t* tmp_heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
......@@ -305,29 +347,12 @@ row_rec_to_index_entry(
rec_offs_make_valid(rec, index, offsets);
}
rec_len = rec_offs_n_fields(offsets);
entry = dtuple_create(heap, rec_len);
dtuple_set_n_fields_cmp(entry,
dict_index_get_n_unique_in_tree(index));
ut_ad(rec_len == dict_index_get_n_fields(index));
dict_index_copy_types(entry, index, rec_len);
entry = row_rec_to_index_entry_low(rec, index, offsets, heap);
dtuple_set_info_bits(entry,
rec_get_info_bits(rec, rec_offs_comp(offsets)));
for (i = 0; i < rec_len; i++) {
dfield = dtuple_get_nth_field(entry, i);
field = rec_get_nth_field(rec, offsets, i, &len);
dfield_set_data(dfield, field, len);
}
ut_ad(dtuple_check_typed(entry));
if (tmp_heap) {
if (UNIV_LIKELY_NULL(tmp_heap)) {
mem_heap_free(tmp_heap);
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment