Commit 69f236c2 authored by marko's avatar marko

branches/zip:

Fix the way how btr_free_externally_stored_field() is called in purge.

btr_free_externally_stored_field(): Add parameter field_ref that points
directly to the BLOB reference.  Use rec, offsets, page_zip, and i
only for the page_zip_write_blob_ptr() call.

row_purge_upd_exist_or_extern(): Do not assume that the undo log contains
the entire record.  Only pass the BLOB reference to
btr_free_externally_stored_field().
parent 748f0694
...@@ -3779,12 +3779,15 @@ btr_free_externally_stored_field( ...@@ -3779,12 +3779,15 @@ btr_free_externally_stored_field(
from purge where 'data' is located on from purge where 'data' is located on
an undo log page, not an index an undo log page, not an index
page) */ page) */
rec_t* rec, /* in/out: record */ byte* field_ref, /* in/out: field reference */
const ulint* offsets, /* in: rec_get_offsets(rec, index) */ rec_t* rec, /* in: record containing field_ref, for
page_zip_des_t* page_zip, /* in: compressed page whose page_zip_write_blob_ptr(), or NULL */
uncompressed part will be updated, const ulint* offsets, /* in: rec_get_offsets(rec, index),
or NULL */ or NULL */
ulint i, /* in: field number */ page_zip_des_t* page_zip, /* in: compressed page corresponding
to rec, or NULL if rec == NULL */
ulint i, /* in: field number of field_ref;
ignored if rec == NULL */
ibool do_not_free_inherited,/* in: TRUE if called in a ibool do_not_free_inherited,/* in: TRUE if called in a
rollback and we do not want to free rollback and we do not want to free
inherited fields */ inherited fields */
...@@ -3794,7 +3797,6 @@ btr_free_externally_stored_field( ...@@ -3794,7 +3797,6 @@ btr_free_externally_stored_field(
{ {
page_t* page; page_t* page;
page_t* rec_page; page_t* rec_page;
byte* field_ref;
ulint space_id; ulint space_id;
ulint page_no; ulint page_no;
ulint next_page_no; ulint next_page_no;
...@@ -3802,17 +3804,20 @@ btr_free_externally_stored_field( ...@@ -3802,17 +3804,20 @@ btr_free_externally_stored_field(
ut_ad(mtr_memo_contains(local_mtr, dict_tree_get_lock(index->tree), ut_ad(mtr_memo_contains(local_mtr, dict_tree_get_lock(index->tree),
MTR_MEMO_X_LOCK)); MTR_MEMO_X_LOCK));
ut_ad(mtr_memo_contains(local_mtr, buf_block_align(rec), ut_ad(mtr_memo_contains(local_mtr, buf_block_align(field_ref),
MTR_MEMO_PAGE_X_FIX)); MTR_MEMO_PAGE_X_FIX));
ut_ad(rec_offs_validate(rec, index, offsets)); ut_ad(!rec || rec_offs_validate(rec, index, offsets));
{ #ifdef UNIV_DEBUG
if (rec) {
ulint local_len; ulint local_len;
field_ref = rec_get_nth_field(rec, offsets, i, &local_len); byte* f = rec_get_nth_field(rec, offsets, i, &local_len);
ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE); ut_a(local_len >= BTR_EXTERN_FIELD_REF_SIZE);
local_len -= BTR_EXTERN_FIELD_REF_SIZE; local_len -= BTR_EXTERN_FIELD_REF_SIZE;
field_ref += local_len; f += local_len;
ut_ad(f == field_ref);
} }
#endif /* UNIV_DEBUG */
for (;;) { for (;;) {
mtr_start(&mtr); mtr_start(&mtr);
...@@ -3858,11 +3863,10 @@ btr_free_externally_stored_field( ...@@ -3858,11 +3863,10 @@ btr_free_externally_stored_field(
mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4, mlog_write_ulint(field_ref + BTR_EXTERN_LEN + 4,
0, 0,
MLOG_4BYTES, &mtr); MLOG_4BYTES, &mtr);
#if ZIP_BLOB if (UNIV_LIKELY_NULL(page_zip)) {
if (page_zip) page_zip_write_blob_ptr(page_zip,
#endif rec, index, offsets, i, &mtr);
page_zip_write_blob_ptr(page_zip, rec, index, offsets, }
i, &mtr);
} else { } else {
ulint extern_len = mach_read_from_4( ulint extern_len = mach_read_from_4(
field_ref + BTR_EXTERN_LEN + 4); field_ref + BTR_EXTERN_LEN + 4);
...@@ -3935,10 +3939,15 @@ btr_rec_free_externally_stored_fields( ...@@ -3935,10 +3939,15 @@ btr_rec_free_externally_stored_fields(
for (i = 0; i < n_fields; i++) { for (i = 0; i < n_fields; i++) {
if (rec_offs_nth_extern(offsets, i)) { if (rec_offs_nth_extern(offsets, i)) {
ulint len;
btr_free_externally_stored_field(index, rec, offsets, byte* data = rec_get_nth_field(
page_zip, i, rec, offsets, i, &len);
do_not_free_inherited, mtr); ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
btr_free_externally_stored_field(index,
data + len - BTR_EXTERN_FIELD_REF_SIZE,
rec, offsets, page_zip, i,
do_not_free_inherited, mtr);
} }
} }
} }
...@@ -3979,10 +3988,16 @@ btr_rec_free_updated_extern_fields( ...@@ -3979,10 +3988,16 @@ btr_rec_free_updated_extern_fields(
ufield = upd_get_nth_field(update, i); ufield = upd_get_nth_field(update, i);
if (rec_offs_nth_extern(offsets, ufield->field_no)) { if (rec_offs_nth_extern(offsets, ufield->field_no)) {
ulint len;
btr_free_externally_stored_field(index, rec, offsets, byte* data = rec_get_nth_field(
page_zip, ufield->field_no, rec, offsets, ufield->field_no, &len);
do_not_free_inherited, mtr); ut_a(len >= BTR_EXTERN_FIELD_REF_SIZE);
btr_free_externally_stored_field(index,
data + len - BTR_EXTERN_FIELD_REF_SIZE,
rec, offsets, page_zip,
ufield->field_no,
do_not_free_inherited, mtr);
} }
} }
} }
......
...@@ -481,12 +481,15 @@ btr_free_externally_stored_field( ...@@ -481,12 +481,15 @@ btr_free_externally_stored_field(
from purge where 'data' is located on from purge where 'data' is located on
an undo log page, not an index an undo log page, not an index
page) */ page) */
rec_t* rec, /* in/out: record */ byte* field_ref, /* in/out: field reference */
const ulint* offsets, /* in: rec_get_offsets(rec, index) */ rec_t* rec, /* in: record containing field_ref, for
page_zip_des_t* page_zip, /* in: compressed page whose page_zip_write_blob_ptr(), or NULL */
uncompressed part will be updated, const ulint* offsets, /* in: rec_get_offsets(rec, index),
or NULL */ or NULL */
ulint i, /* in: field number */ page_zip_des_t* page_zip, /* in: compressed page corresponding
to rec, or NULL if rec == NULL */
ulint i, /* in: field number of field_ref;
ignored if rec == NULL */
ibool do_not_free_inherited,/* in: TRUE if called in a ibool do_not_free_inherited,/* in: TRUE if called in a
rollback and we do not want to free rollback and we do not want to free
inherited fields */ inherited fields */
......
...@@ -371,15 +371,12 @@ row_purge_upd_exist_or_extern( ...@@ -371,15 +371,12 @@ row_purge_upd_exist_or_extern(
ulint page_no; ulint page_no;
ulint offset; ulint offset;
ulint i; ulint i;
ulint* offsets;
mtr_t mtr; mtr_t mtr;
ut_ad(node); ut_ad(node);
offsets = NULL;
if (node->rec_type == TRX_UNDO_UPD_DEL_REC) { if (node->rec_type == TRX_UNDO_UPD_DEL_REC) {
heap = NULL;
goto skip_secondaries; goto skip_secondaries;
} }
...@@ -399,7 +396,7 @@ row_purge_upd_exist_or_extern( ...@@ -399,7 +396,7 @@ row_purge_upd_exist_or_extern(
node->index = dict_table_get_next_index(node->index); node->index = dict_table_get_next_index(node->index);
} }
mem_heap_empty(heap); mem_heap_free(heap);
skip_secondaries: skip_secondaries:
/* Free possible externally stored fields */ /* Free possible externally stored fields */
...@@ -408,9 +405,8 @@ row_purge_upd_exist_or_extern( ...@@ -408,9 +405,8 @@ row_purge_upd_exist_or_extern(
ufield = upd_get_nth_field(node->update, i); ufield = upd_get_nth_field(node->update, i);
if (UNIV_UNLIKELY(ufield->extern_storage)) { if (UNIV_UNLIKELY(ufield->extern_storage)) {
byte* rec;
ulint j;
ulint internal_offset; ulint internal_offset;
byte* data_field;
/* We use the fact that new_val points to /* We use the fact that new_val points to
node->undo_rec and get thus the offset of node->undo_rec and get thus the offset of
...@@ -449,44 +445,22 @@ row_purge_upd_exist_or_extern( ...@@ -449,44 +445,22 @@ row_purge_upd_exist_or_extern(
/* We assume in purge of externally stored fields /* We assume in purge of externally stored fields
that the space id of the undo log record is 0! */ that the space id of the undo log record is 0! */
rec = buf_page_get(0, page_no, RW_X_LATCH, &mtr) data_field = buf_page_get(0, page_no, RW_X_LATCH, &mtr)
+ offset; + offset + internal_offset;
#ifdef UNIV_SYNC_DEBUG #ifdef UNIV_SYNC_DEBUG
buf_page_dbg_add_level(buf_frame_align(rec), buf_page_dbg_add_level(buf_frame_align(data_field),
SYNC_TRX_UNDO_PAGE); SYNC_TRX_UNDO_PAGE);
#endif /* UNIV_SYNC_DEBUG */ #endif /* UNIV_SYNC_DEBUG */
ut_a(ufield->new_val.len >= BTR_EXTERN_FIELD_REF_SIZE);
offsets = rec_get_offsets(rec, index, offsets, btr_free_externally_stored_field(index, data_field
ULINT_UNDEFINED, &heap); + ufield->new_val.len
for (j = 0; j < rec_offs_n_fields(offsets); j++) { - BTR_EXTERN_FIELD_REF_SIZE,
ulint len; NULL, NULL, NULL, 0,
byte* field = rec_get_nth_field( FALSE, &mtr);
rec, offsets, j, &len);
if (UNIV_UNLIKELY(rec + internal_offset
== field)) {
ut_a(len == ufield->new_val.len);
ut_a(rec_offs_nth_extern(offsets, j));
goto found_field;
}
}
/* field not found */
ut_error;
found_field:
btr_free_externally_stored_field(index, rec, offsets,
buf_block_get_page_zip(
buf_block_align(rec)),
j, FALSE, &mtr);
mtr_commit(&mtr); mtr_commit(&mtr);
} }
} }
if (heap) {
mem_heap_free(heap);
}
} }
/*************************************************************** /***************************************************************
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment