Commit ee5a92aa authored by marko's avatar marko

branches/zip: Implement the compression of BLOB columns.

This has not been extensively tested yet, because some other part of the
code breaks in "ibtestblob".

btr_free_page_low(): Add parameters "space" and "page_no", because they
are omitted from compressed BLOB pages.

btr0cur.c: Implement the compression and decompression of BLOB columns,
enabled at compile-time (#define ZIP_BLOB TRUE) for now.

btr_rec_free_externally_stored_fields(),
btr_copy_externally_stored_field(): Made static

mlog_log_string(): New function, split from mlog_write_string(), allows
to avoid a dummy memcpy() of compressed BLOB pages.
parent d8e3b811
......@@ -436,13 +436,13 @@ btr_page_free_low(
/*==============*/
dict_tree_t* tree, /* in: index tree */
page_t* page, /* in: page to be freed, x-latched */
ulint space, /* in: space */
ulint page_no,/* in: page number */
ulint level, /* in: page level */
mtr_t* mtr) /* in: mtr */
{
fseg_header_t* seg_header;
page_t* root;
ulint space;
ulint page_no;
ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
MTR_MEMO_PAGE_X_FIX));
......@@ -466,9 +466,6 @@ btr_page_free_low(
seg_header = root + PAGE_HEADER + PAGE_BTR_SEG_TOP;
}
space = buf_frame_get_space_id(page);
page_no = buf_frame_get_page_no(page);
fseg_free_page(seg_header, space, page_no, mtr);
}
......@@ -484,12 +481,17 @@ btr_page_free(
mtr_t* mtr) /* in: mtr */
{
ulint level;
ulint space;
ulint page_no;
ut_ad(mtr_memo_contains(mtr, buf_block_align(page),
MTR_MEMO_PAGE_X_FIX));
level = btr_page_get_level(page, mtr);
btr_page_free_low(tree, page, level, mtr);
space = buf_frame_get_space_id(page);
page_no = buf_frame_get_page_no(page);
btr_page_free_low(tree, page, space, page_no, level, mtr);
}
/******************************************************************
......
......@@ -36,6 +36,9 @@ Created 10/16/1994 Heikki Tuuri
#include "srv0srv.h"
#include "ibuf0ibuf.h"
#include "lock0lock.h"
#include "zlib.h"
#define ZIP_BLOB TRUE /* testing */
#ifdef UNIV_DEBUG
/* If the following is set to TRUE, this module prints a lot of
......@@ -111,6 +114,24 @@ btr_rec_free_updated_extern_fields(
mtr_t* mtr); /* in: mini-transaction handle which contains
an X-latch to record page and to the tree */
/***************************************************************
Frees the externally stored fields for a record. */
static
void
btr_rec_free_externally_stored_fields(
/*==================================*/
dict_index_t* index, /* in: index of the data, the index
tree MUST be X-latched */
rec_t* rec, /* in: record */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
page_zip_des_t* page_zip,/* in: compressed page whose uncompressed
part will be updated, or NULL */
ibool do_not_free_inherited,/* in: TRUE if called in a
rollback and we do not want to free
inherited fields */
mtr_t* mtr); /* in: mini-transaction handle which contains
an X-latch to record page and to the index
tree */
/***************************************************************
Gets the externally stored size of a record, in units of a database page. */
static
ulint
......@@ -2632,9 +2653,8 @@ btr_cur_pessimistic_delete(
/* Free externally stored fields if the record is neither
a node pointer nor in two-byte format.
This condition avoids an unnecessary loop. */
if (page_is_comp(page)
? !rec_get_node_ptr_flag(rec)
: !rec_get_1byte_offs_flag(rec)) {
if (page_is_leaf(page)
&& (page_is_comp(page) || !rec_get_1byte_offs_flag(rec))) {
btr_rec_free_externally_stored_fields(cursor->index, rec,
offsets, page_zip, in_rollback, mtr);
}
......@@ -3452,6 +3472,8 @@ btr_store_big_rec_extern_fields(
ulint hint_page_no;
ulint i;
mtr_t mtr;
page_zip_des_t* page_zip;
z_stream c_stream;
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(mtr_memo_contains(local_mtr, dict_tree_get_lock(index->tree),
......@@ -3461,12 +3483,26 @@ btr_store_big_rec_extern_fields(
ut_a(index->type & DICT_CLUSTERED);
space_id = buf_frame_get_space_id(rec);
page_zip = buf_block_get_page_zip(buf_block_align(rec));
if (ZIP_BLOB || UNIV_LIKELY_NULL(page_zip)) {
int err;
c_stream.zalloc = (alloc_func) 0;
c_stream.zfree = (free_func) 0;
c_stream.opaque = (voidpf) 0;
err = deflateInit(&c_stream, Z_DEFAULT_COMPRESSION);
ut_a(err == Z_OK);
}
/* We have to create a file segment to the tablespace
for each field and put the pointer to the field in rec */
for (i = 0; i < big_rec_vec->n_fields; i++) {
ut_ad(rec_offs_nth_extern(offsets,
big_rec_vec->fields[i].field_no));
{
ulint local_len;
field_ref = rec_get_nth_field(rec, offsets,
......@@ -3481,7 +3517,15 @@ btr_store_big_rec_extern_fields(
prev_page_no = FIL_NULL;
while (extern_len > 0) {
if (ZIP_BLOB || UNIV_LIKELY_NULL(page_zip)) {
int err = deflateReset(&c_stream);
ut_a(err == Z_OK);
c_stream.next_in = big_rec_vec->fields[i].data;
c_stream.avail_in = extern_len;
}
for (;;) {
mtr_start(&mtr);
if (prev_page_no == FIL_NULL) {
......@@ -3490,19 +3534,23 @@ btr_store_big_rec_extern_fields(
hint_page_no = prev_page_no + 1;
}
/* TODO: allocate compressed BLOB storage */
page = btr_page_alloc(index->tree, hint_page_no,
FSP_NO_DIR, 0, &mtr);
if (page == NULL) {
if (UNIV_UNLIKELY(page == NULL)) {
mtr_commit(&mtr);
if (ZIP_BLOB || UNIV_LIKELY_NULL(page_zip)) {
deflateEnd(&c_stream);
}
return(DB_OUT_OF_FILE_SPACE);
}
page_no = buf_frame_get_page_no(page);
if (prev_page_no != FIL_NULL) {
byte* next_ptr;
prev_page = buf_page_get(space_id,
prev_page_no,
RW_X_LATCH, &mtr);
......@@ -3511,97 +3559,175 @@ btr_store_big_rec_extern_fields(
buf_page_dbg_add_level(prev_page,
SYNC_EXTERN_STORAGE);
#endif /* UNIV_SYNC_DEBUG */
mlog_write_ulint(prev_page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_NEXT_PAGE_NO,
if (ZIP_BLOB||UNIV_LIKELY_NULL(page_zip)) {
next_ptr = prev_page;
} else {
next_ptr = prev_page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_NEXT_PAGE_NO;
}
mlog_write_ulint(next_ptr,
page_no, MLOG_4BYTES, &mtr);
}
if (extern_len > (UNIV_PAGE_SIZE - FIL_PAGE_DATA
if (UNIV_LIKELY_NULL(ZIP_BLOB||page_zip)) {
int err;
c_stream.next_out = page + 4;
c_stream.avail_out = UNIV_PAGE_SIZE - 4;
err = deflate(&c_stream, Z_FINISH);
ut_a(err == Z_OK || err == Z_STREAM_END);
ut_a(err == Z_STREAM_END
|| c_stream.avail_out == 0);
/* Write the "next BLOB page" pointer */
mach_write_to_4(page, FIL_NULL);
/* Zero out the unused part of the page. */
memset(page + UNIV_PAGE_SIZE
- c_stream.avail_out,
0, c_stream.avail_out);
mlog_log_string(page, UNIV_PAGE_SIZE, &mtr);
if (err == Z_OK && prev_page_no != FIL_NULL) {
goto next_zip_page;
}
rec_page = buf_page_get(space_id,
buf_frame_get_page_no(
field_ref),
RW_X_LATCH, &mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(
rec_page, SYNC_NO_ORDER_CHECK);
#endif /* UNIV_SYNC_DEBUG */
mlog_write_ulint(field_ref
+ BTR_EXTERN_LEN, 0,
MLOG_4BYTES, &mtr);
if (err == Z_STREAM_END) {
fprintf(stderr,"in:%lu out:%lu\n", c_stream.total_in, c_stream.total_out);
mlog_write_ulint(field_ref
+ BTR_EXTERN_LEN + 4,
c_stream.total_in,
MLOG_4BYTES, &mtr);
} else {
mlog_write_ulint(field_ref
+ BTR_EXTERN_LEN + 4,
0,
MLOG_4BYTES, &mtr);
}
if (prev_page_no == FIL_NULL) {
mlog_write_ulint(field_ref
+ BTR_EXTERN_SPACE_ID,
space_id,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref
+ BTR_EXTERN_PAGE_NO,
page_no,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref
+ BTR_EXTERN_OFFSET,
FIL_PAGE_DATA,
MLOG_4BYTES, &mtr);
}
#if ZIP_BLOB
if (page_zip)
#endif
page_zip_write_blob_ptr(page_zip, rec,
index, offsets,
big_rec_vec->fields[i].field_no, &mtr);
next_zip_page:
prev_page_no = page_no;
mtr_commit(&mtr);
if (err == Z_STREAM_END) {
break;
}
} else {
if (extern_len > (UNIV_PAGE_SIZE
- FIL_PAGE_DATA
- BTR_BLOB_HDR_SIZE
- FIL_PAGE_DATA_END)) {
store_len = UNIV_PAGE_SIZE - FIL_PAGE_DATA
store_len = UNIV_PAGE_SIZE
- FIL_PAGE_DATA
- BTR_BLOB_HDR_SIZE
- FIL_PAGE_DATA_END;
} else {
store_len = extern_len;
}
} else {
store_len = extern_len;
}
/* TODO: log these writes differently for page_zip */
mlog_write_string(page + FIL_PAGE_DATA
mlog_write_string(page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_SIZE,
big_rec_vec->fields[i].data
+ big_rec_vec->fields[i].len
- extern_len,
store_len, &mtr);
mlog_write_ulint(page + FIL_PAGE_DATA
mlog_write_ulint(page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_PART_LEN,
store_len, MLOG_4BYTES, &mtr);
mlog_write_ulint(page + FIL_PAGE_DATA
mlog_write_ulint(page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_NEXT_PAGE_NO,
FIL_NULL, MLOG_4BYTES, &mtr);
extern_len -= store_len;
extern_len -= store_len;
rec_page = buf_page_get(space_id,
rec_page = buf_page_get(space_id,
buf_frame_get_page_no(
field_ref),
RW_X_LATCH, &mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(rec_page, SYNC_NO_ORDER_CHECK);
buf_page_dbg_add_level(
rec_page, SYNC_NO_ORDER_CHECK);
#endif /* UNIV_SYNC_DEBUG */
mlog_write_ulint(field_ref + BTR_EXTERN_LEN, 0,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
big_rec_vec->fields[i].len
- extern_len,
MLOG_4BYTES, &mtr);
if (prev_page_no == FIL_NULL) {
page_zip_des_t* page_zip;
mlog_write_ulint(field_ref
+ BTR_EXTERN_SPACE_ID,
space_id,
mlog_write_ulint(field_ref + BTR_EXTERN_LEN, 0,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref
+ BTR_EXTERN_PAGE_NO,
page_no,
+ BTR_EXTERN_LEN + 4,
big_rec_vec->fields[i].len
- extern_len,
MLOG_4BYTES, &mtr);
if (prev_page_no == FIL_NULL) {
mlog_write_ulint(field_ref
+ BTR_EXTERN_SPACE_ID,
space_id,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref
+ BTR_EXTERN_PAGE_NO,
page_no,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref + BTR_EXTERN_OFFSET,
FIL_PAGE_DATA,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref
+ BTR_EXTERN_OFFSET,
FIL_PAGE_DATA,
MLOG_4BYTES, &mtr);
}
ut_ad(rec_offs_nth_extern(offsets,
big_rec_vec->fields[i].field_no));
#if 0 /* TODO:remove */
/* Set the bit denoting that this field
in rec is stored externally */
prev_page_no = page_no;
rec_set_nth_field_extern_bit(rec, index,
big_rec_vec->fields[i].field_no,
&mtr);
#endif
page_zip = buf_block_get_page_zip(
buf_block_align(rec));
mtr_commit(&mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_blob_ptr(page_zip, rec,
index, offsets,
big_rec_vec->
fields[i].field_no,
&mtr);
if (extern_len == 0) {
break;
}
}
prev_page_no = page_no;
mtr_commit(&mtr);
}
}
if (ZIP_BLOB||UNIV_LIKELY_NULL(page_zip)) {
deflateEnd(&c_stream);
}
return(DB_SUCCESS);
}
......@@ -3640,11 +3766,7 @@ btr_free_externally_stored_field(
byte* field_ref;
ulint space_id;
ulint page_no;
ulint offset;
ulint extern_len;
ulint next_page_no;
ulint part_len;
ulint local_len;
mtr_t mtr;
ut_ad(mtr_memo_contains(local_mtr, dict_tree_get_lock(index->tree),
......@@ -3653,16 +3775,19 @@ btr_free_externally_stored_field(
MTR_MEMO_PAGE_X_FIX));
ut_ad(rec_offs_validate(rec, index, offsets));
field_ref = rec_get_nth_field(rec, offsets, i, &local_len);
ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
local_len -= BTR_EXTERN_FIELD_REF_SIZE;
field_ref += local_len;
{
ulint local_len;
field_ref = rec_get_nth_field(rec, offsets, i, &local_len);
ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
local_len -= BTR_EXTERN_FIELD_REF_SIZE;
field_ref += local_len;
}
for (;;) {
mtr_start(&mtr);
rec_page = buf_page_get(buf_frame_get_space_id(rec),
buf_frame_get_page_no(rec), RW_X_LATCH, &mtr);
rec_page = buf_page_get(buf_frame_get_space_id(field_ref),
buf_frame_get_page_no(field_ref), RW_X_LATCH, &mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(rec_page, SYNC_NO_ORDER_CHECK);
#endif /* UNIV_SYNC_DEBUG */
......@@ -3670,74 +3795,77 @@ btr_free_externally_stored_field(
page_no = mach_read_from_4(field_ref + BTR_EXTERN_PAGE_NO);
offset = mach_read_from_4(field_ref + BTR_EXTERN_OFFSET);
extern_len = mach_read_from_4(field_ref + BTR_EXTERN_LEN + 4);
/* If extern len is 0, then there is no external storage data
at all */
if (extern_len == 0) {
mtr_commit(&mtr);
return;
}
if (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
& BTR_EXTERN_OWNER_FLAG) {
/* This field does not own the externally
stored field: do not free! */
mtr_commit(&mtr);
return;
}
if (do_not_free_inherited
if (/* There is no external storage data */
page_no == FIL_NULL
/* This field does not own the externally stored field */
|| (mach_read_from_1(field_ref + BTR_EXTERN_LEN)
& BTR_EXTERN_OWNER_FLAG)
/* Rollback and inherited field */
|| (do_not_free_inherited
&& mach_read_from_1(field_ref + BTR_EXTERN_LEN)
& BTR_EXTERN_INHERITED_FLAG) {
/* Rollback and inherited field: do not free! */
& BTR_EXTERN_INHERITED_FLAG)) {
/* Do not free */
mtr_commit(&mtr);
return;
}
page = buf_page_get(space_id, page_no, RW_X_LATCH, &mtr);
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(page, SYNC_EXTERN_STORAGE);
#endif /* UNIV_SYNC_DEBUG */
next_page_no = mach_read_from_4(page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_NEXT_PAGE_NO);
if (ZIP_BLOB||UNIV_LIKELY_NULL(page_zip)) {
next_page_no = mach_read_from_4(page);
part_len = btr_blob_get_part_len(page + FIL_PAGE_DATA);
btr_page_free_low(index->tree, page,
space_id, page_no, 0, &mtr);
ut_a(extern_len >= part_len);
mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
next_page_no,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
0,
MLOG_4BYTES, &mtr);
#if ZIP_BLOB
if (page_zip)
#endif
page_zip_write_blob_ptr(page_zip, rec, index, offsets,
i, &mtr);
} else {
ulint extern_len = mach_read_from_4(
field_ref + BTR_EXTERN_LEN + 4);
ulint part_len = btr_blob_get_part_len(
page + FIL_PAGE_DATA);
/* We must supply the page level (= 0) as an argument
because we did not store it on the page (we save the space
overhead from an index page header. */
ut_a(extern_len >= part_len);
btr_page_free_low(index->tree, page, 0, &mtr);
next_page_no = mach_read_from_4(page + FIL_PAGE_DATA
+ BTR_BLOB_HDR_NEXT_PAGE_NO);
/* TODO: log these writes differently for page_zip */
mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
/* We must supply the page level (= 0) as an argument
because we did not store it on the page (we save the
space overhead from an index page header. */
ut_a(space_id == buf_frame_get_space_id(page));
ut_a(page_no == buf_frame_get_page_no(page));
btr_page_free_low(index->tree, page,
space_id, page_no, 0, &mtr);
mlog_write_ulint(field_ref + BTR_EXTERN_PAGE_NO,
next_page_no,
MLOG_4BYTES, &mtr);
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
extern_len - part_len,
MLOG_4BYTES, &mtr);
if (next_page_no == FIL_NULL) {
ut_a(extern_len - part_len == 0);
}
if (extern_len - part_len == 0) {
ut_a(next_page_no == FIL_NULL);
}
if (next_page_no == FIL_NULL) {
ut_a(extern_len - part_len == 0);
}
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_blob_ptr(page_zip, rec, index, offsets,
i, &mtr);
if (extern_len - part_len == 0) {
ut_a(next_page_no == FIL_NULL);
}
}
mtr_commit(&mtr);
......@@ -3746,7 +3874,7 @@ btr_free_externally_stored_field(
/***************************************************************
Frees the externally stored fields for a record. */
static
void
btr_rec_free_externally_stored_fields(
/*==================================*/
......@@ -3832,8 +3960,8 @@ btr_rec_free_updated_extern_fields(
Copies an externally stored field of a record to mem heap. Parameter
data contains a pointer to 'internally' stored part of the field:
possibly some data, and the reference to the externally stored part in
the last 20 bytes of data. */
the last BTR_EXTERN_FIELD_REF_SIZE bytes of data. */
static
byte*
btr_copy_externally_stored_field(
/*=============================*/
......@@ -3842,6 +3970,7 @@ btr_copy_externally_stored_field(
byte* data, /* in: 'internally' stored part of the
field containing also the reference to
the external part */
ibool zip, /* in: TRUE=compressed BLOB */
ulint local_len,/* in: length of data */
mem_heap_t* heap) /* in: mem heap */
{
......@@ -3850,11 +3979,10 @@ btr_copy_externally_stored_field(
ulint page_no;
ulint offset;
ulint extern_len;
byte* blob_header;
ulint part_len;
byte* buf;
ulint copied_len;
byte* buf;
mtr_t mtr;
z_stream d_stream;
ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
......@@ -3866,14 +3994,14 @@ btr_copy_externally_stored_field(
offset = mach_read_from_4(data + local_len + BTR_EXTERN_OFFSET);
/* Currently a BLOB cannot be bigger that 4 GB; we
/* Currently a BLOB cannot be bigger than 4 GB; we
leave the 4 upper bytes in the length field unused */
extern_len = mach_read_from_4(data + local_len + BTR_EXTERN_LEN + 4);
buf = mem_heap_alloc(heap, local_len + extern_len);
ut_memcpy(buf, data, local_len);
memcpy(buf, data, local_len);
copied_len = local_len;
if (extern_len == 0) {
......@@ -3882,6 +4010,20 @@ btr_copy_externally_stored_field(
return(buf);
}
if (UNIV_UNLIKELY(zip)) {
int err;
d_stream.zalloc = (alloc_func) 0;
d_stream.zfree = (free_func) 0;
d_stream.opaque = (voidpf) 0;
err = inflateInit(&d_stream);
ut_a(err == Z_OK);
d_stream.next_out = buf + local_len;
d_stream.avail_out = extern_len;
d_stream.avail_in = 0;
}
for (;;) {
mtr_start(&mtr);
......@@ -3889,32 +4031,72 @@ btr_copy_externally_stored_field(
#ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(page, SYNC_EXTERN_STORAGE);
#endif /* UNIV_SYNC_DEBUG */
blob_header = page + offset;
if (UNIV_UNLIKELY(zip)) {
int err;
d_stream.next_in = page + 4;
d_stream.avail_in = UNIV_PAGE_SIZE - 4;/* TODO */
part_len = btr_blob_get_part_len(blob_header);
page_no = mach_read_from_4(page);
ut_memcpy(buf + copied_len, blob_header + BTR_BLOB_HDR_SIZE,
part_len);
copied_len += part_len;
err = inflate(&d_stream, Z_NO_FLUSH);
switch (err) {
case Z_OK:
break;
case Z_STREAM_END:
if (page_no == FIL_NULL) {
goto end_of_blob;
}
/* fall through */
default:
mtr_commit(&mtr);
inflateEnd(&d_stream);
ut_error;/* TODO: report error */
return(buf);
}
page_no = btr_blob_get_next_page_no(blob_header);
if (page_no == FIL_NULL) {
err = inflate(&d_stream, Z_FINISH);
/* TODO: report error instead of
assertion failure? */
ut_a(err == Z_STREAM_END);
end_of_blob:
mtr_commit(&mtr);
ut_a(!d_stream.avail_out);
/* On other BLOB pages except the first the BLOB header
always is at the page data start: */
inflateEnd(&d_stream);
*len = d_stream.total_out;
return(buf);
}
offset = FIL_PAGE_DATA;
mtr_commit(&mtr);
} else {
byte* blob_header = page + offset;
ulint part_len = btr_blob_get_part_len(
blob_header);
mtr_commit(&mtr);
memcpy(buf + copied_len,
blob_header + BTR_BLOB_HDR_SIZE, part_len);
copied_len += part_len;
page_no = btr_blob_get_next_page_no(blob_header);
/* On other BLOB pages except the first the BLOB header
always is at the page data start: */
if (page_no == FIL_NULL) {
ut_a(copied_len == local_len + extern_len);
offset = FIL_PAGE_DATA;
mtr_commit(&mtr);
*len = copied_len;
if (page_no == FIL_NULL) {
ut_a(copied_len == local_len + extern_len);
*len = copied_len;
return(buf);
}
return(buf);
}
ut_a(copied_len < local_len + extern_len);
ut_a(copied_len < local_len + extern_len);
}
}
}
......@@ -3948,5 +4130,7 @@ btr_rec_copy_externally_stored_field(
data = rec_get_nth_field(rec, offsets, no, &local_len);
return(btr_copy_externally_stored_field(len, data, local_len, heap));
return(btr_copy_externally_stored_field(len, data,
ZIP_BLOB,
local_len, heap));
}
......@@ -392,6 +392,8 @@ btr_page_free_low(
/*==============*/
dict_tree_t* tree, /* in: index tree */
page_t* page, /* in: page to be freed, x-latched */
ulint space, /* in: space */
ulint page_no,/* in: page number */
ulint level, /* in: page level */
mtr_t* mtr); /* in: mtr */
#ifdef UNIV_BTR_PRINT
......
......@@ -493,24 +493,6 @@ btr_free_externally_stored_field(
mtr_t* local_mtr); /* in: mtr containing the latch to
data an an X-latch to the index
tree */
/***************************************************************
Frees the externally stored fields for a record. */
void
btr_rec_free_externally_stored_fields(
/*==================================*/
dict_index_t* index, /* in: index of the data, the index
tree MUST be X-latched */
rec_t* rec, /* in: record */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
page_zip_des_t* page_zip,/* in: compressed page whose uncompressed
part will be updated, or NULL */
ibool do_not_free_inherited,/* in: TRUE if called in a
rollback and we do not want to free
inherited fields */
mtr_t* mtr); /* in: mini-transaction handle which contains
an X-latch to record page and to the index
tree */
/***********************************************************************
Copies an externally stored field of a record to mem heap. */
......@@ -524,22 +506,6 @@ btr_rec_copy_externally_stored_field(
ulint* len, /* out: length of the field */
mem_heap_t* heap); /* in: mem heap */
/***********************************************************************
Copies an externally stored field of a record to mem heap. Parameter
data contains a pointer to 'internally' stored part of the field:
possibly some data, and the reference to the externally stored part in
the last 20 bytes of data. */
byte*
btr_copy_externally_stored_field(
/*=============================*/
/* out: the whole field copied to heap */
ulint* len, /* out: length of the whole field */
byte* data, /* in: 'internally' stored part of the
field containing also the reference to
the external part */
ulint local_len,/* in: length of data */
mem_heap_t* heap); /* in: mem heap */
/***********************************************************************
Stores the positions of the fields marked as extern storage in the update
vector, and also those fields who are marked as extern storage in rec
and not mentioned in updated fields. We use this function to remember
......
......@@ -46,6 +46,16 @@ mlog_write_string(
ulint len, /* in: string length */
mtr_t* mtr); /* in: mini-transaction handle */
/************************************************************
Logs a write of a string to a file page buffered in the buffer pool.
Writes the corresponding log record to the mini-transaction log. */
void
mlog_log_string(
/*============*/
byte* ptr, /* in: pointer written to */
ulint len, /* in: string length */
mtr_t* mtr); /* in: mini-transaction handle */
/************************************************************
Writes initial part of a log record consisting of one-byte item
type and four-byte space and page numbers. */
......
......@@ -307,8 +307,6 @@ mlog_write_string(
ulint len, /* in: string length */
mtr_t* mtr) /* in: mini-transaction handle */
{
byte* log_ptr;
if (UNIV_UNLIKELY(ptr < buf_pool->frame_zero)
|| UNIV_UNLIKELY(ptr >= buf_pool->high_end)) {
fprintf(stderr,
......@@ -318,7 +316,26 @@ mlog_write_string(
ut_ad(ptr && mtr);
ut_a(len < UNIV_PAGE_SIZE);
ut_memcpy(ptr, str, len);
memcpy(ptr, str, len);
mlog_log_string(ptr, len, mtr);
}
/************************************************************
Logs a write of a string to a file page buffered in the buffer pool.
Writes the corresponding log record to the mini-transaction log. */
void
mlog_log_string(
/*============*/
byte* ptr, /* in: pointer written to */
ulint len, /* in: string length */
mtr_t* mtr) /* in: mini-transaction handle */
{
byte* log_ptr;
ut_ad(ptr && mtr);
ut_ad(len <= UNIV_PAGE_SIZE);
log_ptr = mlog_open(mtr, 30);
......@@ -330,7 +347,7 @@ mlog_write_string(
log_ptr = mlog_write_initial_log_record_fast(ptr, MLOG_WRITE_STRING,
log_ptr, mtr);
mach_write_to_2(log_ptr, ptr - buf_frame_align(ptr));
mach_write_to_2(log_ptr, ut_align_offset(ptr, UNIV_PAGE_SIZE));
log_ptr += 2;
mach_write_to_2(log_ptr, len);
......@@ -338,7 +355,7 @@ mlog_write_string(
mlog_close(mtr, log_ptr);
mlog_catenate_string(mtr, str, len);
mlog_catenate_string(mtr, ptr, len);
}
/************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment