Commit 52f7bb36 authored by marko's avatar marko

branches/zip: page_zip_write_rec(): Treat records containing externally

stored columns as a special case.

page_zip_write_rec_ext(): New function for writing records containing
externally stored columns.
parent 8fb1f513
...@@ -2688,6 +2688,131 @@ page_zip_header_cmp( ...@@ -2688,6 +2688,131 @@ page_zip_header_cmp(
} }
#endif /* UNIV_DEBUG */ #endif /* UNIV_DEBUG */
/**************************************************************************
Write a record on the compressed page that contains externally stored
columns. The data must already have been written to the uncompressed page. */
static
byte*
page_zip_write_rec_ext(
/*===================*/
/* out: end of modification log */
page_zip_des_t* page_zip, /* in/out: compressed page */
const page_t* page, /* in: page containing rec */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: record descriptor */
const ulint* offsets, /* in: rec_get_offsets(rec, index) */
ulint create, /* in: nonzero=insert, zero=update */
ulint trx_id_col, /* in: position of DB_TRX_ID */
ulint heap_no, /* in: heap number of rec */
byte* storage, /* in: end of dense page directory */
byte* data) /* in: end of modification log */
{
const byte* start = rec;
ulint i;
ulint len;
byte* externs = storage;
ulint n_ext = rec_offs_n_extern(offsets);
ut_ad(rec_offs_validate(rec, index, offsets));
externs -= (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW);
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE. */
ut_ad(data + rec_offs_data_size(offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs - BTR_EXTERN_FIELD_REF_SIZE * page_zip->n_blobs);
{
ulint blob_no = page_zip_get_n_prev_extern(
page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
page_zip->n_blobs += n_ext;
ut_ad(!memcmp
(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
zero,
BTR_EXTERN_FIELD_REF_SIZE));
memmove(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
ext_end,
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
}
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
const byte* src;
if (UNIV_UNLIKELY(i == trx_id_col)) {
ut_ad(!rec_offs_nth_extern(offsets,
i));
ut_ad(!rec_offs_nth_extern(offsets,
i + 1));
/* Locate trx_id and roll_ptr. */
src = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(len == DATA_TRX_ID_LEN);
ut_ad(src + DATA_TRX_ID_LEN
== rec_get_nth_field(
rec, offsets,
i + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN);
/* Log the preceding fields. */
ut_ad(!memcmp(data, zero,
ut_min(src - start,
sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
/* Store trx_id and roll_ptr. */
memcpy(storage - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1),
src, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
i++; /* skip also roll_ptr */
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(dict_index_is_clust(index));
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(!memcmp(data, zero,
ut_min(src - start, sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src + BTR_EXTERN_FIELD_REF_SIZE;
/* Store the BLOB pointer. */
externs -= BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(data < externs);
memcpy(externs, src, BTR_EXTERN_FIELD_REF_SIZE);
}
}
/* Log the last bytes of the record. */
len = rec_offs_data_size(offsets) - (start - rec);
ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
memcpy(data, start, len);
data += len;
return(data);
}
/************************************************************************** /**************************************************************************
Write an entire record on the compressed page. The data must already Write an entire record on the compressed page. The data must already
have been written to the uncompressed page. */ have been written to the uncompressed page. */
...@@ -2701,7 +2826,7 @@ page_zip_write_rec( ...@@ -2701,7 +2826,7 @@ page_zip_write_rec(
const ulint* offsets,/* in: rec_get_offsets(rec, index) */ const ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint create) /* in: nonzero=insert, zero=update */ ulint create) /* in: nonzero=insert, zero=update */
{ {
page_t* page; const page_t* page;
byte* data; byte* data;
byte* storage; byte* storage;
ulint heap_no; ulint heap_no;
...@@ -2719,7 +2844,7 @@ page_zip_write_rec( ...@@ -2719,7 +2844,7 @@ page_zip_write_rec(
page = page_align((rec_t*) rec); page = page_align((rec_t*) rec);
ut_ad(page_zip_header_cmp(page_zip, page)); ut_ad(page_zip_header_cmp(page_zip, page));
ut_ad(page_simple_validate_new(page)); ut_ad(page_simple_validate_new((page_t*) page));
slot = page_zip_dir_find(page_zip, page_offset(rec)); slot = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot); ut_a(slot);
...@@ -2775,115 +2900,56 @@ page_zip_write_rec( ...@@ -2775,115 +2900,56 @@ page_zip_write_rec(
if (page_is_leaf(page)) { if (page_is_leaf(page)) {
ulint len; ulint len;
const byte* start = rec;
if (dict_index_is_clust(index)) { if (dict_index_is_clust(index)) {
ulint i;
byte* externs = storage;
ulint trx_id_col; ulint trx_id_col;
ulint n_ext = rec_offs_n_extern(offsets);
trx_id_col = dict_index_get_sys_col_pos(index, trx_id_col = dict_index_get_sys_col_pos(index,
DATA_TRX_ID); DATA_TRX_ID);
ut_ad(trx_id_col != ULINT_UNDEFINED); ut_ad(trx_id_col != ULINT_UNDEFINED);
externs -= (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page)
- PAGE_HEAP_NO_USER_LOW);
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE */
ut_ad(data + rec_offs_data_size(offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs
- BTR_EXTERN_FIELD_REF_SIZE
* page_zip->n_blobs);
if (UNIV_UNLIKELY(n_ext)) {
ulint blob_no = page_zip_get_n_prev_extern(
page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
page_zip->n_blobs += n_ext;
ut_ad(!memcmp
(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
zero,
BTR_EXTERN_FIELD_REF_SIZE));
memmove(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
ext_end,
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
}
/* Store separately trx_id, roll_ptr and /* Store separately trx_id, roll_ptr and
the BTR_EXTERN_FIELD_REF of each BLOB column. */ the BTR_EXTERN_FIELD_REF of each BLOB column. */
if (rec_offs_any_extern(offsets)) {
for (i = 0; i < rec_offs_n_fields(offsets); i++) { data = page_zip_write_rec_ext(
const byte* src; page_zip, page,
rec, index, offsets, create,
if (UNIV_UNLIKELY(i == trx_id_col)) { trx_id_col, heap_no, storage, data);
ut_ad(!rec_offs_nth_extern(offsets, } else {
i));
ut_ad(!rec_offs_nth_extern(offsets,
i + 1));
/* Locate trx_id and roll_ptr. */ /* Locate trx_id and roll_ptr. */
src = rec_get_nth_field(rec, offsets, const byte* src
i, &len); = rec_get_nth_field(rec, offsets,
trx_id_col, &len);
ut_ad(len == DATA_TRX_ID_LEN); ut_ad(len == DATA_TRX_ID_LEN);
ut_ad(src + DATA_TRX_ID_LEN ut_ad(src + DATA_TRX_ID_LEN
== rec_get_nth_field( == rec_get_nth_field(
rec, offsets, rec, offsets,
i + 1, &len)); trx_id_col + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN); ut_ad(len == DATA_ROLL_PTR_LEN);
/* Log the preceding fields. */ /* Log the preceding fields. */
ut_ad(!memcmp(data, zero, ut_ad(!memcmp(data, zero,
ut_min(src - start, ut_min(src - rec, sizeof zero)));
sizeof zero))); memcpy(data, rec, src - rec);
memcpy(data, start, src - start); data += src - rec;
data += src - start;
start = src + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
/* Store trx_id and roll_ptr. */ /* Store trx_id and roll_ptr. */
memcpy(storage - (DATA_TRX_ID_LEN memcpy(storage
+ DATA_ROLL_PTR_LEN) - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1), * (heap_no - 1),
src, DATA_TRX_ID_LEN src,
+ DATA_ROLL_PTR_LEN); DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
i++; /* skip also roll_ptr */
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(dict_index_is_clust(index)); src += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(!memcmp(data, zero, /* Log the last bytes of the record. */
ut_min(src - start, len = rec_offs_data_size(offsets)
sizeof zero))); - (src - rec);
memcpy(data, start, src - start);
data += src - start;
start = src
+ BTR_EXTERN_FIELD_REF_SIZE;
/* Store the BLOB pointer. */ ut_ad(!memcmp(data, zero,
externs -= BTR_EXTERN_FIELD_REF_SIZE; ut_min(len, sizeof zero)));
ut_ad(data < externs); memcpy(data, src, len);
memcpy(externs, src, data += len;
BTR_EXTERN_FIELD_REF_SIZE);
}
} }
} else { } else {
/* Leaf page of a secondary index: /* Leaf page of a secondary index:
...@@ -2891,14 +2957,14 @@ page_zip_write_rec( ...@@ -2891,14 +2957,14 @@ page_zip_write_rec(
ut_ad(dict_index_get_sys_col_pos(index, DATA_TRX_ID) ut_ad(dict_index_get_sys_col_pos(index, DATA_TRX_ID)
== ULINT_UNDEFINED); == ULINT_UNDEFINED);
ut_ad(!rec_offs_any_extern(offsets)); ut_ad(!rec_offs_any_extern(offsets));
}
/* Log the last bytes of the record. */ /* Log the entire record. */
len = rec + rec_offs_data_size(offsets) - start; len = rec_offs_data_size(offsets);
ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero))); ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
memcpy(data, start, len); memcpy(data, rec, len);
data += len; data += len;
}
} else { } else {
/* This is a node pointer page. */ /* This is a node pointer page. */
ulint len; ulint len;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment