Commit f4a69e01 authored by marko

branches/zip: Fix some bugs

btr_page_split_and_insert(): Avoid dereferencing pointers to garbage on
the old page.

btr_cur_pessimistic_insert(): Pass pointer to big_rec_vec to
btr_cur_optimistic_insert().

trx_undo_prev_version_build(): Only invoke rec_set_field_extern_bits()
if n_ext_vect > 0.

row_ins_index_entry_low(): Simplify a debug assertion.

page_copy_rec_list_end_no_locks(): Make the loop slightly more readable.

page_delete_rec_list_end(): Delete records on compressed pages one by one.
parent b63ff3be
......@@ -1668,6 +1668,7 @@ btr_page_split_and_insert(
byte* buf = 0; /* remove warning */
rec_t* move_limit;
ibool insert_will_fit;
ibool insert_left;
ulint n_iterations = 0;
rec_t* rec;
mem_heap_t* heap;
......@@ -1727,9 +1728,8 @@ btr_page_split_and_insert(
first record (move_limit) on original page which ends up on the
upper half */
if (split_rec != NULL) {
first_rec = split_rec;
move_limit = split_rec;
if (split_rec) {
first_rec = move_limit = split_rec;
} else {
buf = mem_alloc(rec_get_converted_size(cursor->index, tuple));
......@@ -1752,10 +1752,12 @@ btr_page_split_and_insert(
offsets = rec_get_offsets(split_rec, cursor->index, offsets,
n_uniq, &heap);
insert_left = cmp_dtuple_rec(tuple, split_rec, offsets) < 0;
insert_will_fit = btr_page_insert_fits(cursor,
split_rec, offsets, tuple, heap);
} else {
mem_free(buf);
insert_left = FALSE;
insert_will_fit = btr_page_insert_fits(cursor,
NULL, NULL, tuple, heap);
}
......@@ -1799,22 +1801,16 @@ btr_page_split_and_insert(
lock_update_split_right(right_page, left_page);
}
/* At this point, split_rec, move_limit and first_rec may point
to garbage on the old page. */
/* 6. The split and the tree modification is now completed. Decide the
page where the tuple should be inserted */
if (split_rec == NULL) {
insert_page = right_page;
if (insert_left) {
insert_page = left_page;
} else {
offsets = rec_get_offsets(first_rec, cursor->index,
offsets, n_uniq, &heap);
if (cmp_dtuple_rec(tuple, first_rec, offsets) >= 0) {
insert_page = right_page;
} else {
insert_page = left_page;
}
insert_page = right_page;
}
insert_page_zip = buf_block_get_page_zip(buf_block_align(insert_page));
......
......@@ -1230,8 +1230,8 @@ btr_cur_pessimistic_insert(
cursor->flag = BTR_CUR_BINARY;
err = btr_cur_optimistic_insert(flags, cursor, entry, rec, big_rec,
ext, n_ext, thr, mtr);
err = btr_cur_optimistic_insert(flags, cursor, entry, rec,
&big_rec_vec, ext, n_ext, thr, mtr);
if (err != DB_FAIL) {
return(err);
......
......@@ -504,7 +504,6 @@ page_copy_rec_list_end_no_locks(
{
page_cur_t cur1;
page_cur_t cur2;
rec_t* sup;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
......@@ -528,13 +527,8 @@ page_copy_rec_list_end_no_locks(
/* Copy records from the original page to the new page */
sup = page_get_supremum_rec(ut_align_down(rec, UNIV_PAGE_SIZE));
for (;;) {
while (!page_cur_is_after_last(&cur1)) {
rec_t* cur1_rec = page_cur_get_rec(&cur1);
if (cur1_rec == sup) {
break;
}
offsets = rec_get_offsets(cur1_rec, index, offsets,
ULINT_UNDEFINED, &heap);
if (UNIV_UNLIKELY(!page_cur_rec_insert(&cur2, NULL,
......@@ -573,7 +567,9 @@ The records are copied to the start of the record list on new_page. */
ibool
page_copy_rec_list_end(
/*===================*/
/* out: TRUE on success */
/* out: TRUE on success; FALSE on
compression failure (new_page will
be decompressed from new_page_zip) */
page_t* new_page, /* in/out: index page to copy to */
page_zip_des_t* new_page_zip, /* in/out: compressed page, or NULL */
rec_t* rec, /* in: record on page */
......@@ -801,12 +797,16 @@ page_delete_rec_list_end(
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
mtr_t* mtr) /* in: mtr */
{
page_dir_slot_t* slot;
ulint slot_index;
rec_t* last_rec;
rec_t* prev_rec;
ulint n_owned;
page_t* page;
page_dir_slot_t*slot;
ulint slot_index;
rec_t* last_rec;
rec_t* prev_rec;
ulint n_owned;
page_t* page;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
ut_ad(size == ULINT_UNDEFINED || size < UNIV_PAGE_SIZE);
ut_ad(!page_zip || page_rec_is_comp(rec));
......@@ -835,16 +835,42 @@ page_delete_rec_list_end(
? MLOG_COMP_LIST_END_DELETE
: MLOG_LIST_END_DELETE, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
ulint log_mode;
ut_a(page_is_comp(page));
/* Individual deletes are not logged */
log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
do {
page_cur_t cur;
page_cur_position(rec, &cur);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
rec = rec_get_next_ptr(rec, TRUE);
page_cur_delete_rec(&cur, index, offsets,
page_zip, mtr);
} while (ut_align_offset(rec, UNIV_PAGE_SIZE)
!= PAGE_NEW_SUPREMUM);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
/* Restore log mode */
mtr_set_log_mode(mtr, log_mode);
return;
}
prev_rec = page_rec_get_prev(rec);
last_rec = page_rec_get_prev(page_get_supremum_rec(page));
if ((size == ULINT_UNDEFINED) || (n_recs == ULINT_UNDEFINED)) {
rec_t* rec2 = rec;
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
/* Calculate the sum of sizes and the number of records */
size = 0;
n_recs = 0;
......@@ -889,27 +915,6 @@ page_delete_rec_list_end(
n_owned = rec_get_n_owned_new(rec2) - count;
slot_index = page_dir_find_owner_slot(rec2);
slot = page_dir_get_nth_slot(page, slot_index);
if (UNIV_LIKELY_NULL(page_zip)) {
ulint n_slots;
rec2 = rec;
do {
/* The compression algorithm expects
info_bits and n_owned to be 0
for deleted records. */
rec2[-REC_N_NEW_EXTRA_BYTES] = 0;
rec2 = rec_get_next_ptr(rec2, TRUE);
} while (rec2);
/* The compression algorithm expects the removed
slots in the page directory to be cleared. */
n_slots = page_dir_get_n_slots(page) - slot_index - 1;
ut_ad(n_slots < UNIV_PAGE_SIZE / PAGE_DIR_SLOT_SIZE);
memset(slot - (n_slots * PAGE_DIR_SLOT_SIZE), 0,
n_slots * PAGE_DIR_SLOT_SIZE);
}
} else {
rec_t* rec2 = rec;
ulint count = 0;
......@@ -928,9 +933,9 @@ page_delete_rec_list_end(
}
page_dir_slot_set_rec(slot, page_get_supremum_rec(page));
page_dir_slot_set_n_owned(slot, page_zip, n_owned);
page_dir_slot_set_n_owned(slot, NULL, n_owned);
page_dir_set_n_slots(page, page_zip, slot_index + 1);
page_dir_set_n_slots(page, NULL, slot_index + 1);
/* Remove the record chain segment from the record chain */
page_rec_set_next(prev_rec, page_get_supremum_rec(page));
......@@ -938,16 +943,13 @@ page_delete_rec_list_end(
/* Catenate the deleted chain segment to the page free list */
page_rec_set_next(last_rec, page_header_get_ptr(page, PAGE_FREE));
page_header_set_ptr(page, page_zip, PAGE_FREE, rec);
page_header_set_ptr(page, NULL, PAGE_FREE, rec);
page_header_set_field(page, page_zip, PAGE_GARBAGE,
page_header_set_field(page, NULL, PAGE_GARBAGE,
size + page_header_get_field(page, PAGE_GARBAGE));
page_header_set_field(page, page_zip, PAGE_N_RECS,
page_header_set_field(page, NULL, PAGE_N_RECS,
(ulint)(page_get_n_recs(page) - n_recs));
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_dir_rewrite(page_zip, page);
}
}
/*****************************************************************
......
......@@ -2021,10 +2021,9 @@ row_ins_index_entry_low(
rec_t* first_rec = page_rec_get_next(
page_get_infimum_rec(page));
if (UNIV_LIKELY(first_rec != page_get_supremum_rec(page))) {
ut_a(rec_get_n_fields(first_rec, index)
ut_ad(page_rec_is_supremum(first_rec)
|| rec_get_n_fields(first_rec, index)
== dtuple_get_n_fields(entry));
}
}
#endif
......
......@@ -1406,8 +1406,10 @@ trx_undo_prev_version_build(
*old_vers = rec_convert_dtuple_to_rec(buf, index, entry);
/* Now set the extern bits in the old version of the record */
rec_set_field_extern_bits(*old_vers, index, NULL,
if (n_ext_vect) {
rec_set_field_extern_bits(*old_vers, index, NULL,
ext_vect, n_ext_vect);
}
mem_free(ext_vect);
} else {
buf = mem_heap_alloc(heap, rec_offs_size(offsets));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment