Commit 1aca6cad authored by marko's avatar marko

branches/zip: Fix some bugs in the insertion of records.

row_upd_rec_in_place(), page_zip_write_rec(): Add parameter "index".

page_dir_set_n_heap(): Add a debug assertion that on compressed
pages, n_heap will always be incremented by one.  Improve code formatting.

page_zip_dir_add_slot(): New function, called from
page_cur_insert_rec_low() after page_mem_alloc_heap().

rec_set_n_owned_new(): Do not call page_zip_rec_set_owned()
on the supremum record.

rec_offs_make_valid(): Add debug assertions.

page_zip_dir_user_size(): Correct an off-by-one error in the debug assertion.

page_zip_apply_log(): Add parameter trx_id_col.  Skip trx_id and roll_ptr.

page_zip_decompress(): Simplify the handling of "storage" in the loop that
copies the uncompressed fields.

page_zip_write_rec(): Store trx_id and roll_ptr separately.

page_zip_write_trx_id(), page_zip_write_roll_ptr(): Fix off-by-one errors.

page_cur_insert_rec_low(): Call page_zip_dir_add_slot() after
page_mem_alloc_heap().  Remove some redundant assertions.
Pass page_zip to page_dir_split_slot().
parent 3cdf3e1c
......@@ -1519,7 +1519,7 @@ btr_cur_parse_update_in_place(
pos, trx_id, roll_ptr);
}
row_upd_rec_in_place(rec, offsets, update, page_zip);
row_upd_rec_in_place(rec, index, offsets, update, page_zip);
func_exit:
mem_heap_free(heap);
......@@ -1618,7 +1618,7 @@ btr_cur_update_in_place(
was_delete_marked = rec_get_deleted_flag(rec,
page_is_comp(buf_block_get_frame(block)));
row_upd_rec_in_place(rec, offsets, update, page_zip);
row_upd_rec_in_place(rec, index, offsets, update, page_zip);
if (block->is_hashed) {
rw_lock_x_unlock(&btr_search_latch);
......@@ -1628,7 +1628,7 @@ btr_cur_update_in_place(
trx, roll_ptr, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_rec(page_zip, rec, offsets);
page_zip_write_rec(page_zip, rec, index, offsets);
}
if (was_delete_marked && !rec_get_deleted_flag(rec,
......@@ -2084,7 +2084,7 @@ btr_cur_pessimistic_update(
}
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_rec(page_zip, rec, offsets);
page_zip_write_rec(page_zip, rec, index, offsets);
}
btr_cur_compress_if_useful(cursor, mtr);
......
......@@ -478,9 +478,11 @@ page_dir_set_n_heap(
ulint n_heap) /* in: number of records */
{
ut_ad(n_heap < 0x8000);
ut_ad(!page_zip || n_heap
== (page_header_get_field(page, PAGE_N_HEAP) & 0x7fff) + 1);
page_header_set_field(page, page_zip, PAGE_N_HEAP, n_heap | (0x8000 &
page_header_get_field(page, PAGE_N_HEAP)));
page_header_set_field(page, page_zip, PAGE_N_HEAP, n_heap
| (0x8000 & page_header_get_field(page, PAGE_N_HEAP)));
}
/*****************************************************************
......
......@@ -114,4 +114,12 @@ page_zip_dir_delete(
const byte* free) /* in: previous start of the free list */
__attribute__((nonnull));
/**************************************************************************
Add a slot to the dense page directory. */
void
page_zip_dir_add_slot(
/*==================*/
page_zip_des_t* page_zip)/* in/out: compressed page */
__attribute__((nonnull));
#endif
......@@ -112,6 +112,7 @@ page_zip_write_rec(
/*===============*/
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets)/* in: rec_get_offsets(rec, index) */
__attribute__((nonnull));
......@@ -229,6 +230,15 @@ page_zip_dir_delete(
const byte* free) /* in: previous start of the free list */
__attribute__((nonnull));
/**************************************************************************
Add a slot to the dense page directory. */
void
page_zip_dir_add_slot(
/*==================*/
page_zip_des_t* page_zip)/* in/out: compressed page */
__attribute__((nonnull));
/**************************************************************************
Write data to the uncompressed header portion of a page. The data must
already have been written to the uncompressed page.
......
......@@ -528,7 +528,9 @@ rec_set_n_owned_new(
{
rec_set_bit_field_1(rec, n_owned, REC_NEW_N_OWNED,
REC_N_OWNED_MASK, REC_N_OWNED_SHIFT);
if (UNIV_LIKELY_NULL(page_zip)) {
if (UNIV_LIKELY_NULL(page_zip)
&& UNIV_LIKELY(rec_get_status(rec)
!= REC_STATUS_SUPREMUM)) {
page_zip_rec_set_owned(page_zip, rec, n_owned);
}
}
......@@ -964,6 +966,9 @@ rec_offs_make_valid(
ulint* offsets __attribute__((unused)))
/* in: array returned by rec_get_offsets() */
{
ut_ad(rec);
ut_ad(index);
ut_ad(offsets);
#ifdef UNIV_DEBUG
ut_ad(rec_get_n_fields(rec, index) >= rec_offs_n_fields(offsets));
offsets[2] = (ulint) rec;
......
......@@ -139,6 +139,7 @@ void
row_upd_rec_in_place(
/*=================*/
rec_t* rec, /* in/out: record where replaced */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets,/* in: array returned by rec_get_offsets() */
upd_t* update, /* in: update vector */
page_zip_des_t* page_zip);/* in: compressed page with enough space
......
......@@ -995,16 +995,16 @@ use_heap:
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_dir_add_slot(page_zip);
}
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
ut_ad(rec_offs_validate(rec, index, offsets));
rec_offs_make_valid(insert_rec, index, offsets);
ut_ad(insert_rec);
ut_ad(rec_size == rec_offs_size(offsets));
/* Set the "extern storage" flags */
if (UNIV_UNLIKELY(n_ext)) {
rec_set_field_extern_bits(insert_rec, index, ext, n_ext);
......@@ -1093,7 +1093,7 @@ use_heap:
we have to split the corresponding directory slot in two. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(page, NULL,
page_dir_split_slot(page, page_zip,
page_dir_find_owner_slot(owner_rec));
}
}
......@@ -1102,7 +1102,7 @@ use_heap:
/* TODO: something similar to page_zip_dir_delete() */
page_zip_dir_rewrite(page_zip, page);
page_zip_write_rec(page_zip, insert_rec, offsets);
page_zip_write_rec(page_zip, insert_rec, index, offsets);
}
/* 9. Write log record of the insert */
......
......@@ -75,7 +75,7 @@ page_zip_dir_user_size(
{
ulint size = PAGE_ZIP_DIR_SLOT_SIZE
* page_get_n_recs((page_t*) page_zip->data);
ut_ad(size < page_zip_dir_size(page_zip));
ut_ad(size <= page_zip_dir_size(page_zip));
return(size);
}
......@@ -1072,7 +1072,8 @@ page_zip_set_extra_bytes(
}
/**************************************************************************
Apply the modification log to an uncompressed page. */
Apply the modification log to an uncompressed page.
Do not copy the fields that are stored separately. */
static
const byte*
page_zip_apply_log(
......@@ -1084,6 +1085,8 @@ page_zip_apply_log(
rec_t** recs, /* in: dense page directory,
sorted by address (indexed by heap_no - 2) */
ulint n_dense,/* in: size of recs[] */
ulint trx_id_col,/* in: column number of trx_id in the index;
ignored if heap_status & REC_STATUS_NODE_PTR */
ulint heap_status,
/* in: heap_no and status bits for
the next record to uncompress */
......@@ -1101,7 +1104,7 @@ page_zip_apply_log(
val = *data++;
if (UNIV_UNLIKELY(!val)) {
break;
return(data - 1);
}
if (val & 0x80) {
val = (val & 0x7f) << 7 | *data++;
......@@ -1171,8 +1174,28 @@ page_zip_apply_log(
BTR_EXTERN_FIELD_REF._*/
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
if (rec_offs_nth_extern(offsets, i)) {
byte* dst = rec_get_nth_field(
byte* dst;
if (UNIV_UNLIKELY(i == trx_id_col)) {
/* Skip trx_id and roll_ptr */
dst = rec_get_nth_field(
rec, offsets, i, &len);
if (UNIV_UNLIKELY(dst - next_out
>= end - data)
|| UNIV_UNLIKELY(len
< DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN)
|| rec_offs_nth_extern(
offsets, i)) {
return(NULL);
}
memcpy(next_out, data, dst - next_out);
data += dst - next_out;
next_out = dst + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
} else if (rec_offs_nth_extern(offsets, i)) {
dst = rec_get_nth_field(
rec, offsets, i, &len);
ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE);
......@@ -1182,6 +1205,7 @@ page_zip_apply_log(
if (UNIV_UNLIKELY(data + len >= end)) {
return(NULL);
}
memcpy(next_out, data, len);
data += len;
next_out += len
......@@ -1189,11 +1213,8 @@ page_zip_apply_log(
}
}
/* Copy the last bytes of the record.
Skip roll_ptr and trx_id. */
len = rec_get_end(rec, offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- next_out;
/* Copy the last bytes of the record. */
len = rec_get_end(rec, offsets) - next_out;
if (UNIV_UNLIKELY(data + len >= end)) {
return(NULL);
}
......@@ -1201,8 +1222,6 @@ page_zip_apply_log(
data += len;
}
}
return(data);
}
/**************************************************************************
......@@ -1554,7 +1573,7 @@ zlib_done:
const byte* mod_log_ptr;
mod_log_ptr = page_zip_apply_log(
page_zip->data + page_zip->m_start,
d_stream.avail_in, recs, n_dense,
d_stream.avail_in, recs, n_dense, trx_id_col,
heap_status, index, offsets);
if (UNIV_UNLIKELY(!mod_log_ptr)) {
......@@ -1615,21 +1634,18 @@ err_exit:
trx_id_col, &len);
ut_ad(len >= DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
memcpy(dst, storage
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (rec_get_heap_no_new(rec) - 1),
storage -= DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
memcpy(dst, storage,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
}
} else {
/* Non-leaf nodes should not have any externally
stored columns. */
ut_ad(!rec_offs_any_extern(offsets));
storage -= REC_NODE_PTR_SIZE;
memcpy(rec_get_end(rec, offsets) - REC_NODE_PTR_SIZE,
storage - REC_NODE_PTR_SIZE
* (rec_get_heap_no_new(rec) - 1),
REC_NODE_PTR_SIZE);
storage, REC_NODE_PTR_SIZE);
}
}
......@@ -1653,6 +1669,7 @@ Check that the compressed and decompressed pages match. */
ibool
page_zip_validate(
/*==============*/
/* out: TRUE if valid, FALSE if not */
const page_zip_des_t* page_zip,/* in: compressed page */
const page_t* page) /* in: uncompressed page */
{
......@@ -1681,6 +1698,7 @@ page_zip_write_rec(
/*===============*/
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets)/* in: rec_get_offsets(rec, index) */
{
page_t* page;
......@@ -1692,7 +1710,7 @@ page_zip_write_rec(
ut_ad(page_zip_simple_validate(page_zip));
ut_ad(page_zip->size > PAGE_DATA + page_zip_dir_size(page_zip));
ut_ad(rec_offs_comp(offsets));
ut_ad(rec_offs_validate((rec_t*) rec, NULL, offsets));
ut_ad(rec_offs_validate((rec_t*) rec, index, offsets));
ut_ad(page_zip->m_start >= PAGE_DATA);
ut_ad(!memcmp(ut_align_down((byte*) rec, UNIV_PAGE_SIZE),
......@@ -1744,22 +1762,51 @@ page_zip_write_rec(
ulint i;
ulint len;
const byte* start = rec;
ulint trx_id_col = dict_index_get_sys_col_pos(
index, DATA_TRX_ID);
/* Check if there are any externally stored columns.
For each externally stored column, store the
BTR_EXTERN_FIELD_REF separately._*/
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
if (rec_offs_nth_extern(offsets, i)) {
ulint len;
const byte* src = rec_get_nth_field(
(rec_t*) rec, offsets, i, &len);
const byte* src;
if (UNIV_UNLIKELY(i == trx_id_col)) {
ut_ad(!rec_offs_nth_extern(offsets, i));
ut_ad(!rec_offs_nth_extern(offsets, i + 1));
/* Store trx_id and roll_ptr separately. */
src = rec_get_nth_field((rec_t*) rec,
offsets, i, &len);
#ifdef UNIV_DEBUG
ut_ad(len == DATA_TRX_ID_LEN);
rec_get_nth_field((rec_t*) rec,
offsets, i + 1, &len);
ut_ad(len == DATA_ROLL_PTR_LEN);
#endif /* UNIV_DEBUG */
/* Log the preceding fields. */
memcpy(data, start, src - start);
data += src - start;
start = src + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
/* Store trx_id and roll_ptr separately. */
memcpy(storage
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1),
src,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
i++; /* skip also roll_ptr */
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field((rec_t*) rec,
offsets, i, &len);
ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
memcpy(data, start, src - start);
data += src - start;
start = src;
start = src + BTR_EXTERN_FIELD_REF_SIZE;
/* TODO: copy the BLOB pointer to
the appropriate place in the
......@@ -1767,30 +1814,13 @@ page_zip_write_rec(
/* TODO: update page_zip->n_blobs */
}
/* TODO: observe trx_id_col */
}
/* Log the last bytes of the record.
Skip roll_ptr and trx_id. */
len = rec_get_end((rec_t*) rec, offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) - start;
/* Log the last bytes of the record. */
len = rec_get_end((rec_t*) rec, offsets) - start;
memcpy(data, start, len);
data += len;
start += len;
/* Copy roll_ptr and trx_id to the uncompressed area. */
memcpy(storage - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1),
start,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
ut_a(data <= storage
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page) - 2)
- page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE
- 1 /* for the modification log terminator */);
} else {
/* This is a node pointer page. */
ulint len;
......@@ -1816,7 +1846,7 @@ page_zip_write_rec(
}
page_zip->m_end = data - page_zip->data;
ut_a(!mach_read_from_2(data));
ut_a(!*data);
#if defined UNIV_DEBUG || defined UNIV_ZIP_DEBUG
ut_a(page_zip_validate(page_zip,
......@@ -1982,7 +2012,7 @@ page_zip_write_trx_id(
storage = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE
- (rec_get_heap_no_new(rec) - 2)
- (rec_get_heap_no_new(rec) - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
field = rec + size
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
......@@ -2031,7 +2061,7 @@ page_zip_write_roll_ptr(
storage = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE
- (rec_get_heap_no_new(rec) - 2)
- (rec_get_heap_no_new(rec) - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
+ DATA_TRX_ID_LEN;
field = rec + size
......@@ -2075,7 +2105,7 @@ page_zip_clear_rec(
memset(rec, 0, rec_offs_data_size(offsets));
/* Log that the data was zeroed out. */
page_zip_write_rec(page_zip, rec, offsets);
page_zip_write_rec(page_zip, rec, index, offsets);
} else {
/* There is not enough space to log the clearing.
Try to clear the block and to recompress the page. */
......@@ -2200,6 +2230,36 @@ page_zip_dir_delete(
mach_write_to_2(slot_free, ut_align_offset(rec, UNIV_PAGE_SIZE));
}
/**************************************************************************
Add a slot to the dense page directory. */
void
page_zip_dir_add_slot(
/*==================*/
page_zip_des_t* page_zip)/* in/out: compressed page */
{
ulint n_dense;
byte* dir;
byte* stored;
n_dense = page_dir_get_n_heap(page_zip->data) - 2;
ut_a(n_dense > 0); /* This field should have already been updated. */
dir = page_zip->data + page_zip->size
- PAGE_ZIP_DIR_SLOT_SIZE * n_dense;
if (page_is_leaf(page_zip->data)) {
stored = dir - (n_dense - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
} else {
ut_ad(!page_zip->n_blobs);
stored = dir - (n_dense - 1) * REC_NODE_PTR_SIZE;
}
memmove(stored, stored + PAGE_ZIP_DIR_SLOT_SIZE, dir - stored);
}
/**************************************************************************
Write a log record of writing to the uncompressed header portion of a page. */
......
......@@ -442,6 +442,7 @@ void
row_upd_rec_in_place(
/*=================*/
rec_t* rec, /* in/out: record where replaced */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets,/* in: array returned by rec_get_offsets() */
upd_t* update, /* in: update vector */
page_zip_des_t* page_zip)/* in: compressed page with enough space
......@@ -472,7 +473,7 @@ row_upd_rec_in_place(
}
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_rec(page_zip, rec, offsets);
page_zip_write_rec(page_zip, rec, index, offsets);
}
}
......
......@@ -1413,7 +1413,7 @@ trx_undo_prev_version_build(
buf = mem_heap_alloc(heap, rec_offs_size(offsets));
*old_vers = rec_copy(buf, rec, offsets);
rec_offs_make_valid(*old_vers, index, offsets);
row_upd_rec_in_place(*old_vers, offsets, update, NULL);
row_upd_rec_in_place(*old_vers, index, offsets, update, NULL);
}
return(DB_SUCCESS);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment