Commit 65dfefe2 authored by marko's avatar marko

branches/zip: Implement in-place updates of BLOB pointers.

There are still some bugs in the code.

btr_store_big_rec_extern_fields(): Remove assertion on dict_table_is_zip()
to ease testing.

btr_free_externally_stored_field(): Test page_zip instead of
dict_table_is_zip().

page_zip_write_rec(): Add parameter "create".  Try to handle externally
stored columns.

rec_offs_any_extern(): Correct the function comment.

Add rec_offs_n_extern() and page_zip_get_n_prev_extern().

page_zip_dir_decode(): Replace assertion with if (...) return(FALSE).

page_zip_decompress(): Do not clear page_zip->n_blobs after counting the
BLOBs.

page_zip_write_blob_ptr(): Use page_zip_get_n_prev_extern().
Correct an off-by-one error in memcpy().
parent 23d19f8f
......@@ -3512,7 +3512,6 @@ btr_store_big_rec_extern_fields(
space_id = buf_frame_get_space_id(rec);
page_zip = buf_block_get_page_zip(buf_block_align(rec));
ut_ad(!page_zip == !dict_table_is_zip(index->table));
if (UNIV_LIKELY_NULL(page_zip)) {
int err;
......@@ -3848,7 +3847,7 @@ btr_free_externally_stored_field(
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(page, SYNC_EXTERN_STORAGE);
#endif /* UNIV_SYNC_DEBUG */
if (dict_table_is_zip(index->table)) {
if (UNIV_LIKELY_NULL(page_zip)) {
next_page_no = mach_read_from_4(page);
btr_page_free_low(index->tree, page,
......@@ -3860,10 +3859,8 @@ btr_free_externally_stored_field(
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
0,
MLOG_4BYTES, &mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_blob_ptr(page_zip,
rec, index, offsets, i, &mtr);
}
} else {
ulint extern_len = mach_read_from_4(
field_ref + BTR_EXTERN_LEN + 4);
......
......@@ -113,7 +113,8 @@ page_zip_write_rec(
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets)/* in: rec_get_offsets(rec, index) */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint create) /* in: nonzero=insert, zero=update */
__attribute__((nonnull));
/**************************************************************************
......
......@@ -447,13 +447,21 @@ rec_offs_nth_size(
/**********************************************************
Returns TRUE if the extern bit is set in any of the fields
of rec. */
of a record. */
UNIV_INLINE
ibool
rec_offs_any_extern(
/*================*/
/* out: TRUE if a field is stored externally */
const ulint* offsets);/* in: array returned by rec_get_offsets() */
/**********************************************************
Returns the number of extern bits set in a record. */
UNIV_INLINE
ulint
rec_offs_n_extern(
/*==============*/
/* out: number of externally stored fields */
const ulint* offsets);/* in: array returned by rec_get_offsets() */
/***************************************************************
Sets TRUE the extern storage bits of fields mentioned in an array. */
......
......@@ -1080,7 +1080,7 @@ rec_offs_nth_size(
/**********************************************************
Returns TRUE if the extern bit is set in any of the fields
of an old-style record. */
of a record. */
UNIV_INLINE
ibool
rec_offs_any_extern(
......@@ -1097,6 +1097,25 @@ rec_offs_any_extern(
return(FALSE);
}
/**********************************************************
Returns the number of extern bits set in a record. */
UNIV_INLINE
ulint
rec_offs_n_extern(
/*==============*/
/* out: number of externally stored fields */
const ulint* offsets)/* in: array returned by rec_get_offsets() */
{
ulint i;
ulint n = 0;
for (i = rec_offs_n_fields(offsets); i--; ) {
if (rec_offs_nth_extern(offsets, i)) {
n++;
}
}
return(n);
}
/**********************************************************
Returns the offset of n - 1th field end if the record is stored in the 1-byte
offsets form. If the field is SQL null, the flag is ORed in the returned
......
......@@ -1104,7 +1104,7 @@ page_cur_insert_rec_low(
/* TODO: something similar to page_zip_dir_delete() */
page_zip_dir_rewrite(page_zip, page);
page_zip_write_rec(page_zip, insert_rec, index, offsets);
page_zip_write_rec(page_zip, insert_rec, index, offsets, 1);
}
/* 9. Write log record of the insert */
......
......@@ -160,6 +160,46 @@ page_zip_dir_get(
- PAGE_ZIP_DIR_SLOT_SIZE * (slot + 1)));
}
/**********************************************************
Determine how many externally stored columns are contained
in previous records on the page of rec. */
UNIV_INLINE
ulint
page_zip_get_n_prev_extern(
/*=======================*/
const rec_t* rec, /* in: compact physical record
on a B-tree leaf page */
dict_index_t* index) /* in: record descriptor */
{
page_t* page = ut_align_down((byte*) rec, UNIV_PAGE_SIZE);
ulint n_ext;
ulint next_offs;
ut_ad(page_is_leaf(page));
ut_ad(page_is_comp(page));
ut_ad(dict_table_is_comp(index->table));
n_ext = 0;
next_offs = rec_get_next_offs(page + PAGE_NEW_INFIMUM, TRUE);
ut_a(next_offs >= PAGE_NEW_SUPREMUM_END);
do {
rec_t* r = page + next_offs;
if (r == rec) {
return(n_ext);
}
n_ext += rec_get_n_extern_new(r, index, ULINT_UNDEFINED);
next_offs = rec_get_next_offs(r, TRUE);
ut_a(next_offs > 0);
} while (next_offs != PAGE_NEW_SUPREMUM);
ut_error;
return(0);
}
/**************************************************************************
Encode the length of a fixed-length column. */
static
......@@ -980,8 +1020,10 @@ page_zip_dir_decode(
UNIV_PREFETCH_RW(slot);
}
ut_ad((offs & PAGE_ZIP_DIR_SLOT_MASK)
>= PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES);
if (UNIV_UNLIKELY((offs & PAGE_ZIP_DIR_SLOT_MASK)
< PAGE_ZIP_START + REC_N_NEW_EXTRA_BYTES)) {
return(FALSE);
}
recs[i] = page + (offs & PAGE_ZIP_DIR_SLOT_MASK);
}
......@@ -1628,7 +1670,7 @@ page_zip_decompress(
} else {
externs = storage;
}
page_zip->n_blobs = 0;
recsc = recs;
while (n_dense--) {
......@@ -1732,7 +1774,8 @@ page_zip_write_rec(
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: record being written */
dict_index_t* index, /* in: the index the record belongs to */
const ulint* offsets)/* in: rec_get_offsets(rec, index) */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
ulint create) /* in: nonzero=insert, zero=update */
{
page_t* page;
byte* data;
......@@ -1795,32 +1838,63 @@ page_zip_write_rec(
ulint i;
ulint len;
const byte* start = rec;
byte* externs;
ulint trx_id_col;
ulint n_ext = rec_offs_n_extern(offsets);
if (dict_index_is_clust(index)) {
trx_id_col = dict_index_get_sys_col_pos(
index, DATA_TRX_ID);
ut_ad(trx_id_col != ULINT_UNDEFINED);
/* Do not assert on page_zip->n_blobs, because
it may change later in this function. */
ut_ad(data + rec_offs_data_size(offsets)
externs = storage
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
< storage
* (page_dir_get_n_heap(page) - 2);
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE */
ut_ad(data + rec_offs_data_size(offsets)
- (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)
* (page_dir_get_n_heap(page) - 2));
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs
- BTR_EXTERN_FIELD_REF_SIZE
* page_zip->n_blobs);
} else {
trx_id_col = ULINT_UNDEFINED;
ut_ad(dict_index_get_sys_col_pos(
index, DATA_TRX_ID)
== ULINT_UNDEFINED);
/* Do not assert on page_zip->n_blobs, because
it may change later in this function. */
ut_ad(data + rec_offs_data_size(offsets) < storage);
externs = storage;
/* Note that this will not take into account
the BLOB columns of rec if create==TRUE */
ut_ad(data + rec_offs_data_size(offsets)
- n_ext * BTR_EXTERN_FIELD_REF_SIZE
< externs
- BTR_EXTERN_FIELD_REF_SIZE
* page_zip->n_blobs);
}
/* Check if there are any externally stored columns.
For each externally stored column, store the
BTR_EXTERN_FIELD_REF separately._*/
if (UNIV_UNLIKELY(n_ext)) {
ulint blob_no = page_zip_get_n_prev_extern(
rec, index);
byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs);
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
if (create) {
page_zip->n_blobs += n_ext;
memmove(ext_end - n_ext
* BTR_EXTERN_FIELD_REF_SIZE,
ext_end,
externs - ext_end);
}
ut_a(blob_no + n_ext <= page_zip->n_blobs);
}
/* Store separately trx_id, roll_ptr and
the BTR_EXTERN_FIELD_REF of each BLOB column._*/
for (i = 0; i < rec_offs_n_fields(offsets); i++) {
const byte* src;
......@@ -1861,11 +1935,11 @@ page_zip_write_rec(
data += src - start;
start = src + BTR_EXTERN_FIELD_REF_SIZE;
/* TODO: copy the BLOB pointer to
the appropriate place in the
uncompressed BLOB pointer array */
/* TODO: update page_zip->n_blobs */
/* Store the BLOB pointer separately. */
externs -= BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(data < externs);
memcpy(externs, src,
BTR_EXTERN_FIELD_REF_SIZE);
}
}
......@@ -1923,9 +1997,8 @@ page_zip_write_blob_ptr(
{
byte* field;
byte* externs;
page_t* page = buf_frame_align((byte*) rec);
page_t* page = ut_align_down((byte*) rec, UNIV_PAGE_SIZE);
ulint blob_no;
ulint next_offs;
ulint len;
ut_ad(buf_block_get_page_zip(buf_block_align((byte*)rec)) == page_zip);
......@@ -1940,28 +2013,9 @@ page_zip_write_blob_ptr(
ut_ad(page_is_leaf(page));
blob_no = 0;
next_offs = rec_get_next_offs(page + PAGE_NEW_INFIMUM, TRUE);
ut_a(next_offs > PAGE_NEW_SUPREMUM_END);
do {
rec_t* r = page + next_offs;
if (r == rec) {
goto found;
}
blob_no += rec_get_n_extern_new(r, index, ULINT_UNDEFINED);
next_offs = rec_get_next_offs(r, TRUE);
ut_a(next_offs > 0);
} while (next_offs != PAGE_NEW_SUPREMUM);
ut_error;
found:
blob_no += rec_get_n_extern_new(rec, index, n);
ut_a(blob_no < page_zip->n_blobs);
blob_no = page_zip_get_n_prev_extern(rec, index)
+ rec_get_n_extern_new(rec, index, n);
ut_a(blob_no <= page_zip->n_blobs);
/* The heap number of the first user record is 2. */
if (dict_index_is_clust(index)) {
......@@ -1977,7 +2031,7 @@ page_zip_write_blob_ptr(
field = rec_get_nth_field((rec_t*) rec, offsets, n, &len);
memcpy(externs - blob_no * BTR_EXTERN_FIELD_REF_SIZE,
memcpy(externs - (blob_no + 1) * BTR_EXTERN_FIELD_REF_SIZE,
field + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
......@@ -2119,7 +2173,7 @@ page_zip_clear_rec(
memset(rec, 0, rec_offs_data_size(offsets));
/* Log that the data was zeroed out. */
page_zip_write_rec(page_zip, rec, index, offsets);
page_zip_write_rec(page_zip, rec, index, offsets, 0);
} else {
/* There is not enough space to log the clearing.
Try to clear the block and to recompress the page. */
......
......@@ -477,7 +477,7 @@ row_upd_rec_in_place(
}
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_rec(page_zip, rec, index, offsets);
page_zip_write_rec(page_zip, rec, index, offsets, 0);
}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment