Commit f137c038 authored by marko's avatar marko

branches/zip: Improve the clearing of deleted records. Try to support

operations on BLOB columns.  There are some bugs in the code, because
test-insert and a few other tests fail.

page_mem_free(): Add parameter index.  Decrement PAGE_N_RECS here.
Move some operations to page_zip_dir_delete().

page_zip_clear_rec(): Make this a static function.

page_zip_dir_delete(): Add parameters index and offsets.
Decrement PAGE_N_RECS and clear info_bits and n_owned.

page_zip_get_n_prev_extern(): Correct the synopsis and the algorithm.
Add parameter page_zip.  Search the records in heap_no order instead
of collation order.

page_zip_compress(), page_zip_decompress(): Only copy BLOB pointers
and increment n_blobs for records that have not been deleted.

page_zip_clear_rec(): Clear trx_id and roll_ptr on the compressed page.

page_zip_dir_delete(): Decrement PAGE_N_RECS.  Shift the array of
BLOB pointers.  Call page_zip_clear_rec().

page_zip_dir_add_slot(): Shift the array of BLOB pointers to make
space of roll_ptr and trx_id.

page_cur_delete_rec(): Do not decrement PAGE_N_RECS or call
page_zip_clear_rec(), as page_mem_free() already does it.
parent cd7583af
...@@ -604,6 +604,7 @@ page_mem_free( ...@@ -604,6 +604,7 @@ page_mem_free(
page_zip_des_t* page_zip,/* in/out: compressed page with at least page_zip_des_t* page_zip,/* in/out: compressed page with at least
6 bytes available, or NULL */ 6 bytes available, or NULL */
rec_t* rec, /* in: pointer to the (origin of) record */ rec_t* rec, /* in: pointer to the (origin of) record */
dict_index_t* index, /* in: index of rec */
const ulint* offsets);/* in: array returned by rec_get_offsets() */ const ulint* offsets);/* in: array returned by rec_get_offsets() */
/************************************************************** /**************************************************************
The index page creation function. */ The index page creation function. */
......
...@@ -909,33 +909,29 @@ page_mem_free( ...@@ -909,33 +909,29 @@ page_mem_free(
page_zip_des_t* page_zip,/* in/out: compressed page with at least page_zip_des_t* page_zip,/* in/out: compressed page with at least
6 bytes available, or NULL */ 6 bytes available, or NULL */
rec_t* rec, /* in: pointer to the (origin of) record */ rec_t* rec, /* in: pointer to the (origin of) record */
dict_index_t* index, /* in: index of rec */
const ulint* offsets)/* in: array returned by rec_get_offsets() */ const ulint* offsets)/* in: array returned by rec_get_offsets() */
{ {
rec_t* free; rec_t* free;
ulint garbage; ulint garbage;
ut_ad(rec_offs_validate(rec, NULL, offsets)); ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(!rec_offs_comp(offsets) == !page_rec_is_comp(rec));
free = page_header_get_ptr(page, PAGE_FREE); free = page_header_get_ptr(page, PAGE_FREE);
page_rec_set_next(rec, free); page_rec_set_next(rec, free);
page_header_set_ptr(page, page_zip, PAGE_FREE, rec); page_header_set_ptr(page, page_zip, PAGE_FREE, rec);
if (UNIV_LIKELY_NULL(page_zip)) {
ut_ad(rec_offs_comp(offsets));
/* The compression algorithm expects info_bits and n_owned
to be 0 for deleted records. */
rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
/* Update the dense page directory. */
page_zip_dir_delete(page_zip, rec, free);
}
garbage = page_header_get_field(page, PAGE_GARBAGE); garbage = page_header_get_field(page, PAGE_GARBAGE);
page_header_set_field(page, page_zip, PAGE_GARBAGE, page_header_set_field(page, page_zip, PAGE_GARBAGE,
garbage + rec_offs_size(offsets)); garbage + rec_offs_size(offsets));
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_dir_delete(page_zip, rec, index, offsets, free);
} else {
page_header_set_field(page, page_zip, PAGE_N_RECS,
(ulint)(page_get_n_recs(page) - 1));
}
} }
#ifdef UNIV_MATERIALIZE #ifdef UNIV_MATERIALIZE
......
...@@ -53,19 +53,6 @@ page_zip_write( ...@@ -53,19 +53,6 @@ page_zip_write(
ulint length) /* in: length of the data */ ulint length) /* in: length of the data */
__attribute__((nonnull)); __attribute__((nonnull));
/**************************************************************************
Clear a record on the uncompressed and compressed page, if possible. */
void
page_zip_clear_rec(
/*===============*/
page_zip_des_t* page_zip,/* in/out: compressed page */
byte* rec, /* in: record to clear */
dict_index_t* index, /* in: index of rec */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
mtr_t* mtr) /* in: mini-transaction */
__attribute__((nonnull));
/************************************************************************** /**************************************************************************
Write data to the uncompressed header portion of a page. The data must Write data to the uncompressed header portion of a page. The data must
already have been written to the uncompressed page. */ already have been written to the uncompressed page. */
...@@ -110,9 +97,11 @@ void ...@@ -110,9 +97,11 @@ void
page_zip_dir_delete( page_zip_dir_delete(
/*================*/ /*================*/
page_zip_des_t* page_zip,/* in/out: compressed page */ page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: deleted record */ byte* rec, /* in: deleted record */
dict_index_t* index, /* in: index of rec */
const ulint* offsets,/* in: rec_get_offsets(rec) */
const byte* free) /* in: previous start of the free list */ const byte* free) /* in: previous start of the free list */
__attribute__((nonnull(1,2))); __attribute__((nonnull(1,2,3,4)));
/************************************************************************** /**************************************************************************
Add a slot to the dense page directory. */ Add a slot to the dense page directory. */
......
...@@ -161,19 +161,6 @@ page_zip_write_trx_id_and_roll_ptr( ...@@ -161,19 +161,6 @@ page_zip_write_trx_id_and_roll_ptr(
dulint roll_ptr)/* in: roll_ptr */ dulint roll_ptr)/* in: roll_ptr */
__attribute__((nonnull)); __attribute__((nonnull));
/**************************************************************************
Clear a record on the uncompressed and compressed page, if possible. */
void
page_zip_clear_rec(
/*===============*/
page_zip_des_t* page_zip,/* in/out: compressed page */
byte* rec, /* in/out: record to clear */
dict_index_t* index, /* in: index of rec */
const ulint* offsets,/* in: rec_get_offsets(rec, index) */
mtr_t* mtr) /* in: mini-transaction */
__attribute__((nonnull));
/************************************************************************** /**************************************************************************
Populate the dense page directory on the compressed page Populate the dense page directory on the compressed page
from the sparse directory on the uncompressed row_format=compact page. */ from the sparse directory on the uncompressed row_format=compact page. */
...@@ -216,9 +203,11 @@ void ...@@ -216,9 +203,11 @@ void
page_zip_dir_delete( page_zip_dir_delete(
/*================*/ /*================*/
page_zip_des_t* page_zip,/* in/out: compressed page */ page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: deleted record */ byte* rec, /* in: deleted record */
dict_index_t* index, /* in: index of rec */
const ulint* offsets,/* in: rec_get_offsets(rec) */
const byte* free) /* in: previous start of the free list */ const byte* free) /* in: previous start of the free list */
__attribute__((nonnull(1,2))); __attribute__((nonnull(1,2,3,4)));
/************************************************************************** /**************************************************************************
Add a slot to the dense page directory. */ Add a slot to the dense page directory. */
......
...@@ -1535,18 +1535,7 @@ page_cur_delete_rec( ...@@ -1535,18 +1535,7 @@ page_cur_delete_rec(
page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1); page_dir_slot_set_n_owned(cur_dir_slot, page_zip, cur_n_owned - 1);
/* 6. Free the memory occupied by the record */ /* 6. Free the memory occupied by the record */
page_mem_free(page, page_zip, current_rec, offsets); page_mem_free(page, page_zip, current_rec, index, offsets);
page_header_set_field(page, page_zip, PAGE_N_RECS,
(ulint)(page_get_n_recs(page) - 1));
if (UNIV_LIKELY_NULL(page_zip)) {
/* Clear the data bytes of the deleted record in order
to improve the compression ratio of the page. The fixed extra
bytes of the record, which will be omitted from the
stream compression algorithm, cannot be cleared, because
page_mem_alloc() needs them in order to determine the size
of the deleted record. */
page_zip_clear_rec(page_zip, rec, index, offsets, mtr);
}
/* 7. Now we have decremented the number of owned records of the slot. /* 7. Now we have decremented the number of owned records of the slot.
If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the If the number drops below PAGE_DIR_SLOT_MIN_N_OWNED, we balance the
......
...@@ -162,42 +162,50 @@ page_zip_dir_get( ...@@ -162,42 +162,50 @@ page_zip_dir_get(
/********************************************************** /**********************************************************
Determine how many externally stored columns are contained Determine how many externally stored columns are contained
in previous records on the page of rec. */ in records with smaller heap_no than rec. */
UNIV_INLINE static
ulint ulint
page_zip_get_n_prev_extern( page_zip_get_n_prev_extern(
/*=======================*/ /*=======================*/
const rec_t* rec, /* in: compact physical record const page_zip_des_t* page_zip,/* in: dense page directory on
on a B-tree leaf page */ compressed page */
dict_index_t* index) /* in: record descriptor */ const rec_t* rec, /* in: compact physical record
on a B-tree leaf page */
dict_index_t* index) /* in: record descriptor */
{ {
page_t* page = ut_align_down((byte*) rec, UNIV_PAGE_SIZE); page_t* page = ut_align_down((byte*) rec, UNIV_PAGE_SIZE);
ulint n_ext; ulint n_ext = 0;
ulint next_offs; ulint i;
ulint left;
ulint heap_no;
ulint n_recs = page_dir_get_n_heap((page_t*) page_zip->data) - 2;
ut_ad(page_is_leaf(page)); ut_ad(page_is_leaf(page));
ut_ad(page_is_comp(page)); ut_ad(page_is_comp(page));
ut_ad(dict_table_is_comp(index->table)); ut_ad(dict_table_is_comp(index->table));
n_ext = 0; heap_no = rec_get_heap_no_new((rec_t*) rec);
next_offs = rec_get_next_offs(page + PAGE_NEW_INFIMUM, TRUE); ut_ad(heap_no >= 2); /* exclude infimum and supremum */
ut_a(next_offs >= PAGE_NEW_SUPREMUM_END); left = heap_no - 2;
if (UNIV_UNLIKELY(!left)) {
return(0);
}
do { for (i = 0; i < n_recs; i++) {
rec_t* r = page + next_offs; rec_t* r = page + (page_zip_dir_get(page_zip, i)
& PAGE_ZIP_DIR_SLOT_MASK);
if (r == rec) { if (rec_get_heap_no_new(r) < heap_no) {
return(n_ext); n_ext += rec_get_n_extern_new(
r, index, ULINT_UNDEFINED);
if (!--left) {
break;
}
} }
}
n_ext += rec_get_n_extern_new(r, index, ULINT_UNDEFINED); ut_a(!left);
return(n_ext);
next_offs = rec_get_next_offs(r, TRUE);
ut_a(next_offs > 0);
} while (next_offs != PAGE_NEW_SUPREMUM);
ut_error;
return(0);
} }
/************************************************************************** /**************************************************************************
...@@ -589,9 +597,15 @@ page_zip_compress( ...@@ -589,9 +597,15 @@ page_zip_compress(
storage = buf_end - n_dense * PAGE_ZIP_DIR_SLOT_SIZE; storage = buf_end - n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (UNIV_UNLIKELY(!n_dense)) {
goto recs_done;
}
if (page_is_leaf(page)) { if (page_is_leaf(page)) {
/* BTR_EXTERN_FIELD_REF storage */ /* BTR_EXTERN_FIELD_REF storage */
byte* externs; byte* externs;
ulint slot = 0;
if (UNIV_UNLIKELY(trx_id_col != ULINT_UNDEFINED)) { if (UNIV_UNLIKELY(trx_id_col != ULINT_UNDEFINED)) {
externs = storage - n_dense externs = storage - n_dense
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
...@@ -599,7 +613,7 @@ page_zip_compress( ...@@ -599,7 +613,7 @@ page_zip_compress(
externs = storage; externs = storage;
} }
while (n_dense--) { do {
ulint i; ulint i;
rec_t* rec = (rec_t*) *recs++; rec_t* rec = (rec_t*) *recs++;
...@@ -694,21 +708,28 @@ page_zip_compress( ...@@ -694,21 +708,28 @@ page_zip_compress(
goto zlib_error; goto zlib_error;
} }
c_stream.avail_out
-= BTR_EXTERN_FIELD_REF_SIZE;
externs -= BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(externs == c_stream.next_out ut_ad(externs == c_stream.next_out
+ c_stream.avail_out + c_stream.avail_out
+ 1/* end of modification log */); + 1/* end of modification log */);
/* Copy the BLOB pointer */
memcpy(externs, c_stream.next_in,
BTR_EXTERN_FIELD_REF_SIZE);
c_stream.next_in += c_stream.next_in +=
BTR_EXTERN_FIELD_REF_SIZE; BTR_EXTERN_FIELD_REF_SIZE;
/* Increment the BLOB counter */
if (slot >= page_get_n_recs(
(page_t*) page)) {
/* Skip deleted records */
continue;
}
n_blobs++; n_blobs++;
c_stream.avail_out
-= BTR_EXTERN_FIELD_REF_SIZE;
externs -= BTR_EXTERN_FIELD_REF_SIZE;
/* Copy the BLOB pointer */
memcpy(externs, c_stream.next_in
- BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
} }
} }
...@@ -723,10 +744,10 @@ page_zip_compress( ...@@ -723,10 +744,10 @@ page_zip_compress(
} }
} }
ut_ad(!c_stream.avail_in); ut_ad(!c_stream.avail_in);
} } while (++slot < n_dense);
} else { } else {
/* This is a node pointer page. */ /* This is a node pointer page. */
while (n_dense--) { do {
rec_t* rec = (rec_t*) *recs++; rec_t* rec = (rec_t*) *recs++;
offsets = rec_get_offsets(rec, index, offsets, offsets = rec_get_offsets(rec, index, offsets,
...@@ -765,9 +786,10 @@ page_zip_compress( ...@@ -765,9 +786,10 @@ page_zip_compress(
* (rec_get_heap_no_new(rec) - 1), * (rec_get_heap_no_new(rec) - 1),
c_stream.next_in, REC_NODE_PTR_SIZE); c_stream.next_in, REC_NODE_PTR_SIZE);
c_stream.next_in += REC_NODE_PTR_SIZE; c_stream.next_in += REC_NODE_PTR_SIZE;
} } while (--n_dense);
} }
recs_done:
/* Finish the compression. */ /* Finish the compression. */
ut_ad(!c_stream.avail_in); ut_ad(!c_stream.avail_in);
/* Compress any trailing garbage, in case the last record was /* Compress any trailing garbage, in case the last record was
...@@ -1151,8 +1173,8 @@ page_zip_apply_log( ...@@ -1151,8 +1173,8 @@ page_zip_apply_log(
rec_t** recs, /* in: dense page directory, rec_t** recs, /* in: dense page directory,
sorted by address (indexed by heap_no - 2) */ sorted by address (indexed by heap_no - 2) */
ulint n_dense,/* in: size of recs[] */ ulint n_dense,/* in: size of recs[] */
ulint trx_id_col,/* in: column number of trx_id in the index; ulint trx_id_col,/* in: column number of trx_id in the index,
ignored if heap_status & REC_STATUS_NODE_PTR */ or ULINT_UNDEFINED if none */
ulint heap_status, ulint heap_status,
/* in: heap_no and status bits for /* in: heap_no and status bits for
the next record to uncompress */ the next record to uncompress */
...@@ -1256,7 +1278,7 @@ page_zip_apply_log( ...@@ -1256,7 +1278,7 @@ page_zip_apply_log(
/* Check if there are any externally stored columns. /* Check if there are any externally stored columns.
For each externally stored column, skip the For each externally stored column, skip the
BTR_EXTERN_FIELD_REF._*/ BTR_EXTERN_FIELD_REF. */
for (i = 0; i < rec_offs_n_fields(offsets); i++) { for (i = 0; i < rec_offs_n_fields(offsets); i++) {
byte* dst; byte* dst;
...@@ -1327,7 +1349,7 @@ page_zip_decompress( ...@@ -1327,7 +1349,7 @@ page_zip_decompress(
z_stream d_stream; z_stream d_stream;
dict_index_t* index = NULL; dict_index_t* index = NULL;
rec_t** recs; /* dense page directory, sorted by address */ rec_t** recs; /* dense page directory, sorted by address */
rec_t** recsc; /* cursor to dense page directory */ ulint slot;
ulint heap_status;/* heap_no and status bits */ ulint heap_status;/* heap_no and status bits */
ulint n_dense;/* number of user records on the page */ ulint n_dense;/* number of user records on the page */
ulint trx_id_col = ULINT_UNDEFINED; ulint trx_id_col = ULINT_UNDEFINED;
...@@ -1335,7 +1357,6 @@ page_zip_decompress( ...@@ -1335,7 +1357,6 @@ page_zip_decompress(
ulint* offsets = NULL; ulint* offsets = NULL;
ulint info_bits = 0; ulint info_bits = 0;
const byte* storage; const byte* storage;
const byte* externs;
ut_ad(page_zip_simple_validate(page_zip)); ut_ad(page_zip_simple_validate(page_zip));
...@@ -1347,7 +1368,7 @@ page_zip_decompress( ...@@ -1347,7 +1368,7 @@ page_zip_decompress(
} }
heap = mem_heap_create(n_dense * (3 * sizeof *recs)); heap = mem_heap_create(n_dense * (3 * sizeof *recs));
recsc = recs = mem_heap_alloc(heap, n_dense * (2 * sizeof *recs)); recs = mem_heap_alloc(heap, n_dense * (2 * sizeof *recs));
/* Copy the page header. */ /* Copy the page header. */
memcpy(page, page_zip->data, PAGE_DATA); memcpy(page, page_zip->data, PAGE_DATA);
...@@ -1362,7 +1383,7 @@ page_zip_decompress( ...@@ -1362,7 +1383,7 @@ page_zip_decompress(
/* Copy the infimum and supremum records. */ /* Copy the infimum and supremum records. */
memcpy(page + (PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES), memcpy(page + (PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES),
infimum_extra, sizeof infimum_extra); infimum_extra, sizeof infimum_extra);
if (UNIV_UNLIKELY(!page_get_n_recs((page_t*) page))) { if (UNIV_UNLIKELY(!page_get_n_recs(page))) {
rec_set_next_offs_new(page + PAGE_NEW_INFIMUM, rec_set_next_offs_new(page + PAGE_NEW_INFIMUM,
PAGE_NEW_SUPREMUM); PAGE_NEW_SUPREMUM);
} else { } else {
...@@ -1434,7 +1455,7 @@ page_zip_decompress( ...@@ -1434,7 +1455,7 @@ page_zip_decompress(
heap_status = REC_STATUS_NODE_PTR heap_status = REC_STATUS_NODE_PTR
| 2 << REC_HEAP_NO_SHIFT; | 2 << REC_HEAP_NO_SHIFT;
if (UNIV_UNLIKELY(mach_read_from_4((page_t*) page if (UNIV_UNLIKELY(mach_read_from_4(page
+ FIL_PAGE_PREV) == FIL_NULL)) { + FIL_PAGE_PREV) == FIL_NULL)) {
info_bits = REC_INFO_MIN_REC_FLAG; info_bits = REC_INFO_MIN_REC_FLAG;
} }
...@@ -1450,9 +1471,9 @@ page_zip_decompress( ...@@ -1450,9 +1471,9 @@ page_zip_decompress(
*offsets = n; *offsets = n;
} }
while (n_dense--) { for (slot = 0; slot < n_dense; slot++) {
byte* const last = d_stream.next_out; byte* const last = d_stream.next_out;
rec_t* rec = *recsc++; rec_t* rec = recs[slot];
d_stream.avail_out = rec - REC_N_NEW_EXTRA_BYTES - last; d_stream.avail_out = rec - REC_N_NEW_EXTRA_BYTES - last;
...@@ -1615,8 +1636,7 @@ page_zip_decompress( ...@@ -1615,8 +1636,7 @@ page_zip_decompress(
/* Decompress any trailing garbage, in case the last record was /* Decompress any trailing garbage, in case the last record was
allocated from an originally longer space on the free list. */ allocated from an originally longer space on the free list. */
d_stream.avail_out = page_header_get_field( d_stream.avail_out = page_header_get_field(page, PAGE_HEAP_TOP)
(page_t*) page, PAGE_HEAP_TOP)
- (d_stream.next_out - page); - (d_stream.next_out - page);
if (UNIV_UNLIKELY(d_stream.avail_out > UNIV_PAGE_SIZE if (UNIV_UNLIKELY(d_stream.avail_out > UNIV_PAGE_SIZE
- PAGE_ZIP_START - PAGE_DIR)) { - PAGE_ZIP_START - PAGE_DIR)) {
...@@ -1677,33 +1697,45 @@ err_exit: ...@@ -1677,33 +1697,45 @@ err_exit:
page_zip->n_blobs = 0; page_zip->n_blobs = 0;
storage = page_zip->data + page_zip->size storage = page_zip->data + page_zip->size
- n_dense * PAGE_ZIP_DIR_SLOT_SIZE; - n_dense * PAGE_ZIP_DIR_SLOT_SIZE;
if (trx_id_col != ULINT_UNDEFINED) {
externs = storage - n_dense if (UNIV_UNLIKELY(!n_dense)) {
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); goto recs_done;
} else {
externs = storage;
} }
recsc = recs; slot = 0;
while (n_dense--) { if (page_is_leaf(page)) {
rec_t* rec = *recsc++; const byte* externs;
/* Read the offsets. The status bits are needed here. */ if (UNIV_UNLIKELY(trx_id_col != ULINT_UNDEFINED)) {
offsets = rec_get_offsets(rec, index, offsets, externs = storage - n_dense
ULINT_UNDEFINED, &heap); * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
} else {
externs = storage;
}
if (page_is_leaf(page)) { do {
ulint i; ulint i;
ulint len; ulint len;
byte* dst; byte* dst;
rec_t* rec = recs[slot];
if (trx_id_col != ULINT_UNDEFINED
|| slot < page_get_n_recs(page)) {
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
}
/* Check if there are any externally stored columns. /* Check if there are any externally stored columns.
For each externally stored column, restore the For each externally stored column, restore the
BTR_EXTERN_FIELD_REF separately._*/ BTR_EXTERN_FIELD_REF separately._*/
for (i = 0; i < rec_offs_n_fields(offsets); i++) { if (slot < page_get_n_recs(page)) {
if (rec_offs_nth_extern(offsets, i)) { for (i = 0; i < rec_offs_n_fields(offsets);
i++) {
if (!rec_offs_nth_extern(offsets, i)) {
continue;
}
dst = rec_get_nth_field( dst = rec_get_nth_field(
rec, offsets, i, &len); rec, offsets, i, &len);
ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE); ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE);
...@@ -1711,7 +1743,6 @@ err_exit: ...@@ -1711,7 +1743,6 @@ err_exit:
externs -= BTR_EXTERN_FIELD_REF_SIZE; externs -= BTR_EXTERN_FIELD_REF_SIZE;
/* Copy the BLOB pointer */
memcpy(dst, externs, memcpy(dst, externs,
BTR_EXTERN_FIELD_REF_SIZE); BTR_EXTERN_FIELD_REF_SIZE);
...@@ -1728,7 +1759,13 @@ err_exit: ...@@ -1728,7 +1759,13 @@ err_exit:
memcpy(dst, storage, memcpy(dst, storage,
DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN); DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
} }
} else { } while (++slot < n_dense);
} else {
do {
rec_t* rec = recs[slot];
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
/* Non-leaf nodes should not have any externally /* Non-leaf nodes should not have any externally
stored columns. */ stored columns. */
ut_ad(!rec_offs_any_extern(offsets)); ut_ad(!rec_offs_any_extern(offsets));
...@@ -1736,9 +1773,10 @@ err_exit: ...@@ -1736,9 +1773,10 @@ err_exit:
memcpy(rec_get_end(rec, offsets) - REC_NODE_PTR_SIZE, memcpy(rec_get_end(rec, offsets) - REC_NODE_PTR_SIZE,
storage, REC_NODE_PTR_SIZE); storage, REC_NODE_PTR_SIZE);
} } while (++slot < n_dense);
} }
recs_done:
ut_a(page_is_comp(page)); ut_a(page_is_comp(page));
ut_ad(page_simple_validate_new(page)); ut_ad(page_simple_validate_new(page));
...@@ -1918,7 +1956,7 @@ page_zip_write_rec( ...@@ -1918,7 +1956,7 @@ page_zip_write_rec(
if (UNIV_UNLIKELY(n_ext)) { if (UNIV_UNLIKELY(n_ext)) {
ulint blob_no = page_zip_get_n_prev_extern( ulint blob_no = page_zip_get_n_prev_extern(
rec, index); page_zip, rec, index);
byte* ext_end = externs - page_zip->n_blobs byte* ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE; * BTR_EXTERN_FIELD_REF_SIZE;
ut_ad(blob_no <= page_zip->n_blobs); ut_ad(blob_no <= page_zip->n_blobs);
...@@ -2055,7 +2093,7 @@ page_zip_write_blob_ptr( ...@@ -2055,7 +2093,7 @@ page_zip_write_blob_ptr(
ut_ad(page_is_leaf(page)); ut_ad(page_is_leaf(page));
blob_no = page_zip_get_n_prev_extern(rec, index) blob_no = page_zip_get_n_prev_extern(page_zip, rec, index)
+ rec_get_n_extern_new(rec, index, n); + rec_get_n_extern_new(rec, index, n);
ut_a(blob_no <= page_zip->n_blobs); ut_a(blob_no <= page_zip->n_blobs);
...@@ -2197,7 +2235,7 @@ page_zip_write_trx_id_and_roll_ptr( ...@@ -2197,7 +2235,7 @@ page_zip_write_trx_id_and_roll_ptr(
/************************************************************************** /**************************************************************************
Clear an area on the uncompressed and compressed page, if possible. */ Clear an area on the uncompressed and compressed page, if possible. */
static
void void
page_zip_clear_rec( page_zip_clear_rec(
/*===============*/ /*===============*/
...@@ -2214,17 +2252,32 @@ page_zip_clear_rec( ...@@ -2214,17 +2252,32 @@ page_zip_clear_rec(
ut_ad(rec_offs_validate(rec, index, offsets)); ut_ad(rec_offs_validate(rec, index, offsets));
heap_no = rec_get_heap_no_new(rec); heap_no = rec_get_heap_no_new(rec);
ut_ad(heap_no >= 2); /* exclude infimum and supremum */
if (page_zip->m_end if (page_zip->m_end
+ 1 + ((heap_no - 1) >= 64)/* size of the log entry */ + 1 + ((heap_no - 1) >= 64)/* size of the log entry */
+ page_zip_get_trailer_len(page_zip, index, NULL) + page_zip_get_trailer_len(page_zip, index, NULL)
< page_zip->size) { < page_zip->size) {
byte* data; byte* data;
page_t* page;
/* Do not touch the extra bytes, because the page = ut_align_down(rec, UNIV_PAGE_SIZE);
decompressor depends on them. */
/* Clear only the data bytes, because the allocator and
the decompressor depend on the extra bytes. */
memset(rec, 0, rec_offs_data_size(offsets)); memset(rec, 0, rec_offs_data_size(offsets));
if (page_is_leaf(page) && dict_index_is_clust(index)) {
/* Clear trx_id and roll_ptr on the compressed page. */
byte* storage = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE;
memset(storage - (heap_no - 1)
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
0, DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
}
/* Log that the data was zeroed out. */ /* Log that the data was zeroed out. */
data = page_zip->data + page_zip->m_end; data = page_zip->data + page_zip->m_end;
ut_ad(!*data); ut_ad(!*data);
...@@ -2234,8 +2287,6 @@ page_zip_clear_rec( ...@@ -2234,8 +2287,6 @@ page_zip_clear_rec(
*data++ = (heap_no - 1) << 1 | 1; *data++ = (heap_no - 1) << 1 | 1;
ut_ad(!*data); ut_ad(!*data);
page_zip->m_end = data - page_zip->data; page_zip->m_end = data - page_zip->data;
ut_ad(page_zip_validate(page_zip,
ut_align_down(rec, UNIV_PAGE_SIZE)));
} else { } else {
/* There is not enough space to log the clearing. /* There is not enough space to log the clearing.
Try to clear the block and to recompress the page. */ Try to clear the block and to recompress the page. */
...@@ -2326,19 +2377,28 @@ void ...@@ -2326,19 +2377,28 @@ void
page_zip_dir_delete( page_zip_dir_delete(
/*================*/ /*================*/
page_zip_des_t* page_zip,/* in/out: compressed page */ page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* rec, /* in: deleted record */ byte* rec, /* in: record to delete */
dict_index_t* index, /* in: index of rec */
const ulint* offsets,/* in: rec_get_offsets(rec) */
const byte* free) /* in: previous start of the free list */ const byte* free) /* in: previous start of the free list */
{ {
byte* slot_rec; byte* slot_rec;
byte* slot_free; byte* slot_free;
ulint n_ext;
page_t* page = ut_align_down(rec, UNIV_PAGE_SIZE);
ut_ad(rec); ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(rec_offs_comp(offsets));
slot_rec = page_zip_dir_find(page_zip, slot_rec = page_zip_dir_find(page_zip,
ut_align_offset(rec, UNIV_PAGE_SIZE)); ut_align_offset(rec, UNIV_PAGE_SIZE));
ut_a(slot_rec); ut_a(slot_rec);
/* This could not be done before page_zip_dir_find(). */
page_header_set_field(page, page_zip, PAGE_N_RECS,
(ulint)(page_get_n_recs(page) - 1));
if (UNIV_UNLIKELY(!free)) { if (UNIV_UNLIKELY(!free)) {
/* Make the last slot the start of the free list. */ /* Make the last slot the start of the free list. */
slot_free = page_zip->data + page_zip->size slot_free = page_zip->data + page_zip->size
...@@ -2358,11 +2418,48 @@ page_zip_dir_delete( ...@@ -2358,11 +2418,48 @@ page_zip_dir_delete(
slot_rec - slot_free); slot_rec - slot_free);
} }
/* TODO: shift and zero fill the array of BLOB pointers, set n_blobs */
/* Write the entry for the deleted record. /* Write the entry for the deleted record.
The "owned" and "deleted" flags will be cleared. */ The "owned" and "deleted" flags will be cleared. */
mach_write_to_2(slot_free, ut_align_offset(rec, UNIV_PAGE_SIZE)); mach_write_to_2(slot_free, ut_align_offset(rec, UNIV_PAGE_SIZE));
n_ext = rec_offs_n_extern(offsets);
if (UNIV_UNLIKELY(n_ext)) {
/* Shift and zero fill the array of BLOB pointers. */
ulint blob_no;
byte* externs;
byte* ext_end;
blob_no = page_zip_get_n_prev_extern(page_zip, rec, index);
ut_a(blob_no + n_ext <= page_zip->n_blobs);
if (dict_index_is_clust(index)) {
externs = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* (PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
} else {
externs = page_zip->data + page_zip->size
- (page_dir_get_n_heap(page) - 2)
* PAGE_ZIP_DIR_SLOT_SIZE;
}
ext_end = externs - page_zip->n_blobs
* BTR_EXTERN_FIELD_REF_SIZE;
externs -= blob_no * BTR_EXTERN_FIELD_REF_SIZE;
page_zip->n_blobs -= n_ext;
/* Shift and zero fill the array. */
memmove(ext_end + n_ext * BTR_EXTERN_FIELD_REF_SIZE, ext_end,
(page_zip->n_blobs - blob_no)
* BTR_EXTERN_FIELD_REF_SIZE);
memset(ext_end, 0, n_ext * BTR_EXTERN_FIELD_REF_SIZE);
}
/* The compression algorithm expects info_bits and n_owned
to be 0 for deleted records. */
rec[-REC_N_NEW_EXTRA_BYTES] = 0; /* info_bits and n_owned */
page_zip_clear_rec(page_zip, rec, index, offsets, NULL);
} }
/************************************************************************** /**************************************************************************
...@@ -2392,9 +2489,17 @@ page_zip_dir_add_slot( ...@@ -2392,9 +2489,17 @@ page_zip_dir_add_slot(
ut_ad(!page_zip->n_blobs); ut_ad(!page_zip->n_blobs);
stored = dir - n_dense * REC_NODE_PTR_SIZE; stored = dir - n_dense * REC_NODE_PTR_SIZE;
} else if (UNIV_UNLIKELY(is_clustered)) { } else if (UNIV_UNLIKELY(is_clustered)) {
/* Move the BLOB pointer array backwards to make space for the
roll_ptr and trx_id columns and the dense directory slot. */
byte* externs;
stored = dir - n_dense stored = dir - n_dense
* (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN) * (DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN);
externs = stored
- page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
memmove(externs - (PAGE_ZIP_DIR_SLOT_SIZE
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN),
externs, stored - externs);
} else { } else {
stored = dir stored = dir
- page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE; - page_zip->n_blobs * BTR_EXTERN_FIELD_REF_SIZE;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment