Commit 83539afa authored by marko's avatar marko

branches/zip: Restore left b-tree splits.

page_move_rec_list_start(): Restored.  Reorganize old page if compressed.
btr_page_split_and_insert(): Do left page splits.
parent 3bf12fb4
......@@ -1618,13 +1618,16 @@ btr_page_split_and_insert(
mtr_t* mtr) /* in: mtr */
{
dict_tree_t* tree;
page_t* page;
ulint page_no;
byte direction;
ulint hint_page_no;
page_t* new_page;
rec_t* split_rec;
page_t* left_page;
page_t* right_page;
page_t* insert_page;
page_zip_des_t* left_page_zip;
page_zip_des_t* right_page_zip;
page_zip_des_t* insert_page_zip;
page_cur_t* page_cursor;
rec_t* first_rec;
byte* buf = 0; /* remove warning */
......@@ -1649,13 +1652,13 @@ btr_page_split_and_insert(
ut_ad(rw_lock_own(dict_tree_get_lock(tree), RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
left_page = btr_cur_get_page(cursor);
page = btr_cur_get_page(cursor);
ut_ad(mtr_memo_contains(mtr, buf_block_align(left_page),
ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
MTR_MEMO_PAGE_X_FIX));
ut_ad(page_get_n_recs(left_page) >= 2);
left_page_zip = buf_block_get_page_zip(buf_block_align(left_page));
ut_ad(page_get_n_recs(page) >= 2);
page_no = buf_frame_get_page_no(page);
/* 1. Decide the split record; split_rec == NULL means that the
tuple to be inserted should be the first record on the upper
......@@ -1663,24 +1666,26 @@ btr_page_split_and_insert(
if (n_iterations > 0) {
direction = FSP_UP;
hint_page_no = page_no + 1;
split_rec = btr_page_get_sure_split_rec(cursor, tuple);
} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
direction = FSP_UP;
hint_page_no = page_no + 1;
} else if (btr_page_get_split_rec_to_left(cursor, &split_rec)) {
direction = FSP_DOWN;
hint_page_no = page_no - 1;
} else {
direction = FSP_UP;
split_rec = page_get_middle_rec(left_page);
hint_page_no = page_no + 1;
split_rec = page_get_middle_rec(page);
}
/* 2. Allocate a new page to the tree */
right_page = btr_page_alloc(tree,
buf_frame_get_page_no(left_page) + 1,
direction,
btr_page_get_level(left_page, mtr), mtr);
btr_page_create(right_page, tree, mtr);
new_page = btr_page_alloc(tree, hint_page_no, direction,
btr_page_get_level(page, mtr), mtr);
btr_page_create(new_page, tree, mtr);
/* 3. Calculate the first record on the upper half-page, and the
first record (move_limit) on original page which ends up on the
......@@ -1699,8 +1704,7 @@ btr_page_split_and_insert(
/* 4. Do first the modifications in the tree structure */
btr_attach_half_pages(tree, left_page, first_rec, right_page,
direction, mtr);
btr_attach_half_pages(tree, page, first_rec, new_page, direction, mtr);
/* If the split is made on the leaf level and the insert will fit
on the appropriate half-page, we may release the tree x-latch.
......@@ -1719,26 +1723,37 @@ btr_page_split_and_insert(
NULL, NULL, tuple, heap);
}
if (insert_will_fit && (btr_page_get_level(left_page, mtr) == 0)) {
if (insert_will_fit && (btr_page_get_level(page, mtr) == 0)) {
mtr_memo_release(mtr, dict_tree_get_lock(tree),
MTR_MEMO_X_LOCK);
}
/* 5. Move then the records to the new page */
right_page_zip = buf_block_get_page_zip(buf_block_align(right_page));
if (direction == FSP_DOWN) {
/* fputs("Split left\n", stderr); */
page_move_rec_list_end(right_page, right_page_zip,
move_limit, left_page_zip,
cursor->index, mtr);
page_move_rec_list_start(new_page, buf_block_get_page_zip(
buf_block_align(new_page)),
move_limit, buf_block_get_page_zip(
buf_block_align(page)),
cursor->index, mtr);
if (UNIV_UNLIKELY(direction == FSP_DOWN)) {
fputs("Split left\n", stderr); /* TODO: coverage test */
left_page = new_page;
right_page = page;
lock_update_split_left(right_page, left_page);
} else {
/* fputs("Split right\n", stderr); */
page_move_rec_list_end(new_page, buf_block_get_page_zip(
buf_block_align(new_page)),
move_limit, buf_block_get_page_zip(
buf_block_align(page)),
cursor->index, mtr);
left_page = page;
right_page = new_page;
lock_update_split_right(right_page, left_page);
}
......@@ -1760,16 +1775,19 @@ btr_page_split_and_insert(
}
}
insert_page_zip = buf_block_get_page_zip(buf_block_align(insert_page));
/* 7. Reposition the cursor for insert and try insertion */
page_cursor = btr_cur_get_page_cur(cursor);
page_cur_search(insert_page, cursor->index, tuple,
PAGE_CUR_LE, page_cursor);
rec = page_cur_tuple_insert(page_cursor, left_page_zip,
rec = page_cur_tuple_insert(page_cursor, insert_page_zip,
tuple, cursor->index, mtr);
ut_ad(!left_page_zip || page_zip_validate(left_page_zip, left_page));
ut_ad(!insert_page_zip
|| page_zip_validate(insert_page_zip, insert_page));
if (UNIV_LIKELY(rec != NULL)) {
/* Insert fit on the page: update the free bits for the
......@@ -1791,16 +1809,15 @@ btr_page_split_and_insert(
page_cur_search(insert_page, cursor->index, tuple,
PAGE_CUR_LE, page_cursor);
rec = page_cur_tuple_insert(page_cursor, left_page_zip,
rec = page_cur_tuple_insert(page_cursor, insert_page_zip,
tuple, cursor->index, mtr);
if (UNIV_UNLIKELY(rec == NULL)) {
/* The insert did not fit on the page: loop back to the
start of the function for a new split */
/* We play safe and reset the free bits for right_page */
ibuf_reset_free_bits(cursor->index, right_page);
/* We play safe and reset the free bits for new_page */
ibuf_reset_free_bits(cursor->index, new_page);
/* fprintf(stderr, "Split second round %lu\n",
buf_frame_get_page_no(page)); */
......
......@@ -666,6 +666,22 @@ page_move_rec_list_end(
dict_index_t* index, /* in: record descriptor */
mtr_t* mtr) /* in: mtr */
__attribute__((nonnull(1, 3, 5, 6)));
/*****************************************************************
Moves record list start to another page. Moved records do not include
split_rec. */
void
page_move_rec_list_start(
/*=====================*/
page_t* new_page, /* in: index page where to move */
page_zip_des_t* new_page_zip, /* in/out: compressed page of
new_page, or NULL */
rec_t* split_rec, /* in: first record not to move */
page_zip_des_t* page_zip, /* in/out: compressed page of
split_rec, or NULL */
dict_index_t* index, /* in: record descriptor */
mtr_t* mtr) /* in: mtr */
__attribute__((nonnull(1, 3, 5, 6)));
/********************************************************************
Splits a directory slot which owns too many records. */
......
......@@ -627,6 +627,7 @@ page_copy_rec_list_end(
if (UNIV_UNLIKELY(!page_zip_decompress(
new_page_zip, new_page, mtr))) {
/* TODO: does not work */
ut_error;
}
return(FALSE);
......@@ -1057,6 +1058,83 @@ page_move_rec_list_end(
page_zip, mtr);
}
/*****************************************************************
Moves record list start to another page. Moved records do not include
split_rec. */
void
page_move_rec_list_start(
/*=====================*/
page_t* new_page, /* in: index page where to move */
page_zip_des_t* new_page_zip, /* in/out: compressed page of
new_page, or NULL */
rec_t* split_rec, /* in: first record not to move */
page_zip_des_t* page_zip, /* in/out: compressed page of
split_rec, or NULL */
dict_index_t* index, /* in: record descriptor */
mtr_t* mtr) /* in: mtr */
{
if (UNIV_UNLIKELY(!page_copy_rec_list_start(new_page, new_page_zip,
split_rec, index, mtr))) {
ut_error;
}
ut_ad(!page_zip == !new_page_zip);
if (UNIV_LIKELY_NULL(page_zip)) {
/* On compressed pages, instead of deleting the start
of the record list, recreate the page and copy the
end of the list. */
page_t* page;
page_t* temp_page;
ulint log_mode;
page = ut_align_down(split_rec, UNIV_PAGE_SIZE);
ut_ad(page_is_comp(page));
ut_ad(page_is_comp(new_page));
/* Disable logging */
log_mode = mtr_set_log_mode(mtr, MTR_LOG_NONE);
/* Copy the page to temporary space */
temp_page = buf_frame_alloc();
buf_frame_copy(page, temp_page);
/* TODO: will this fail in crash recovery? */
btr_search_drop_page_hash_index(page);
/* Recreate the page: note that global data on page (possible
segment headers, next page-field, etc.) is preserved intact */
page_create(page, NULL, mtr, TRUE);
buf_block_align(page)->check_index_page_at_flush = TRUE;
/* Copy the records from the temporary space to the
recreated page; do not copy the lock bits yet */
page_copy_rec_list_end_no_locks(page,
temp_page - page + split_rec, index, mtr);
/* Copy max trx id to recreated page */
page_set_max_trx_id(page, page_get_max_trx_id(temp_page));
/* Update the record lock bitmaps */
lock_move_reorganize_page(page, temp_page);
buf_frame_free(temp_page);
mtr_set_log_mode(mtr, log_mode);
if (UNIV_UNLIKELY(!page_zip_compress(page_zip, page))) {
/* Reorganizing a page should reduce entropy,
making the compressed page occupy less space. */
ut_error;
}
} else {
page_delete_rec_list_start(split_rec, index, mtr);
}
}
/***************************************************************************
This is a low-level operation which is used in a database index creation
to update the page number of a created B-tree to a data dictionary record. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment