Commit 39dc9f71 authored by marko's avatar marko

branches/zip: Clean up fast index creation.

dict_index_is_unique(): New function.

row_merge_rec_fits_to_block(), row_merge_store_rec_to_block():
Add const qualifiers to rec, offsets.

row_merge_select(): Rename to row_merge_cmp(), simplify the interface.

row_merge_sort_linked_list(): Change the return type to ibool.
Replace sec_offs1,sec_offs2 with offsets1,offsets2.  Replace offset_heap
with heap.  Define a func_exit label.

row_merge_block_merge(): Reduce the scope of rec_offsets.
parent 573a7a47
...@@ -562,6 +562,16 @@ dict_index_is_clust( ...@@ -562,6 +562,16 @@ dict_index_is_clust(
zero for other indexes */ zero for other indexes */
const dict_index_t* index) /* in: index */ const dict_index_t* index) /* in: index */
__attribute__((pure)); __attribute__((pure));
/************************************************************************
Check whether the index is unique. */
UNIV_INLINE
ulint
dict_index_is_unique(
/*=================*/
/* out: nonzero for unique index,
zero for other indexes */
const dict_index_t* index) /* in: index */
__attribute__((pure));
/************************************************************************ /************************************************************************
Gets the number of user-defined columns in a table in the dictionary Gets the number of user-defined columns in a table in the dictionary
......
...@@ -160,6 +160,21 @@ dict_index_is_clust( ...@@ -160,6 +160,21 @@ dict_index_is_clust(
return(UNIV_UNLIKELY(index->type & DICT_CLUSTERED)); return(UNIV_UNLIKELY(index->type & DICT_CLUSTERED));
} }
/************************************************************************
Check whether the index is unique. */
UNIV_INLINE
ulint
dict_index_is_unique(
/*=================*/
/* out: nonzero for unique index,
zero for other indexes */
const dict_index_t* index) /* in: index */
{
ut_ad(index);
ut_ad(index->magic_n == DICT_INDEX_MAGIC_N);
return(UNIV_UNLIKELY(index->type & DICT_UNIQUE));
}
/************************************************************************ /************************************************************************
Gets the number of user-defined columns in a table in the dictionary Gets the number of user-defined columns in a table in the dictionary
......
...@@ -313,7 +313,7 @@ row_merge_rec_fits_to_block( ...@@ -313,7 +313,7 @@ row_merge_rec_fits_to_block(
/*========================*/ /*========================*/
/* out: TRUE if record fits to merge block, /* out: TRUE if record fits to merge block,
FALSE if record does not fit to block */ FALSE if record does not fit to block */
ulint* offsets,/* in: record offsets */ const ulint* offsets,/* in: record offsets */
ulint offset) /* in: offset where to store in the block */ ulint offset) /* in: offset where to store in the block */
{ {
ulint rec_len; ulint rec_len;
...@@ -342,8 +342,8 @@ ulint ...@@ -342,8 +342,8 @@ ulint
row_merge_store_rec_to_block( row_merge_store_rec_to_block(
/*=========================*/ /*=========================*/
/* out: offset for next data tuple */ /* out: offset for next data tuple */
rec_t* rec, /* in: record to be stored in the memory */ const rec_t* rec, /* in: record to be stored in the memory */
ulint* offsets,/* in: record offsets */ const ulint* offsets,/* in: record offsets */
merge_block_t* mblock, /* in: block where data tuple is stored */ merge_block_t* mblock, /* in: block where data tuple is stored */
ulint offset) /* in: offset where to store */ ulint offset) /* in: offset where to store */
{ {
...@@ -464,46 +464,25 @@ Compare a merge record to another merge record. Returns ...@@ -464,46 +464,25 @@ Compare a merge record to another merge record. Returns
3) first record if records are identical and index type is not UNIQUE 3) first record if records are identical and index type is not UNIQUE
4) second record if the first record is largen than second record*/ 4) second record if the first record is largen than second record*/
static static
merge_rec_t* int
row_merge_select( row_merge_cmp(
/*=============*/ /*==========*/
/* out: record or NULL */ /* out: 1, 0, -1 if mrec1 is
greater, equal, less,
respectively, than mrec2 */
merge_rec_t* mrec1, /* in: first merge record to be merge_rec_t* mrec1, /* in: first merge record to be
compared */ compared */
merge_rec_t* mrec2, /* in: second merge record to be merge_rec_t* mrec2, /* in: second merge record to be
compared */ compared */
ulint* offsets1, /* in: first record offsets */ const ulint* offsets1, /* in: first record offsets */
ulint* offsets2, /* in: second record offsets */ const ulint* offsets2, /* in: second record offsets */
dict_index_t* index, /* in: index */ dict_index_t* index) /* in: index */
int* selected) /* in/out: selected record */
{ {
int cmp_res = 0;
ut_ad(mrec1 && mrec2 && offsets1 && offsets2 && index && selected); ut_ad(mrec1 && mrec2 && offsets1 && offsets2 && index && selected);
ut_ad(rec_validate(mrec1->rec, offsets1)); ut_ad(rec_validate(mrec1->rec, offsets1));
ut_ad(rec_validate(mrec2->rec, offsets2)); ut_ad(rec_validate(mrec2->rec, offsets2));
cmp_res = cmp_rec_rec(mrec1->rec, mrec2->rec, offsets1, return(cmp_rec_rec(mrec1->rec, mrec2->rec, offsets1, offsets2, index));
offsets2, index);
if (cmp_res <= 0) {
if (cmp_res == 0 && (index->type & DICT_UNIQUE)) {
/* Attribute contains two identical keys and
index should be unique. Thus, duplicate
key error should be generated. Now return NULL */
return(NULL);
}
*selected=1;
return(mrec1);
} else {
*selected=2;
return(mrec2);
}
} }
/***************************************************************** /*****************************************************************
...@@ -559,16 +538,16 @@ As soon as a pass like this is performed with only one merge, the ...@@ -559,16 +538,16 @@ As soon as a pass like this is performed with only one merge, the
algorithm terminates and output list list_head is sorted. Otherwise, algorithm terminates and output list list_head is sorted. Otherwise,
double the value of block_size and go back to the beginning. */ double the value of block_size and go back to the beginning. */
static static
ulint ibool
row_merge_sort_linked_list( row_merge_sort_linked_list(
/*=======================*/ /*=======================*/
/* out: 1 or 0 in case of error */ /* out: FALSE on error */
dict_index_t* index, /* in: index to be created */ dict_index_t* index, /* in: index to be created */
merge_rec_list_t* list) /* in: Pointer to head element */ merge_rec_list_t* list) /* in: Pointer to head element */
{ {
ibool success;
merge_rec_t* list1; merge_rec_t* list1;
merge_rec_t* list2; merge_rec_t* list2;
merge_rec_t* tmp;
merge_rec_t* list_head; merge_rec_t* list_head;
merge_rec_t* list_tail; merge_rec_t* list_tail;
ulint block_size; ulint block_size;
...@@ -576,16 +555,16 @@ row_merge_sort_linked_list( ...@@ -576,16 +555,16 @@ row_merge_sort_linked_list(
ulint list1_size; ulint list1_size;
ulint list2_size; ulint list2_size;
ulint i; ulint i;
mem_heap_t* offset_heap = NULL; mem_heap_t* heap = NULL;
ulint sec_offsets1_[REC_OFFS_SMALL_SIZE]; ulint offsets1_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs1 = sec_offsets1_; ulint* offsets1 = offsets1_;
ulint sec_offsets2_[REC_OFFS_SMALL_SIZE]; ulint offsets2_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs2 = sec_offsets2_; ulint* offsets2 = offsets2_;
ut_ad(list && list->head && index); ut_ad(list && list->head && index);
*sec_offsets1_ = (sizeof sec_offsets1_) / sizeof *sec_offsets1_; *offsets1_ = (sizeof offsets1_) / sizeof *offsets1_;
*sec_offsets2_ = (sizeof sec_offsets2_) / sizeof *sec_offsets2_; *offsets2_ = (sizeof offsets2_) / sizeof *offsets2_;
block_size = 1; /* We start from block size 1 */ block_size = 1; /* We start from block size 1 */
...@@ -622,6 +601,7 @@ row_merge_sort_linked_list( ...@@ -622,6 +601,7 @@ row_merge_sort_linked_list(
Otherwice, we have a sorted list. */ Otherwice, we have a sorted list. */
while (list1_size > 0 || (list2_size > 0 && list2)) { while (list1_size > 0 || (list2_size > 0 && list2)) {
merge_rec_t* tmp;
/* Merge sort two lists by deciding whether /* Merge sort two lists by deciding whether
next element of merge comes from list1 or next element of merge comes from list1 or
list2. */ list2. */
...@@ -629,60 +609,49 @@ row_merge_sort_linked_list( ...@@ -629,60 +609,49 @@ row_merge_sort_linked_list(
if (list1_size == 0) { if (list1_size == 0) {
/* First list is empty, next element /* First list is empty, next element
must come from the second list. */ must come from the second list. */
goto pick2;
}
tmp = list2; if (list2_size == 0 || !list2) {
list2 = list2->next;
list2_size--;
} else if (list2_size == 0 || !list2) {
/* Second list is empty, next element /* Second list is empty, next element
must come from the first list. */ must come from the first list. */
goto pick1;
}
offsets1 = rec_get_offsets(list1->rec, index,
offsets1,
ULINT_UNDEFINED,
&heap);
offsets2 = rec_get_offsets(list2->rec, index,
offsets2,
ULINT_UNDEFINED,
&heap);
switch (row_merge_cmp(list1, list2,
offsets1, offsets2,
index)) {
case 0:
if (UNIV_UNLIKELY
(dict_index_is_unique(index))) {
success = FALSE;
goto func_exit;
}
/* fall through */
case -1:
pick1:
tmp = list1; tmp = list1;
list1 = list1->next; list1 = list1->next;
list1_size--; list1_size--;
} else { break;
int selected = 1; case 1:
pick2:
sec_offs1 = rec_get_offsets(list1->rec, tmp = list2;
index, list2 = list2->next;
sec_offs1, list2_size--;
ULINT_UNDEFINED, break;
&offset_heap);
sec_offs2 = rec_get_offsets(list2->rec,
index,
sec_offs2,
ULINT_UNDEFINED,
&offset_heap);
tmp = row_merge_select(list1,
list2,
sec_offs1,
sec_offs2,
index,
&selected);
if (UNIV_UNLIKELY(tmp == NULL)) {
if (offset_heap) {
mem_heap_free(
offset_heap);
}
return(0);
}
if (selected == 1) {
list1 = list1->next;
list1_size--;
} else {
list2 = list2->next;
list2_size--;
}
} }
/* Add selected element to the merged list */ /* Append the element to the merged list */
if (list_tail) { if (list_tail) {
list_tail->next = tmp; list_tail->next = tmp;
...@@ -705,17 +674,20 @@ row_merge_sort_linked_list( ...@@ -705,17 +674,20 @@ row_merge_sort_linked_list(
if (num_of_merges <= 1) { if (num_of_merges <= 1) {
list->head = list_head; list->head = list_head;
success = TRUE;
if (offset_heap) { goto func_exit;
mem_heap_free(offset_heap);
}
return(1);
} else { } else {
/* Otherwise merge lists twice the size */ /* Otherwise merge lists twice the size */
block_size *= 2; block_size *= 2;
} }
} }
func_exit:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(success);
} }
/***************************************************************** /*****************************************************************
...@@ -932,7 +904,6 @@ row_merge_block_merge( ...@@ -932,7 +904,6 @@ row_merge_block_merge(
ulint* sec_offs1 = sec_offsets1_; ulint* sec_offs1 = sec_offsets1_;
ulint sec_offsets2_[REC_OFFS_SMALL_SIZE]; ulint sec_offsets2_[REC_OFFS_SMALL_SIZE];
ulint* sec_offs2 = sec_offsets2_; ulint* sec_offs2 = sec_offsets2_;
ulint* rec_offsets;
ut_ad(block1 && block2 && *block2 && index); ut_ad(block1 && block2 && *block2 && index);
ut_ad(row_merge_block_validate(block1, index)); ut_ad(row_merge_block_validate(block1, index));
...@@ -955,6 +926,7 @@ row_merge_block_merge( ...@@ -955,6 +926,7 @@ row_merge_block_merge(
while (nth_rec1 < block1->header.n_records || while (nth_rec1 < block1->header.n_records ||
nth_rec2 < tmp->header.n_records) { nth_rec2 < tmp->header.n_records) {
const ulint* rec_offsets;
mrec1 = mrec2 = NULL; mrec1 = mrec2 = NULL;
selected = 0; selected = 0;
...@@ -1022,29 +994,25 @@ row_merge_block_merge( ...@@ -1022,29 +994,25 @@ row_merge_block_merge(
ut_ad(rec_validate(mrec2->rec, sec_offs2)); ut_ad(rec_validate(mrec2->rec, sec_offs2));
mrec1 = row_merge_select( switch (row_merge_cmp(mrec1, mrec2,
mrec1, mrec2, sec_offs1, sec_offs2, index, sec_offs1, sec_offs2, index)) {
&selected); case 0:
if (UNIV_UNLIKELY
/* If selected record is null we have duplicate key (dict_index_is_unique(index))) {
on unique index */ goto error_handling;
}
if (mrec1 == NULL) { /* fall through */
goto error_handling; case 1:
}
/* Addvance records on the block where record was
selected and set offset back on this record
on the block where record was not selected. */
if (selected == 1) {
rec_offsets = sec_offs1; rec_offsets = sec_offs1;
nth_rec1++; nth_rec1++;
offset2 = tmp_offset2; offset2 = tmp_offset2;
} else { break;
case 2:
mrec1 = mrec2;
rec_offsets = sec_offs2; rec_offsets = sec_offs2;
nth_rec2++; nth_rec2++;
offset1 = tmp_offset1; offset1 = tmp_offset1;
break;
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment