Commit 6378cae6 authored by marko's avatar marko

branches/zip: Prevent infinite B-tree page splits by ensuring that

there will always be enough space for two node pointer records in an
empty B-tree page.  This was reported as Mantis issue #73.

page_zip_rec_needs_ext(): Add the parameter n_fields, for accurate
estimation of the compressed size of the data dictionary information.
Given that this function is only invoked for records on leaf pages,
require that there be enough space for one record in the compressed
page.  We check elsewhere that there will be enough room for two node
pointer records on higher-level pages.

btr_cur_optimistic_insert(): Ensure that there will be enough room for
two node pointer records on an empty non-leaf page.  The rule for
leaf-page records will be enforced by the callers of
page_zip_rec_needs_ext().

btr_cur_pessimistic_insert(): Remove the insufficient check that the
leaf page record should be compressible by itself.  Instead, now we
require that two node pointer records fit on a non-leaf page, and one
record will fit in uncompressed form on the leaf page.

page_zip_write_header(), page_zip_write_rec(): Re-enable the debug
assertions that were violated by the insufficient check in
btr_cur_pessimistic_insert().

innodb_bug36172.test: Use a larger compressed page size.
parent 222b221d
...@@ -1054,7 +1054,8 @@ btr_cur_optimistic_insert( ...@@ -1054,7 +1054,8 @@ btr_cur_optimistic_insert(
/* Calculate the record size when entry is converted to a record */ /* Calculate the record size when entry is converted to a record */
rec_size = rec_get_converted_size(index, entry, n_ext); rec_size = rec_get_converted_size(index, entry, n_ext);
if (page_zip_rec_needs_ext(rec_size, page_is_comp(page), zip_size)) { if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
dtuple_get_n_fields(entry), zip_size)) {
/* The record is so big that we have to store some fields /* The record is so big that we have to store some fields
externally on separate database pages */ externally on separate database pages */
...@@ -1068,6 +1069,46 @@ btr_cur_optimistic_insert( ...@@ -1068,6 +1069,46 @@ btr_cur_optimistic_insert(
rec_size = rec_get_converted_size(index, entry, n_ext); rec_size = rec_get_converted_size(index, entry, n_ext);
} }
if (UNIV_UNLIKELY(zip_size)) {
/* Estimate the free space of an empty compressed page.
Subtract one byte for the encoded heap_no in the
modification log. */
ulint free_space_zip = page_zip_empty_size(
cursor->index->n_fields, zip_size) - 1;
ulint extra;
ulint n_uniq = dict_index_get_n_unique_in_tree(index);
ut_ad(dict_table_is_comp(index->table));
/* There should be enough room for two node pointer
records on an empty non-leaf page. This prevents
infinite page splits. */
if (UNIV_LIKELY(entry->n_fields >= n_uniq)
&& UNIV_UNLIKELY(rec_get_converted_size_comp(
index, REC_STATUS_NODE_PTR,
entry->fields, n_uniq,
&extra)
/* On a compressed page, there is
a two-byte entry in the dense
page directory for every record.
But there is no record header. */
- (REC_N_NEW_EXTRA_BYTES - 2)
> free_space_zip / 2)) {
if (big_rec_vec) {
dtuple_convert_back_big_rec(
index, entry, big_rec_vec);
}
if (heap) {
mem_heap_free(heap);
}
return(DB_TOO_BIG_RECORD);
}
}
/* If there have been many consecutive inserts, and we are on the leaf /* If there have been many consecutive inserts, and we are on the leaf
level, check if we have to split the page to reserve enough free space level, check if we have to split the page to reserve enough free space
for future updates of records. */ for future updates of records. */
...@@ -1299,6 +1340,7 @@ btr_cur_pessimistic_insert( ...@@ -1299,6 +1340,7 @@ btr_cur_pessimistic_insert(
if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, n_ext), if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, n_ext),
dict_table_is_comp(index->table), dict_table_is_comp(index->table),
dict_index_get_n_fields(index),
zip_size)) { zip_size)) {
/* The record is so big that we have to store some fields /* The record is so big that we have to store some fields
externally on separate database pages */ externally on separate database pages */
...@@ -1322,45 +1364,6 @@ btr_cur_pessimistic_insert( ...@@ -1322,45 +1364,6 @@ btr_cur_pessimistic_insert(
} }
} }
if (UNIV_UNLIKELY(zip_size)) {
/* Estimate the free space of an empty compressed page. */
ulint free_space_zip = page_zip_empty_size(
cursor->index->n_fields, zip_size);
if (UNIV_UNLIKELY(rec_get_converted_size(index, entry, n_ext)
> free_space_zip)) {
/* Try to insert the record by itself on a new page.
If it fails, no amount of splitting will help. */
buf_block_t* temp_block
= buf_block_alloc(zip_size);
page_t* temp_page
= page_create_zip(temp_block, index, 0, NULL);
page_cur_t temp_cursor;
rec_t* temp_rec;
page_cur_position(temp_page + PAGE_NEW_INFIMUM,
temp_block, &temp_cursor);
temp_rec = page_cur_tuple_insert(&temp_cursor,
entry, index,
n_ext, NULL);
buf_block_free(temp_block);
if (UNIV_UNLIKELY(!temp_rec)) {
if (big_rec_vec) {
dtuple_convert_back_big_rec(
index, entry, big_rec_vec);
}
if (heap) {
mem_heap_free(heap);
}
return(DB_TOO_BIG_RECORD);
}
}
}
if (dict_index_get_page(index) if (dict_index_get_page(index)
== buf_block_get_page_no(btr_cur_get_block(cursor))) { == buf_block_get_page_no(btr_cur_get_block(cursor))) {
...@@ -2170,10 +2173,20 @@ btr_cur_pessimistic_update( ...@@ -2170,10 +2173,20 @@ btr_cur_pessimistic_update(
offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap); offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap);
n_ext += btr_push_update_extern_fields(new_entry, update, *heap); n_ext += btr_push_update_extern_fields(new_entry, update, *heap);
if (page_zip_rec_needs_ext(rec_get_converted_size(index, new_entry, if (UNIV_LIKELY_NULL(page_zip)) {
n_ext), ut_ad(page_is_comp(page));
page_is_comp(page), page_zip if (page_zip_rec_needs_ext(
? page_zip_get_size(page_zip) : 0)) { rec_get_converted_size(index, new_entry, n_ext),
TRUE,
dict_index_get_n_fields(index),
page_zip_get_size(page_zip))) {
goto make_external;
}
} else if (page_zip_rec_needs_ext(
rec_get_converted_size(index, new_entry, n_ext),
page_is_comp(page), 0, 0)) {
make_external:
big_rec_vec = dtuple_convert_big_rec(index, new_entry, &n_ext); big_rec_vec = dtuple_convert_big_rec(index, new_entry, &n_ext);
if (UNIV_UNLIKELY(big_rec_vec == NULL)) { if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
......
...@@ -607,6 +607,7 @@ dtuple_convert_big_rec( ...@@ -607,6 +607,7 @@ dtuple_convert_big_rec(
while (page_zip_rec_needs_ext(rec_get_converted_size(index, entry, while (page_zip_rec_needs_ext(rec_get_converted_size(index, entry,
*n_ext), *n_ext),
dict_table_is_comp(index->table), dict_table_is_comp(index->table),
dict_index_get_n_fields(index),
dict_table_zip_size(index->table))) { dict_table_zip_size(index->table))) {
ulint i; ulint i;
ulint longest = 0; ulint longest = 0;
......
...@@ -48,6 +48,8 @@ page_zip_rec_needs_ext( ...@@ -48,6 +48,8 @@ page_zip_rec_needs_ext(
can be stored locally on the page */ can be stored locally on the page */
ulint rec_size, /* in: length of the record in bytes */ ulint rec_size, /* in: length of the record in bytes */
ulint comp, /* in: nonzero=compact format */ ulint comp, /* in: nonzero=compact format */
ulint n_fields, /* in: number of fields in the record;
ignored if zip_size == 0 */
ulint zip_size) /* in: compressed page size in bytes, or 0 */ ulint zip_size) /* in: compressed page size in bytes, or 0 */
__attribute__((const)); __attribute__((const));
......
...@@ -148,10 +148,13 @@ page_zip_rec_needs_ext( ...@@ -148,10 +148,13 @@ page_zip_rec_needs_ext(
can be stored locally on the page */ can be stored locally on the page */
ulint rec_size, /* in: length of the record in bytes */ ulint rec_size, /* in: length of the record in bytes */
ulint comp, /* in: nonzero=compact format */ ulint comp, /* in: nonzero=compact format */
ulint n_fields, /* in: number of fields in the record;
ignored if zip_size == 0 */
ulint zip_size) /* in: compressed page size in bytes, or 0 */ ulint zip_size) /* in: compressed page size in bytes, or 0 */
{ {
ut_ad(rec_size > comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES); ut_ad(rec_size > comp ? REC_N_NEW_EXTRA_BYTES : REC_N_OLD_EXTRA_BYTES);
ut_ad(ut_is_2pow(zip_size)); ut_ad(ut_is_2pow(zip_size));
ut_ad(comp || !zip_size);
#if UNIV_PAGE_SIZE > REC_MAX_DATA_SIZE #if UNIV_PAGE_SIZE > REC_MAX_DATA_SIZE
if (UNIV_UNLIKELY(rec_size >= REC_MAX_DATA_SIZE)) { if (UNIV_UNLIKELY(rec_size >= REC_MAX_DATA_SIZE)) {
...@@ -159,21 +162,18 @@ page_zip_rec_needs_ext( ...@@ -159,21 +162,18 @@ page_zip_rec_needs_ext(
} }
#endif #endif
if (UNIV_UNLIKELY(!comp)) { if (UNIV_UNLIKELY(zip_size)) {
ut_ad(!zip_size); ut_ad(comp);
return(rec_size >= page_get_free_space_of_empty(FALSE) / 2); /* On a compressed page, there is a two-byte entry in
the dense page directory for every record. But there
is no record header. There should be enough room for
one record on an empty leaf page. Subtract 1 byte for
the encoded heap number. */
return(rec_size - (REC_N_NEW_EXTRA_BYTES - 2)
>= (page_zip_empty_size(n_fields, zip_size) - 1));
} }
/* If zip_size != 0, the record should fit on the compressed page. return(rec_size >= page_get_free_space_of_empty(comp) / 2);
If not, the right-hand-side of the comparison will overwrap
and the condition will not hold. Thus, we do not need to test
for zip_size != 0. We subtract the size of the page header and
assume that compressing the index information takes 50 bytes. */
if (rec_size >= zip_size - (PAGE_DATA + 50)) {
return(TRUE);
}
return(rec_size >= page_get_free_space_of_empty(TRUE) / 2);
} }
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
...@@ -355,15 +355,7 @@ page_zip_write_header( ...@@ -355,15 +355,7 @@ page_zip_write_header(
{ {
ulint pos; ulint pos;
#if 0
/* In btr_cur_pessimistic_insert(), we allocate temp_page
from the buffer pool to see if a record fits on a compressed
page by itself. The buf_block_align() call in
buf_frame_get_page_zip() only works for file pages, not
temporarily allocated blocks. Thus, we must unfortunately
disable the following assertion. */
ut_ad(buf_frame_get_page_zip(str) == page_zip); ut_ad(buf_frame_get_page_zip(str) == page_zip);
#endif
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip)); UNIV_MEM_ASSERT_RW(page_zip->data, page_zip_get_size(page_zip));
......
This diff is collapsed.
...@@ -3203,15 +3203,7 @@ page_zip_write_rec( ...@@ -3203,15 +3203,7 @@ page_zip_write_rec(
ulint heap_no; ulint heap_no;
byte* slot; byte* slot;
#if 0
/* In btr_cur_pessimistic_insert(), we allocate temp_page
from the buffer pool to see if a record fits on a compressed
page by itself. The buf_block_align() call in
buf_frame_get_page_zip() only works for file pages, not
temporarily allocated blocks. Thus, we must unfortunately
disable the following assertion. */
ut_ad(buf_frame_get_page_zip(rec) == page_zip); ut_ad(buf_frame_get_page_zip(rec) == page_zip);
#endif
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
ut_ad(page_zip_get_size(page_zip) ut_ad(page_zip_get_size(page_zip)
> PAGE_DATA + page_zip_dir_size(page_zip)); > PAGE_DATA + page_zip_dir_size(page_zip));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment