Commit 8edd94a7 authored by marko's avatar marko

branches/zip: page_zip_write_rec(): Treat records containing externally

stored columns as a special case.

page_zip_write_rec_ext(): New function for writing records containing
externally stored columns.
parent a18381a9
......@@ -2688,6 +2688,131 @@ page_zip_header_cmp(
}
#endif /* UNIV_DEBUG */
/**************************************************************************
Write a record on the compressed page that contains externally stored
columns. The data must already have been written to the uncompressed page. */
static
byte*
page_zip_write_rec_ext(
/*===================*/
/* out: end of modification log */
page_zip_des_t* page_zip, /* in/out: compressed page */
const page_t* page, /* in: page containing rec */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: record descriptor */
const ulint* offsets, /* in: rec_get_offsets(rec, index) */
ulint create, /* in: nonzero=insert, zero=update */
ulint trx_id_col, /* in: position of DB_TRX_ID */
ulint heap_no, /* in: heap number of rec */
byte* storage, /* in: end of dense page directory */
byte* data) /* in: end of modification log */
{
const byte* start = rec;
ulint i;
ulint len;
byte* externs = storage;
ulint n_ext = rec_offs_n_extern(offsets);
ut_ad(rec_offs_validate(rec, index, offsets));
externs -= (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page) - PAGE_HEAP_NO_USER_LOW);
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE. */
ut_ad(data + rec_offs_data_size(offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs - BTR_EXTERN_FIELD_REF_SIZE * page_zip->n_blobs);
{
ulint blob_no = page_zip_get_n_prev_extern(
page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
page_zip->n_blobs += n_ext;
ut_ad(!memcmp
(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
zero,
BTR_EXTERN_FIELD_REF_SIZE));
memmove(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
ext_end,
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
}
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
const byte* src;
if (UNIV_UNLIKELY(i == trx_id_col)) {
ut_ad(!rec_offs_nth_extern(offsets,
i));
ut_ad(!rec_offs_nth_extern(offsets,
i + 1));
/* Locate trx_id and roll_ptr. */
src = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(len == DATA_TRX_ID_LEN);
ut_ad(src + DATA_TRX_ID_LEN
== rec_get_nth_field(
rec, offsets,
i + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN);
/* Log the preceding fields. */
ut_ad(!memcmp(data, zero,
ut_min(src - start,
sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
/* Store trx_id and roll_ptr. */
memcpy(storage - (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1),
src, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
i++; /* skip also roll_ptr */
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(dict_index_is_clust(index));
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(!memcmp(data, zero,
ut_min(src - start, sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src + BTR_EXTERN_FIELD_REF_SIZE;
/* Store the BLOB pointer. */
externs -= BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(data < externs);
memcpy(externs, src, BTR_EXTERN_FIELD_REF_SIZE);
}
}
/* Log the last bytes of the record. */
len = rec_offs_data_size(offsets) - (start - rec);
ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
memcpy(data, start, len);
data += len;
return(data);
}
/**************************************************************************
Write an entire record on the compressed page. The data must already
have been written to the uncompressed page. */
......@@ -2701,7 +2826,7 @@ page_zip_write_rec(
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint create) /* in: nonzero=insert, zero=update */
{
page_t* page;
const page_t* page;
byte* data;
byte* storage;
ulint heap_no;
......@@ -2719,7 +2844,7 @@ page_zip_write_rec(
page = page_align((rec_t*) rec);
ut_ad(page_zip_header_cmp(page_zip, page));
ut_ad(page_simple_validate_new(page));
ut_ad(page_simple_validate_new((page_t*) page));
slot = page_zip_dir_find(page_zip, page_offset(rec));
ut_a(slot);
......@@ -2775,115 +2900,56 @@ page_zip_write_rec(
if (page_is_leaf(page)) {
ulint len;
const byte* start = rec;
if (dict_index_is_clust(index)) {
ulint i;
byte* externs = storage;
ulint trx_id_col;
ulint n_ext = rec_offs_n_extern(offsets);
trx_id_col = dict_index_get_sys_col_pos(index,
DATA_TRX_ID);
ut_ad(trx_id_col != ULINT_UNDEFINED);
externs -= (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page)
- PAGE_HEAP_NO_USER_LOW);
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE */
ut_ad(data + rec_offs_data_size(offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs
- BTR_EXTERN_FIELD_REF_SIZE
* page_zip->n_blobs);
if (UNIV_UNLIKELY(n_ext)) {
ulint blob_no = page_zip_get_n_prev_extern(
page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
page_zip->n_blobs += n_ext;
ut_ad(!memcmp
(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
zero,
BTR_EXTERN_FIELD_REF_SIZE));
memmove(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
ext_end,
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
}
/* Store separately trx_id, roll_ptr and
the BTR_EXTERN_FIELD_REF of each BLOB column. */
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
const byte* src;
if (UNIV_UNLIKELY(i == trx_id_col)) {
ut_ad(!rec_offs_nth_extern(offsets,
i));
ut_ad(!rec_offs_nth_extern(offsets,
i + 1));
if (rec_offs_any_extern(offsets)) {
data = page_zip_write_rec_ext(
page_zip, page,
rec, index, offsets, create,
trx_id_col, heap_no, storage, data);
} else {
/* Locate trx_id and roll_ptr. */
src = rec_get_nth_field(rec, offsets,
i, &len);
const byte* src
= rec_get_nth_field(rec, offsets,
trx_id_col, &len);
ut_ad(len == DATA_TRX_ID_LEN);
ut_ad(src + DATA_TRX_ID_LEN
== rec_get_nth_field(
rec, offsets,
i + 1, &len));
trx_id_col + 1, &len));
ut_ad(len == DATA_ROLL_PTR_LEN);
/* Log the preceding fields. */
ut_ad(!memcmp(data, zero,
ut_min(src - start,
sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src + (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
ut_min(src - rec, sizeof zero)));
memcpy(data, rec, src - rec);
data += src - rec;
/* Store trx_id and roll_ptr. */
memcpy(storage - (DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN)
memcpy(storage
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (heap_no - 1),
src, DATA_TRX_ID_LEN
+ DATA_ROLL_PTR_LEN);
i++; /* skip also roll_ptr */
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field(rec, offsets,
i, &len);
src,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
ut_ad(dict_index_is_clust(index));
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
src += DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN;
ut_ad(!memcmp(data, zero,
ut_min(src - start,
sizeof zero)));
memcpy(data, start, src - start);
data += src - start;
start = src
+ BTR_EXTERN_FIELD_REF_SIZE;
/* Log the last bytes of the record. */
len = rec_offs_data_size(offsets)
- (src - rec);
/* Store the BLOB pointer. */
externs -= BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(data < externs);
memcpy(externs, src,
BTR_EXTERN_FIELD_REF_SIZE);
}
ut_ad(!memcmp(data, zero,
ut_min(len, sizeof zero)));
memcpy(data, src, len);
data += len;
}
} else {
/* Leaf page of a secondary index:
......@@ -2891,14 +2957,14 @@ page_zip_write_rec(
ut_ad(dict_index_get_sys_col_pos(index, DATA_TRX_ID)
== ULINT_UNDEFINED);
ut_ad(!rec_offs_any_extern(offsets));
}
/* Log the last bytes of the record. */
len = rec + rec_offs_data_size(offsets) - start;
/* Log the entire record. */
len = rec_offs_data_size(offsets);
ut_ad(!memcmp(data, zero, ut_min(len, sizeof zero)));
memcpy(data, start, len);
memcpy(data, rec, len);
data += len;
}
} else {
/* This is a node pointer page. */
ulint len;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment