Commit c952dc5b authored by marko's avatar marko

branches/zip: When allocating records from the free list,

do not allow extra_size to decrease on compressed pages.

Split page_mem_alloc() to page_mem_alloc_free() and page_mem_alloc_heap().

page_cur_insert_rec_low(): Remove parameter "tuple".  Implement some of the
logic from page_mem_alloc().

page_cur_tuple_insert(): Convert the tuple to a record and calculate offsets.

page_zip_validate(): Assert that the page is in compact format.
parent 4167b06c
......@@ -165,10 +165,9 @@ page_cur_insert_rec_low(
otherwise */
page_cur_t* cursor, /* in: a page cursor */
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
dtuple_t* tuple, /* in: pointer to a data tuple or NULL */
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record or NULL */
ulint* offsets,/* in: rec_get_offsets(rec, index) or NULL */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr); /* in: mini-transaction handle */
......
......@@ -188,8 +188,22 @@ page_cur_tuple_insert(
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr) /* in: mini-transaction handle */
{
return(page_cur_insert_rec_low(cursor, page_zip, tuple,
index, NULL, NULL, ext, n_ext, mtr));
mem_heap_t* heap;
ulint* offsets;
ulint size = rec_get_converted_size(index, tuple);
rec_t* rec;
heap = mem_heap_create(size
+ (4 + REC_OFFS_HEADER_SIZE + dtuple_get_n_fields(tuple))
* sizeof *offsets);
rec = rec_convert_dtuple_to_rec(
mem_heap_alloc(heap, size), index, tuple);
offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
rec = page_cur_insert_rec_low(cursor, page_zip,
index, rec, offsets, ext, n_ext, mtr);
mem_heap_free(heap);
return(rec);
}
/***************************************************************
......@@ -209,7 +223,7 @@ page_cur_rec_insert(
ulint* offsets,/* in: rec_get_offsets(rec, index) */
mtr_t* mtr) /* in: mini-transaction handle */
{
return(page_cur_insert_rec_low(cursor, page_zip, NULL,
return(page_cur_insert_rec_low(cursor, page_zip,
index, rec, offsets, NULL, 0, mtr));
}
......@@ -558,22 +558,35 @@ page_get_data_size(
/* out: data in bytes */
page_t* page); /* in: index page */
/****************************************************************
Allocates a block of memory from an index page. */
Allocates a block of memory from the head of the free list
of an index page. */
UNIV_INLINE
void
page_mem_alloc_free(
/*================*/
page_t* page, /* in/out: index page */
page_zip_des_t* page_zip,/* in/out: compressed page with enough
space available for inserting the record,
or NULL */
rec_t* next_rec,/* in: pointer to the new head of the
free record list */
ulint need); /* in: number of bytes allocated */
/****************************************************************
Allocates a block of memory from the heap of an index page. */
byte*
page_mem_alloc(
/*===========*/
page_mem_alloc_heap(
/*================*/
/* out: pointer to start of allocated
buffer, or NULL if allocation fails */
page_t* page, /* in/out: index page */
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
ulint need, /* in: number of bytes needed */
dict_index_t* index, /* in: record descriptor */
ulint* heap_no,/* out: this contains the heap number
page_zip_des_t* page_zip,/* in/out: compressed page with enough
space available for inserting the record,
or NULL */
ulint need, /* in: total number of bytes needed */
ulint* heap_no);/* out: this contains the heap number
of the allocated record
if allocation succeeds */
mtr_t* mtr); /* in: mini-transaction handle, or NULL
if page_zip == NULL */
/****************************************************************
Puts a record to free list. */
UNIV_INLINE
......
......@@ -773,6 +773,37 @@ page_get_data_size(
return(ret);
}
/****************************************************************
Allocates a block of memory from the free list of an index page. */
void
page_mem_alloc_free(
/*================*/
page_t* page, /* in/out: index page */
page_zip_des_t* page_zip,/* in/out: compressed page with enough
space available for inserting the record,
or NULL */
rec_t* next_rec,/* in: pointer to the new head of the
free record list */
ulint need) /* in: number of bytes allocated */
{
ulint garbage;
#ifdef UNIV_DEBUG
rec_t* old_rec = page_header_get_ptr(page, PAGE_FREE);
ut_ad(old_rec);
ut_ad(next_rec == rec_get_next_ptr(old_rec, page_is_comp(page)));
#endif
page_header_set_ptr(page, page_zip, PAGE_FREE, next_rec);
garbage = page_header_get_field(page, PAGE_GARBAGE);
ut_ad(garbage >= need);
page_header_set_field(page, page_zip, PAGE_GARBAGE, garbage - need);
}
/*****************************************************************
Calculates free space if a page is emptied. */
UNIV_INLINE
......
......@@ -889,10 +889,9 @@ page_cur_insert_rec_low(
otherwise */
page_cur_t* cursor, /* in: a page cursor */
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
dtuple_t* tuple, /* in: pointer to a data tuple or NULL */
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record or NULL */
ulint* offsets,/* in: rec_get_offsets(rec, index) or NULL */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr) /* in: mini-transaction handle */
......@@ -905,12 +904,9 @@ page_cur_insert_rec_low(
ulint heap_no; /* heap number of the inserted record */
rec_t* current_rec; /* current record after which the
new record is inserted */
mem_heap_t* heap = NULL;
ut_ad(cursor && mtr);
ut_ad(tuple || rec);
ut_ad(!(tuple && rec));
ut_ad(rec || dtuple_check_typed(tuple));
ut_ad(rec_offs_validate(rec, index, offsets));
page = page_cur_get_page(cursor);
ut_ad(index->table->comp == (ibool) !!page_is_comp(page));
......@@ -918,39 +914,87 @@ page_cur_insert_rec_low(
ut_ad(!page_rec_is_supremum(cursor->rec));
/* 1. Get the size of the physical record in the page */
if (tuple != NULL) {
rec_size = rec_get_converted_size(index, tuple);
} else {
if (!offsets) {
offsets = rec_get_offsets(rec, index, offsets,
rec_size = rec_offs_size(offsets);
/* 2. Try to find suitable space from page memory management */
if (UNIV_LIKELY_NULL(page_zip)
&& !page_zip_alloc(page_zip, page, index, mtr, rec_size, 1)) {
return(NULL);
}
insert_buf = page_header_get_ptr(page, PAGE_FREE);
if (insert_buf) {
/* Try to allocate from the head of the free list. */
rec_t* free_rec = insert_buf;
ulint foffsets_[REC_OFFS_NORMAL_SIZE];
ulint* foffsets = foffsets_;
mem_heap_t* heap = NULL;
*foffsets = sizeof(foffsets_) / sizeof *foffsets_;
foffsets = rec_get_offsets(free_rec, index, foffsets,
ULINT_UNDEFINED, &heap);
if (rec_offs_size(foffsets) < rec_size) {
too_small:
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
ut_ad(rec_offs_validate(rec, index, offsets));
rec_size = rec_offs_size(offsets);
goto use_heap;
}
/* 2. Try to find suitable space from page memory management */
insert_buf = page_mem_alloc(page, page_zip, rec_size,
index, &heap_no, mtr);
insert_buf -= rec_offs_extra_size(foffsets);
if (page_is_comp(page)) {
if (UNIV_LIKELY_NULL(page_zip)) {
/* On compressed pages, records from
the free list may only be relocated so
that extra_size will not decrease. */
lint extra_size_diff
= rec_offs_extra_size(offsets)
- rec_offs_extra_size(foffsets);
if (UNIV_UNLIKELY(extra_size_diff < 0)) {
/* Add an offset to the extra_size. */
if (rec_offs_size(foffsets)
< rec_size - extra_size_diff) {
goto too_small;
}
insert_buf -= extra_size_diff;
}
}
heap_no = rec_get_heap_no_new(free_rec);
page_mem_alloc_free(page, page_zip,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
} else {
ut_ad(!page_zip);
heap_no = rec_get_heap_no_old(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, FALSE),
rec_size);
}
if (UNIV_UNLIKELY(insert_buf == NULL)) {
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
} else {
use_heap:
insert_buf = page_mem_alloc_heap(
page, page_zip, rec_size, &heap_no);
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
}
/* 3. Create the record */
if (tuple != NULL) {
insert_rec = rec_convert_dtuple_to_rec(insert_buf,
index, tuple);
offsets = rec_get_offsets(insert_rec, index, offsets,
ULINT_UNDEFINED, &heap);
} else {
insert_rec = rec_copy(insert_buf, rec, offsets);
ut_ad(rec_offs_validate(rec, index, offsets));
rec_offs_make_valid(insert_rec, index, offsets);
}
ut_ad(insert_rec);
ut_ad(rec_size == rec_offs_size(offsets));
......@@ -1059,9 +1103,6 @@ page_cur_insert_rec_low(
page_cur_insert_rec_write_log(insert_rec, rec_size, current_rec,
index, mtr);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(insert_rec);
}
......
......@@ -231,86 +231,27 @@ page_set_max_trx_id(
}
/****************************************************************
Allocates a block of memory from an index page. */
Allocates a block of memory from the heap of an index page. */
byte*
page_mem_alloc(
/*===========*/
page_mem_alloc_heap(
/*================*/
/* out: pointer to start of allocated
buffer, or NULL if allocation fails */
page_t* page, /* in/out: index page */
page_zip_des_t* page_zip,/* in/out: compressed page, or NULL */
ulint need, /* in: number of bytes needed */
dict_index_t* index, /* in: record descriptor */
ulint* heap_no,/* out: this contains the heap number
page_zip_des_t* page_zip,/* in/out: compressed page with enough
space available for inserting the record,
or NULL */
ulint need, /* in: total number of bytes needed */
ulint* heap_no)/* out: this contains the heap number
of the allocated record
if allocation succeeds */
mtr_t* mtr) /* in: mini-transaction handle, or NULL
if page_zip == NULL */
{
rec_t* rec;
byte* block;
ulint avl_space;
ulint garbage;
ut_ad(page && heap_no);
/* TODO: add parameter n_extra */
if (UNIV_LIKELY_NULL(page_zip)) {
ut_ad(page_is_comp(page));
ut_ad(page_zip_validate(page_zip, page));
if (!page_zip_alloc(page_zip, page, index, mtr, need, 1)) {
return(NULL);
}
}
/* If there are records in the free list, look if the first is
big enough */
rec = page_header_get_ptr(page, PAGE_FREE);
if (rec) {
mem_heap_t* heap = NULL;
ulint offsets_[REC_OFFS_NORMAL_SIZE];
ulint* offsets = offsets_;
*offsets_ = (sizeof offsets_) / sizeof *offsets_;
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
if (rec_offs_size(offsets) >= need) {
page_header_set_ptr(page, page_zip, PAGE_FREE,
page_rec_get_next(rec));
garbage = page_header_get_field(page, PAGE_GARBAGE);
ut_ad(garbage >= need);
page_header_set_field(page, page_zip, PAGE_GARBAGE,
garbage - need);
if (page_is_comp(page)) {
*heap_no = rec_get_heap_no_new(rec);
} else {
*heap_no = rec_get_heap_no_old(rec);
}
block = rec_get_start(rec, offsets);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
return(block);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
/* Could not find space from the free list, try top of heap */
avl_space = page_get_max_insert_size(page, 1);
if (avl_space >= need) {
......
......@@ -1630,8 +1630,9 @@ page_zip_validate(
page_t* temp_page = buf_frame_alloc();
ibool valid;
ut_ad(buf_block_get_page_zip(buf_block_align((byte*)page))
ut_a(buf_block_get_page_zip(buf_block_align((byte*)page))
== page_zip);
ut_a(page_is_comp((page_t*) page));
valid = page_zip_decompress(&temp_page_zip, temp_page, NULL)
&& !memcmp(page, temp_page,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment