Commit 47da8abc authored by marko's avatar marko

branches/zip: Try to synchronize the updates of uncompressed and

compressed pages.

btr_root_raise_and_insert(): Distinguish root_page_zip and new_page_zip.

btr_cur_set_ownership_of_extern_field(): Do not log the write on the
uncompressed page if it will be logged for page_zip.

lock_rec_insert_check_and_lock(), lock_sec_rec_modify_check_and_lock():
Update the max_trx_id field also on the compressed page.

mlog_write_ulint(): Add UNIV_UNLIKELY hints.  Remove trailing white space.

mlog_log_string(): Remove trailing white space.

rec_set_field_extern_bits(): Remove parameter mtr, as the write will either
occur in the heap, or it will be logged at a higher level.

recv_parse_or_apply_log_rec_body(),
page_zip_write_header(): Add log record type MLOG_ZIP_WRITE_HEADER.

page_header_set_field(): Pass mtr=NULL to page_zip_write_header().

page_header_reset_last_insert(): Pass mtr to page_zip_write_header().

btr_page_set_index_id(), btr_page_set_level(),
btr_page_set_next(), btr_page_set_prev(): Pass mtr to page_zip_write_header().

row_upd_rec_sys_fields(): Pass mtr=NULL to page_zip_write_trx_id() and
page_zip_write_roll_ptr(), since the write will be logged at a higher level.

page_zip_write_header(): Add parameter mtr.
page_zip_write_header_log(): New function.

Remove rec_set_nth_field_extern_bit().
Make rec_set_nth_field_extern_bit_old() static.
Rename rec_set_nth_field_extern_bit_new()
to rec_set_field_extern_bits_new() and make it static.

row_ins_index_entry_low(): Remove bogus TODO comment.
parent 03a6c35b
......@@ -1045,7 +1045,8 @@ btr_root_raise_and_insert(
ulint level;
rec_t* node_ptr_rec;
page_cur_t* page_cursor;
page_zip_des_t* page_zip;
page_zip_des_t* root_page_zip;
page_zip_des_t* new_page_zip;
root = btr_cur_get_page(cursor);
tree = btr_cur_get_tree(cursor);
......@@ -1066,11 +1067,12 @@ btr_root_raise_and_insert(
btr_page_create(new_page, tree, mtr);
root_page_zip = buf_block_get_page_zip(buf_block_align(root));
level = btr_page_get_level(root, mtr);
/* Set the levels of the new index page and root page */
btr_page_set_level(new_page, NULL, level, mtr);
btr_page_set_level(root, NULL/* TODO */, level + 1, mtr);
btr_page_set_level(root, root_page_zip, level + 1, mtr);
/* Set the next node and previous node fields of new page */
btr_page_set_next(new_page, NULL, FIL_NULL, mtr);
......@@ -1078,10 +1080,10 @@ btr_root_raise_and_insert(
/* Move the records from root to the new page */
page_zip = buf_block_get_page_zip(buf_block_align(new_page));
new_page_zip = buf_block_get_page_zip(buf_block_align(new_page));
page_move_rec_list_end(new_page, page_zip,
page_get_infimum_rec(root), NULL,
page_move_rec_list_end(new_page, new_page_zip,
page_get_infimum_rec(root), root_page_zip,
cursor->index, mtr);
/* If this is a pessimistic insert which is actually done to
......@@ -1103,8 +1105,11 @@ btr_root_raise_and_insert(
node_ptr = dict_tree_build_node_ptr(tree, rec, new_page_no, heap,
level);
/* Reorganize the root to get free space */
if (!btr_page_reorganize_low(FALSE, root, NULL, cursor->index, mtr)) {
ut_error; /* TODO: page_zip */
if (!btr_page_reorganize_low(FALSE, root, root_page_zip,
cursor->index, mtr)) {
/* The page should be empty at this point.
Thus, the operation should succeed. */
ut_error;
}
page_cursor = btr_cur_get_page_cur(cursor);
......@@ -1118,16 +1123,14 @@ btr_root_raise_and_insert(
ut_ad(node_ptr_rec);
page_zip = buf_block_get_page_zip(buf_block_align(root));
/* The node pointer must be marked as the predefined minimum record,
as there is no lower alphabetical limit to records in the leftmost
node of a level: */
btr_set_min_rec_mark(node_ptr_rec, mtr);
if (UNIV_LIKELY_NULL(page_zip)
&& !UNIV_UNLIKELY(page_zip_compress(page_zip, root,
if (UNIV_LIKELY_NULL(root_page_zip)
&& !UNIV_UNLIKELY(page_zip_compress(root_page_zip, root,
cursor->index, mtr))) {
/* The root page should only contain the
node pointer to new_page at this point.
......
......@@ -3195,8 +3195,8 @@ btr_cur_set_ownership_of_extern_field(
byte_val = byte_val | BTR_EXTERN_OWNER_FLAG;
}
if (UNIV_LIKELY(mtr != NULL)) {
/* TODO: log this differently for page_zip */
if (UNIV_LIKELY(mtr != NULL) && UNIV_LIKELY(!page_zip)) {
mlog_write_ulint(data + local_len + BTR_EXTERN_LEN, byte_val,
MLOG_1BYTE, mtr);
} else {
......
......@@ -47,11 +47,13 @@ btr_page_set_index_id(
dulint id, /* in: index id */
mtr_t* mtr) /* in: mtr */
{
mlog_write_dulint(page + PAGE_HEADER + PAGE_INDEX_ID, id, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_8(page + (PAGE_HEADER + PAGE_INDEX_ID), id);
page_zip_write_header(page_zip,
page + PAGE_HEADER + PAGE_INDEX_ID, 8);
page + (PAGE_HEADER + PAGE_INDEX_ID), 8, mtr);
} else {
mlog_write_dulint(page + (PAGE_HEADER + PAGE_INDEX_ID),
id, mtr);
}
}
......@@ -117,13 +119,13 @@ btr_page_set_level(
ut_ad(page && mtr);
ut_ad(level <= BTR_MAX_NODE_LEVEL);
/* TODO: log this differently for page_zip */
mlog_write_ulint(page + PAGE_HEADER + PAGE_LEVEL, level,
MLOG_2BYTES, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_2(page + (PAGE_HEADER + PAGE_LEVEL), level);
page_zip_write_header(page_zip,
page + PAGE_HEADER + PAGE_LEVEL, 2);
page + (PAGE_HEADER + PAGE_LEVEL), 2, mtr);
} else {
mlog_write_ulint(page + (PAGE_HEADER + PAGE_LEVEL), level,
MLOG_2BYTES, mtr);
}
}
......@@ -160,11 +162,11 @@ btr_page_set_next(
{
ut_ad(page && mtr);
/* TODO: log this differently for page_zip */
mlog_write_ulint(page + FIL_PAGE_NEXT, next, MLOG_4BYTES, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_header(page_zip, page + FIL_PAGE_NEXT, 4);
mach_write_to_4(page + FIL_PAGE_NEXT, next);
page_zip_write_header(page_zip, page + FIL_PAGE_NEXT, 4, mtr);
} else {
mlog_write_ulint(page + FIL_PAGE_NEXT, next, MLOG_4BYTES, mtr);
}
}
......@@ -197,11 +199,11 @@ btr_page_set_prev(
{
ut_ad(page && mtr);
/* TODO: log this differently for page_zip */
mlog_write_ulint(page + FIL_PAGE_PREV, prev, MLOG_4BYTES, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_header(page_zip, page + FIL_PAGE_PREV, 4);
mach_write_to_4(page + FIL_PAGE_PREV, prev);
page_zip_write_header(page_zip, page + FIL_PAGE_PREV, 4, mtr);
} else {
mlog_write_ulint(page + FIL_PAGE_PREV, prev, MLOG_4BYTES, mtr);
}
}
......
......@@ -145,9 +145,11 @@ flag value must give the length also! */
#define MLOG_ZIP_DECOMPRESS ((byte)52) /* decompress a page
to undo a compressed page
overflow */
#define MLOG_BIGGEST_TYPE ((byte)52) /* biggest value (used in
#define MLOG_ZIP_WRITE_HEADER ((byte)53) /* write to compressed page
header */
#define MLOG_BIGGEST_TYPE ((byte)53) /* biggest value (used in
asserts) */
/*******************************************************************
Starts a mini-transaction and creates a mini-transaction handle
and buffer in the memory buffer given by the caller. */
......
......@@ -43,7 +43,7 @@ page_update_max_trx_id(
ut_ad(page);
if (ut_dulint_cmp(page_get_max_trx_id(page), trx_id) < 0) {
page_set_max_trx_id(page, page_zip, trx_id);
}
}
......@@ -82,7 +82,8 @@ page_header_set_field(
mach_write_to_2(page + PAGE_HEADER + field, val);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_header(page_zip, page + PAGE_HEADER + field, 2);
page_zip_write_header(page_zip,
page + PAGE_HEADER + field, 2, NULL);
}
}
......@@ -159,12 +160,13 @@ page_header_reset_last_insert(
{
ut_ad(page && mtr);
/* TODO: log this differently for page_zip */
mlog_write_ulint(page + (PAGE_HEADER + PAGE_LAST_INSERT), 0,
MLOG_2BYTES, mtr);
if (UNIV_LIKELY_NULL(page_zip)) {
mach_write_to_2(page + (PAGE_HEADER + PAGE_LAST_INSERT), 0);
page_zip_write_header(page_zip,
page + (PAGE_HEADER + PAGE_LAST_INSERT), 2);
page + (PAGE_HEADER + PAGE_LAST_INSERT), 2, mtr);
} else {
mlog_write_ulint(page + (PAGE_HEADER + PAGE_LAST_INSERT), 0,
MLOG_2BYTES, mtr);
}
}
......
......@@ -75,8 +75,9 @@ page_zip_write_header(
/*==================*/
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* str, /* in: address on the uncompressed page */
ulint length) /* in: length of the data */
__attribute__((nonnull));
ulint length, /* in: length of the data */
mtr_t* mtr) /* in: mini-transaction, or NULL */
__attribute__((nonnull(1,2)));
/**************************************************************************
Write the "deleted" flag of a record on a compressed page. The flag must
......
......@@ -370,8 +370,9 @@ page_zip_write_header(
/*==================*/
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* str, /* in: address on the uncompressed page */
ulint length) /* in: length of the data */
__attribute__((nonnull));
ulint length, /* in: length of the data */
mtr_t* mtr) /* in: mini-transaction, or NULL */
__attribute__((nonnull(1,2)));
#ifdef UNIV_MATERIALIZE
# undef UNIV_INLINE
......
......@@ -426,6 +426,17 @@ page_zip_available(
< page_zip->size));
}
/**************************************************************************
Write a log record of writing to the uncompressed header portion of a page. */
void
page_zip_write_header_log(
/*======================*/
const page_zip_des_t* page_zip,/* in: compressed page */
ulint offset, /* in: offset to the data */
ulint length, /* in: length of the data */
mtr_t* mtr); /* in: mini-transaction */
/**************************************************************************
Write data to the uncompressed header portion of a page. The data must
already have been written to the uncompressed page.
......@@ -438,7 +449,8 @@ page_zip_write_header(
/*==================*/
page_zip_des_t* page_zip,/* in/out: compressed page */
const byte* str, /* in: address on the uncompressed page */
ulint length) /* in: length of the data */
ulint length, /* in: length of the data */
mtr_t* mtr) /* in: mini-transaction, or NULL */
{
ulint pos;
......@@ -453,6 +465,10 @@ page_zip_write_header(
/* The following would fail in page_cur_insert_rec_low(). */
/* ut_ad(page_zip_validate(page_zip, str - pos)); */
if (UNIV_LIKELY_NULL(mtr)) {
page_zip_write_header_log(page_zip, pos, length, mtr);
}
}
#ifdef UNIV_MATERIALIZE
......
......@@ -455,18 +455,6 @@ rec_offs_any_extern(
/* out: TRUE if a field is stored externally */
const ulint* offsets);/* in: array returned by rec_get_offsets() */
/***************************************************************
Sets the ith field extern storage bit. */
UNIV_INLINE
void
rec_set_nth_field_extern_bit(
/*=========================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint i, /* in: ith field */
mtr_t* mtr); /* in: mtr holding an X-latch to the page
where rec is, or NULL; in the NULL case
we do not write to log about the change */
/***************************************************************
Sets TRUE the extern storage bits of fields mentioned in an array. */
void
......@@ -475,10 +463,7 @@ rec_set_field_extern_bits(
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
const ulint* vec, /* in: array of field numbers */
ulint n_fields,/* in: number of fields numbers */
mtr_t* mtr); /* in: mtr holding an X-latch to the page
where rec is, or NULL; in the NULL case
we do not write to log about the change */
ulint n_fields);/* in: number of fields numbers */
/***************************************************************
This is used to modify the value of an already existing field in a record.
The previous value must have exactly the same size as the new value. If len
......
......@@ -146,30 +146,6 @@ rec_set_nth_field_sql_null(
rec_t* rec, /* in: record */
ulint n); /* in: index of the field */
/***************************************************************
Sets the ith field extern storage bit of an old-style record. */
void
rec_set_nth_field_extern_bit_old(
/*=============================*/
rec_t* rec, /* in: old-style record */
ulint i, /* in: ith field */
mtr_t* mtr); /* in: mtr holding an X-latch to the page where
rec is, or NULL; in the NULL case we do not
write to log about the change */
/***************************************************************
Sets the ith field extern storage bit of a new-style record. */
void
rec_set_nth_field_extern_bit_new(
/*=============================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint ith, /* in: ith field */
mtr_t* mtr); /* in: mtr holding an X-latch to the page
where rec is, or NULL; in the NULL case
we do not write to log about the change */
/**********************************************************
Gets a bit field from within 1 byte. */
UNIV_INLINE
......@@ -1114,26 +1090,6 @@ rec_offs_any_extern(
return(FALSE);
}
/***************************************************************
Sets the ith field extern storage bit. */
UNIV_INLINE
void
rec_set_nth_field_extern_bit(
/*=========================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint i, /* in: ith field */
mtr_t* mtr) /* in: mtr holding an X-latch to the page
where rec is, or NULL; in the NULL case
we do not write to log about the change */
{
if (UNIV_LIKELY(index->table->comp)) {
rec_set_nth_field_extern_bit_new(rec, index, i, mtr);
} else {
rec_set_nth_field_extern_bit_old(rec, i, mtr);
}
}
/**********************************************************
Returns the offset of n - 1th field end if the record is stored in the 1-byte
offsets form. If the field is SQL null, the flag is ORed in the returned
......
......@@ -113,8 +113,6 @@ row_upd_rec_sys_fields(
trx_t* trx, /* in: transaction */
dulint roll_ptr)/* in: roll ptr of the undo log record */
{
ulint offset;
ut_ad(index->type & DICT_CLUSTERED);
ut_ad(rec_offs_validate(rec, index, offsets));
#ifdef UNIV_SYNC_DEBUG
......@@ -122,21 +120,21 @@ row_upd_rec_sys_fields(
|| rw_lock_own(&btr_search_latch, RW_LOCK_EX));
#endif /* UNIV_SYNC_DEBUG */
offset = index->trx_id_offset;
if (!offset) {
offset = row_get_trx_id_offset(rec, index, offsets);
}
trx_write_trx_id(rec + offset, trx->id);
trx_write_roll_ptr(rec + offset + DATA_TRX_ID_LEN, roll_ptr);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_trx_id(
page_zip, rec, rec_offs_data_size(offsets),
trx->id, NULL/* TODO: mtr */);
trx->id, NULL);
page_zip_write_roll_ptr(
page_zip, rec, rec_offs_data_size(offsets),
roll_ptr, NULL/* TODO: mtr */);
roll_ptr, NULL);
} else {
ulint offset = index->trx_id_offset;
if (!offset) {
offset = row_get_trx_id_offset(rec, index, offsets);
}
trx_write_trx_id(rec + offset, trx->id);
trx_write_roll_ptr(rec + offset + DATA_TRX_ID_LEN, roll_ptr);
}
}
......@@ -4883,13 +4883,14 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel();
if (!(index->type & DICT_CLUSTERED)) {
buf_block_t* block = buf_block_align(rec);
/* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec),
NULL/*TODO*/,
thr_get_trx(thr)->id);
page_update_max_trx_id(buf_block_get_frame(block),
buf_block_get_page_zip(block),
thr_get_trx(thr)->id);
}
*inherit = FALSE;
return(DB_SUCCESS);
......@@ -4921,12 +4922,13 @@ lock_rec_insert_check_and_lock(
lock_mutex_exit_kernel();
if (!(index->type & DICT_CLUSTERED) && (err == DB_SUCCESS)) {
if ((err == DB_SUCCESS) && !(index->type & DICT_CLUSTERED)) {
buf_block_t* block = buf_block_align(rec);
/* Update the page max trx id field */
page_update_max_trx_id(buf_frame_align(rec),
NULL/*TODO*/,
thr_get_trx(thr)->id);
page_update_max_trx_id(buf_block_get_frame(block),
buf_block_get_page_zip(block),
thr_get_trx(thr)->id);
}
#ifdef UNIV_DEBUG
......@@ -5103,11 +5105,12 @@ lock_sec_rec_modify_check_and_lock(
#endif /* UNIV_DEBUG */
if (err == DB_SUCCESS) {
/* Update the page max trx id field */
buf_block_t* block = buf_block_align(rec);
page_update_max_trx_id(buf_frame_align(rec),
NULL/*TODO*/,
thr_get_trx(thr)->id);
/* Update the page max trx id field */
page_update_max_trx_id(buf_block_get_frame(block),
buf_block_get_page_zip(block),
thr_get_trx(thr)->id);
}
return(err);
......
......@@ -889,6 +889,7 @@ recv_parse_or_apply_log_rec_body(
case MLOG_ZIP_WRITE_NODE_PTR:
case MLOG_ZIP_WRITE_TRX_ID:
case MLOG_ZIP_WRITE_ROLL_PTR:
case MLOG_ZIP_WRITE_HEADER:
ut_error; /* TODO */
break;
case MLOG_ZIP_COMPRESS:
......
......@@ -219,24 +219,30 @@ mlog_write_ulint(
mtr_t* mtr) /* in: mini-transaction handle */
{
byte* log_ptr;
if (ptr < buf_pool->frame_zero || ptr >= buf_pool->high_end) {
if (UNIV_UNLIKELY(ptr < buf_pool->frame_zero)
|| UNIV_UNLIKELY(ptr >= buf_pool->high_end)) {
fprintf(stderr,
"InnoDB: Error: trying to write to a stray memory location %p\n", ptr);
ut_error;
}
if (type == MLOG_1BYTE) {
switch (type) {
case MLOG_1BYTE:
mach_write_to_1(ptr, val);
} else if (type == MLOG_2BYTES) {
break;
case MLOG_2BYTES:
mach_write_to_2(ptr, val);
} else {
ut_ad(type == MLOG_4BYTES);
break;
case MLOG_4BYTES:
mach_write_to_4(ptr, val);
break;
default:
ut_error;
}
log_ptr = mlog_open(mtr, 11 + 2 + 5);
/* If no logging is requested, we may return now */
if (log_ptr == NULL) {
......@@ -245,9 +251,9 @@ mlog_write_ulint(
log_ptr = mlog_write_initial_log_record_fast(ptr, type, log_ptr, mtr);
mach_write_to_2(log_ptr, ptr - buf_frame_align(ptr));
mach_write_to_2(log_ptr, ut_align_offset(ptr, UNIV_PAGE_SIZE));
log_ptr += 2;
log_ptr += mach_write_compressed(log_ptr, val);
mlog_close(mtr, log_ptr);
......@@ -338,7 +344,7 @@ mlog_log_string(
ut_ad(len <= UNIV_PAGE_SIZE);
log_ptr = mlog_open(mtr, 30);
/* If no logging is requested, we may return now */
if (log_ptr == NULL) {
......@@ -349,7 +355,7 @@ mlog_log_string(
log_ptr, mtr);
mach_write_to_2(log_ptr, ut_align_offset(ptr, UNIV_PAGE_SIZE));
log_ptr += 2;
mach_write_to_2(log_ptr, len);
log_ptr += 2;
......
......@@ -957,9 +957,9 @@ page_cur_insert_rec_low(
/* Set the "extern storage" flags */
if (UNIV_UNLIKELY(n_ext)) {
rec_set_field_extern_bits(insert_rec, index, ext, n_ext, mtr);
rec_set_field_extern_bits(insert_rec, index, ext, n_ext);
}
/* 4. Insert the record in the linked list of records */
current_rec = cursor->rec;
......
......@@ -218,11 +218,11 @@ page_set_max_trx_id(
/* It is not necessary to write this change to the redo log, as
during a database recovery we assume that the max trx id of every
page is the maximum trx id assigned before the crash. */
mach_write_to_8(page + (PAGE_HEADER + PAGE_MAX_TRX_ID), trx_id);
if (UNIV_LIKELY_NULL(page_zip)) {
page_zip_write_header(page_zip,
page + (PAGE_HEADER + PAGE_MAX_TRX_ID), 8);
page + (PAGE_HEADER + PAGE_MAX_TRX_ID), 8, NULL);
}
if (block->is_hashed) {
......
......@@ -2148,3 +2148,39 @@ page_zip_dir_delete(
The "owned" and "deleted" flags will be cleared. */
mach_write_to_2(slot_free, ut_align_offset(rec, UNIV_PAGE_SIZE));
}
/**************************************************************************
Write a log record of writing to the uncompressed header portion of a page. */
void
page_zip_write_header_log(
/*======================*/
const page_zip_des_t* page_zip,/* in: compressed page */
ulint offset, /* in: offset to the data */
ulint length, /* in: length of the data */
mtr_t* mtr) /* in: mini-transaction */
{
byte* log_ptr = mlog_open(mtr, 11 + 2 + 1);
ut_ad(offset < PAGE_DATA);
ut_ad(offset + length < PAGE_DATA);
#if PAGE_DATA > 255
# error "PAGE_DATA > 255"
#endif
ut_ad(length < 256);
/* If no logging is requested, we may return now */
if (log_ptr == NULL) {
return;
}
log_ptr = mlog_write_initial_log_record_fast(page_zip->data + offset,
MLOG_ZIP_WRITE_HEADER, log_ptr, mtr);
mach_write_to_2(log_ptr, offset);
log_ptr += 2;
mach_write_to_1(log_ptr, length);
mlog_close(mtr, log_ptr);
mlog_catenate_string(mtr, page_zip->data + offset, length);
}
......@@ -728,45 +728,34 @@ rec_set_nth_field_null_bit(
/***************************************************************
Sets the ith field extern storage bit of an old-style record. */
static
void
rec_set_nth_field_extern_bit_old(
/*=============================*/
rec_t* rec, /* in: old-style record */
ulint i, /* in: ith field */
mtr_t* mtr) /* in: mtr holding an X-latch to the page where
rec is, or NULL; in the NULL case we do not
write to log about the change */
ulint i) /* in: ith field */
{
ulint info;
ut_a(!rec_get_1byte_offs_flag(rec));
ut_a(i < rec_get_n_fields_old(rec));
info = rec_2_get_field_end_info(rec, i);
info |= REC_2BYTE_EXTERN_MASK;
if (mtr) {
mlog_write_ulint(rec - REC_N_OLD_EXTRA_BYTES - 2 * (i + 1),
info, MLOG_2BYTES, mtr);
} else {
rec_2_set_field_end_info(rec, i, info);
}
rec_2_set_field_end_info(rec, i, info);
}
/***************************************************************
Sets the ith field extern storage bit of a new-style record. */
Sets the extern storage bits of a new-style record. */
static
void
rec_set_nth_field_extern_bit_new(
/*=============================*/
rec_set_field_extern_bits_new(
/*==========================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint ith, /* in: ith field */
mtr_t* mtr) /* in: mtr holding an X-latch to the page
where rec is, or NULL; in the NULL case
we do not write to log about the change */
const ulint* ext, /* in: array of field numbers */
ulint n_ext) /* in: number of elements in ext */
{
byte* nulls = rec - (REC_N_NEW_EXTRA_BYTES + 1);
byte* lens = nulls - (index->n_nullable + 7) / 8;
......@@ -781,10 +770,21 @@ rec_set_nth_field_extern_bit_new(
n_fields = dict_index_get_n_fields(index);
ut_ad(ith < n_fields);
ut_ad(n_ext <= n_fields);
ut_ad(n_ext);
/* read the lengths of fields 0..n */
for (i = 0; i < n_fields; i++) {
ibool flag_external = FALSE;
{
ulint j = 0;
do {
ut_ad(ext[j] < n_fields);
flag_external = (i == ext[j++]);
} while (!flag_external && j < n_ext);
}
field = dict_index_get_nth_field(index, i);
type = dict_col_get_type(dict_field_get_col(field));
if (!(dtype_get_prtype(type) & DATA_NOT_NULL)) {
......@@ -796,7 +796,7 @@ rec_set_nth_field_extern_bit_new(
if (*nulls & null_mask) {
null_mask <<= 1;
/* NULL fields cannot be external. */
ut_ad(i != ith);
ut_ad(!flag_external);
continue;
}
......@@ -807,7 +807,7 @@ rec_set_nth_field_extern_bit_new(
(Fixed-length fields longer than
DICT_MAX_INDEX_COL_LEN will be treated as
variable-length ones in dict_index_add_col().) */
ut_ad(i != ith);
ut_ad(!flag_external);
continue;
}
lens--;
......@@ -815,31 +815,19 @@ rec_set_nth_field_extern_bit_new(
|| dtype_get_mtype(type) == DATA_BLOB) {
ulint len = lens[1];
if (len & 0x80) { /* 1exxxxxx: 2-byte length */
if (i == ith) {
if (len & 0x40) {
return; /* no change */
}
/* toggle the extern bit */
if (flag_external) {
/* set the extern bit */
len |= 0x40;
if (mtr) {
/* TODO: page_zip:
log this differently,
or remove altogether */
mlog_write_ulint(lens + 1, len,
MLOG_1BYTE, mtr);
} else {
lens[1] = (byte) len;
}
return;
lens[1] = (byte) len;
}
lens--;
} else {
/* short fields cannot be external */
ut_ad(i != ith);
ut_ad(!flag_external);
}
} else {
/* short fields cannot be external */
ut_ad(i != ith);
ut_ad(!flag_external);
}
}
}
......@@ -853,22 +841,16 @@ rec_set_field_extern_bits(
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
const ulint* vec, /* in: array of field numbers */
ulint n_fields,/* in: number of fields numbers */
mtr_t* mtr) /* in: mtr holding an X-latch to the
page where rec is, or NULL;
in the NULL case we do not write
to log about the change */
ulint n_fields)/* in: number of fields numbers */
{
ulint i;
if (UNIV_LIKELY(index->table->comp)) {
for (i = 0; i < n_fields; i++) {
rec_set_nth_field_extern_bit_new(rec, index, vec[i],
mtr);
}
rec_set_field_extern_bits_new(rec, index, vec, n_fields);
} else {
ut_a(!rec_get_1byte_offs_flag(rec));
ulint i;
for (i = 0; i < n_fields; i++) {
rec_set_nth_field_extern_bit_old(rec, vec[i], mtr);
rec_set_nth_field_extern_bit_old(rec, vec[i]);
}
}
}
......
......@@ -2083,7 +2083,6 @@ row_ins_index_entry_low(
entry,
thr, &mtr);
}
} else {
if (mode == BTR_MODIFY_LEAF) {
err = btr_cur_optimistic_insert(0, &cursor, entry,
......@@ -2103,14 +2102,13 @@ function_exit:
if (UNIV_LIKELY_NULL(big_rec)) {
rec_t* rec;
mtr_start(&mtr);
btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
BTR_MODIFY_TREE, &cursor, 0, &mtr);
rec = btr_cur_get_rec(&cursor);
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
/* TODO: set the extern bits outside this function */
err = btr_store_big_rec_extern_fields(index, rec,
offsets, big_rec, &mtr);
......
......@@ -1402,7 +1402,7 @@ trx_undo_prev_version_build(
/* Now set the extern bits in the old version of the record */
rec_set_field_extern_bits(*old_vers, index,
ext_vect, n_ext_vect, NULL);
ext_vect, n_ext_vect);
mem_free(ext_vect);
} else {
buf = mem_heap_alloc(heap, rec_offs_size(offsets));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment