Commit 174bd7b7 authored by marko's avatar marko

branches/zip: dtuple_convert_big_rec(): Do not store anything locally

of externally stored columns, and fix bugs introduced in r873.  (Bug #22496)

btr_page_get_sure_split_rec(), btr_page_insert_fits(),
rec_get_converted_size(), rec_convert_dtuple_to_rec(),
rec_convert_dtuple_to_rec_old(), rec_convert_dtuple_to_rec_new():
Add parameters ext and n_ext.  Flag external fields during the
conversion.

rec_set_field_extern_bits(), rec_set_field_extern_bits_new(),
rec_offs_set_nth_extern(), rec_set_nth_field_extern_bit_old():
Remove.  The bits are set by rec_convert_dtuple_to_rec().

page_cur_insert_rec_low(): Remove the parameters ext and n_ext.

btr_cur_add_ext(): New utility function for updating and sorting ext[].
Low-level functions now expect the array to be in ascending order
for performance reasons.  Used in btr_cur_optimistic_insert(),
btr_cur_pessimistic_insert(), and btr_cur_pessimistic_update().

btr_cur_optimistic_insert(): Remove some defensive code, because we cannot
compute the added parameters of rec_get_converted_size().

btr_push_update_extern_fields(): Sort the array.  Require the array to
be twice the maximum usage, so that ut_ulint_sort() can be used.

dtuple_convert_big_rec(): Allocate new space for the BLOB pointer,
to avoid overwriting prefix indexes to the same column.  Adapt
dtuple_convert_back_big_rec().

row_build_index_entry(): Fetch the columns also for prefix indexes of
the clustered index.

page_zip_apply_log(), page_zip_decompress_clust(): Allow externally
stored fields to lack a locally stored part.
parent 60504d8e
......@@ -100,23 +100,6 @@ btr_page_empty(
page_zip_des_t* page_zip,/* out: compressed page, or NULL */
mtr_t* mtr, /* in: mtr */
dict_index_t* index); /* in: the index of the page */
/*****************************************************************
Returns TRUE if the insert fits on the appropriate half-page
with the chosen split_rec. */
static
ibool
btr_page_insert_fits(
/*=================*/
/* out: TRUE if fits */
btr_cur_t* cursor, /* in: cursor at which insert
should be made */
rec_t* split_rec, /* in: suggestion for first record
on upper half-page, or NULL if
tuple should be first */
const ulint* offsets, /* in: rec_get_offsets(
split_rec, cursor->index) */
dtuple_t* tuple, /* in: tuple to insert */
mem_heap_t* heap); /* in: temporary memory heap */
/******************************************************************
Gets the root node of a tree and x-latches it. */
......@@ -1308,7 +1291,9 @@ btr_page_get_sure_split_rec(
upper half-page */
btr_cur_t* cursor, /* in: cursor at which insert
should be made */
dtuple_t* tuple) /* in: tuple to insert */
dtuple_t* tuple, /* in: tuple to insert */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext) /* in: number of elements in ext */
{
page_t* page;
page_zip_des_t* page_zip;
......@@ -1327,7 +1312,7 @@ btr_page_get_sure_split_rec(
page = btr_cur_get_page(cursor);
insert_size = rec_get_converted_size(cursor->index, tuple);
insert_size = rec_get_converted_size(cursor->index, tuple, ext, n_ext);
free_space = page_get_free_space_of_empty(page_is_comp(page));
page_zip = buf_block_get_page_zip(buf_block_align(page));
......@@ -1432,6 +1417,8 @@ btr_page_insert_fits(
const ulint* offsets, /* in: rec_get_offsets(
split_rec, cursor->index) */
dtuple_t* tuple, /* in: tuple to insert */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in ext */
mem_heap_t* heap) /* in: temporary memory heap */
{
page_t* page;
......@@ -1451,7 +1438,7 @@ btr_page_insert_fits(
ut_ad(!offsets
|| rec_offs_validate(split_rec, cursor->index, offsets));
insert_size = rec_get_converted_size(cursor->index, tuple);
insert_size = rec_get_converted_size(cursor->index, tuple, ext, n_ext);
free_space = page_get_free_space_of_empty(page_is_comp(page));
/* free_space is now the free space of a created new page */
......@@ -1751,7 +1738,8 @@ btr_page_split_and_insert(
if (n_iterations > 0) {
direction = FSP_UP;
hint_page_no = page_no + 1;
split_rec = btr_page_get_sure_split_rec(cursor, tuple);
split_rec = btr_page_get_sure_split_rec(cursor, tuple,
ext, n_ext);
} else if (btr_page_get_split_rec_to_right(cursor, &split_rec)) {
direction = FSP_UP;
......@@ -1780,10 +1768,11 @@ btr_page_split_and_insert(
if (split_rec) {
first_rec = move_limit = split_rec;
} else {
buf = mem_alloc(rec_get_converted_size(cursor->index, tuple));
buf = mem_alloc(rec_get_converted_size(cursor->index,
tuple, ext, n_ext));
first_rec = rec_convert_dtuple_to_rec(buf,
cursor->index, tuple);
first_rec = rec_convert_dtuple_to_rec(buf, cursor->index,
tuple, ext, n_ext);
move_limit = page_rec_get_next(btr_cur_get_rec(cursor));
}
......@@ -1802,15 +1791,15 @@ btr_page_split_and_insert(
n_uniq, &heap);
insert_left = cmp_dtuple_rec(tuple, split_rec, offsets) < 0;
insert_will_fit = btr_page_insert_fits(cursor,
split_rec, offsets,
tuple, heap);
insert_will_fit = btr_page_insert_fits(cursor, split_rec,
offsets, tuple,
ext, n_ext, heap);
} else {
mem_free(buf);
insert_left = FALSE;
insert_will_fit = btr_page_insert_fits(cursor,
NULL, NULL,
tuple, heap);
insert_will_fit = btr_page_insert_fits(cursor, NULL,
NULL, tuple,
ext, n_ext, heap);
}
if (insert_will_fit && page_is_leaf(page) && !page_zip) {
......
......@@ -978,6 +978,45 @@ btr_cur_trx_report(
}
#endif /* UNIV_DEBUG */
/*****************************************************************
Add the numbers of externally stored fields from big_rec to ext[]. */
static
const ulint*
btr_cur_add_ext(
/*============*/
const ulint* ext, /* in: numbers of externally stored fields
so far */
ulint* n_ext, /* in: number of externally stored fields
so far */
const big_rec_t*big_rec,/* in: additional externally stored fields */
mem_heap_t** heap) /* out: memory heap */
{
ulint n_old_ext = *n_ext;
ulint n_more_ext = big_rec->n_fields;
ulint* more_ext;
ulint i;
ut_ad(n_more_ext);
*n_ext += n_more_ext;
if (!*heap) {
*heap = mem_heap_create(*n_ext * sizeof(ulint) * 2);
}
more_ext = mem_heap_alloc(*heap, *n_ext * sizeof(ulint) * 2);
if (n_old_ext) {
memcpy(more_ext, ext, n_old_ext * sizeof(ulint));
}
for (i = 0; i < n_more_ext; i++) {
more_ext[n_old_ext++] = big_rec->fields[i].field_no;
}
ut_ulint_sort(more_ext, more_ext + *n_ext, 0, *n_ext);
return(more_ext);
}
/*****************************************************************
Tries to perform an insert to a page in an index tree, next to cursor.
It is assumed that mtr holds an x-latch on the page. The operation does
......@@ -1044,7 +1083,7 @@ btr_cur_optimistic_insert(
level = btr_page_get_level(page, mtr);
/* Calculate the record size when entry is converted to a record */
rec_size = rec_get_converted_size(index, entry);
rec_size = rec_get_converted_size(index, entry, ext, n_ext);
if (page_zip_rec_needs_ext(rec_size, page_is_comp(page),
page_zip ? page_zip->size : 0)) {
......@@ -1057,19 +1096,6 @@ btr_cur_optimistic_insert(
return(DB_TOO_BIG_RECORD);
}
rec_size = rec_get_converted_size(index, entry);
if (UNIV_UNLIKELY
(page_zip_rec_needs_ext(rec_size, page_is_comp(page),
page_zip ? page_zip->size : 0))) {
/* This should never happen, but we handle
the situation in a robust manner. */
ut_ad(0);
dtuple_convert_back_big_rec(index, entry, big_rec_vec);
return(DB_TOO_BIG_RECORD);
}
}
/* If there have been many consecutive inserts, and we are on the leaf
......@@ -1117,22 +1143,7 @@ btr_cur_optimistic_insert(
/* Add externally stored records, if needed */
if (UNIV_LIKELY_NULL(big_rec_vec)) {
ulint n_more_ext = big_rec_vec->n_fields;
ulint* more_ext;
ulint i;
heap = mem_heap_create((n_more_ext + n_ext) * sizeof(ulint));
more_ext = mem_heap_alloc(
heap, (n_more_ext + n_ext) * sizeof(ulint));
if (n_ext) {
memcpy(more_ext, ext, n_ext * sizeof(ulint));
}
for (i = 0; i < n_more_ext; i++) {
more_ext[n_ext++] = big_rec_vec->fields[i].field_no;
}
ext = more_ext;
ext = btr_cur_add_ext(ext, &n_ext, big_rec_vec, &heap);
}
/* Now, try the insert */
......@@ -1296,7 +1307,8 @@ btr_cur_pessimistic_insert(
}
}
if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry),
if (page_zip_rec_needs_ext(rec_get_converted_size(index, entry,
ext, n_ext),
page_is_comp(page), zip_size)) {
/* The record is so big that we have to store some fields
externally on separate database pages */
......@@ -1322,22 +1334,7 @@ btr_cur_pessimistic_insert(
/* Add externally stored records, if needed */
if (UNIV_LIKELY_NULL(big_rec_vec)) {
ulint n_more_ext = big_rec_vec->n_fields;
ulint* more_ext;
ulint i;
heap = mem_heap_create((n_more_ext + n_ext) * sizeof(ulint));
more_ext = mem_heap_alloc(
heap, (n_more_ext + n_ext) * sizeof(ulint));
if (n_ext) {
memcpy(more_ext, ext, n_ext * sizeof(ulint));
}
for (i = 0; i < n_more_ext; i++) {
more_ext[n_ext++] = big_rec_vec->fields[i].field_no;
}
ext = more_ext;
ext = btr_cur_add_ext(ext, &n_ext, big_rec_vec, &heap);
}
if (UNIV_UNLIKELY(zip_size)) {
......@@ -1345,7 +1342,8 @@ btr_cur_pessimistic_insert(
ulint free_space_zip = page_zip_empty_size(
cursor->index->n_fields, zip_size);
if (UNIV_UNLIKELY(rec_get_converted_size(index, entry)
if (UNIV_UNLIKELY(rec_get_converted_size(index, entry,
ext, n_ext)
> free_space_zip)) {
/* Try to insert the record by itself on a new page.
If it fails, no amount of splitting will help. */
......@@ -1817,7 +1815,7 @@ btr_cur_optimistic_update(
row_upd_index_replace_new_col_vals_index_pos(new_entry, index, update,
FALSE, NULL);
old_rec_size = rec_offs_size(offsets);
new_rec_size = rec_get_converted_size(index, new_entry);
new_rec_size = rec_get_converted_size(index, new_entry, NULL, 0);
page_zip = buf_block_get_page_zip(buf_block_align(page));
#ifdef UNIV_ZIP_DEBUG
......@@ -2101,31 +2099,28 @@ btr_cur_pessimistic_update(
/* We have to set appropriate extern storage bits in the new
record to be inserted: we have to remember which fields were such */
ext_vect = mem_heap_alloc(heap, sizeof(ulint)
ext_vect = mem_heap_alloc(heap, sizeof(ulint) * 2
* dict_index_get_n_fields(index));
ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
offsets = rec_get_offsets(rec, index, offsets,
ULINT_UNDEFINED, &heap);
n_ext_vect = btr_push_update_extern_fields(ext_vect, offsets, update);
if (page_zip_rec_needs_ext(rec_get_converted_size(index, new_entry),
if (page_zip_rec_needs_ext(rec_get_converted_size(index, new_entry,
ext_vect,
n_ext_vect),
page_is_comp(page),
page_zip ? page_zip->size : 0)) {
ulint i;
big_rec_vec = dtuple_convert_big_rec(index, new_entry,
ext_vect, n_ext_vect);
if (big_rec_vec == NULL) {
if (UNIV_UNLIKELY(big_rec_vec == NULL)) {
err = DB_TOO_BIG_RECORD;
goto return_after_reservations;
}
/* Add the externally stored records. */
for (i = 0; i < big_rec_vec->n_fields; i++) {
ext_vect[n_ext_vect++]
= big_rec_vec->fields[i].field_no;
}
ext_vect = (ulint*) btr_cur_add_ext(ext_vect, &n_ext_vect,
big_rec_vec, &heap);
}
page_cursor = btr_cur_get_page_cur(cursor);
......@@ -3493,8 +3488,9 @@ ulint
btr_push_update_extern_fields(
/*==========================*/
/* out: number of values stored in ext_vect */
ulint* ext_vect,/* in: array of ulints, must be preallocated
to have space for all fields in rec */
ulint* ext_vect,/* out: array of ulints, must be preallocated
to have twice the space for all fields
in rec */
const ulint* offsets,/* in: array returned by rec_get_offsets() */
upd_t* update) /* in: update vector or NULL */
{
......@@ -3543,6 +3539,8 @@ btr_push_update_extern_fields(
}
}
ut_ulint_sort(ext_vect, ext_vect + n_pushed, 0, n_pushed);
return(n_pushed);
}
......
......@@ -516,7 +516,7 @@ dtuple_convert_big_rec(
ut_a(dtuple_check_typed_no_assert(entry));
size = rec_get_converted_size(index, entry);
size = rec_get_converted_size(index, entry, ext_vec, n_ext_vec);
if (UNIV_UNLIKELY(size > 1000000000)) {
fprintf(stderr,
......@@ -542,7 +542,9 @@ dtuple_convert_big_rec(
n_fields = 0;
while (page_zip_rec_needs_ext(rec_get_converted_size(index, entry),
while (page_zip_rec_needs_ext(rec_get_converted_size(index, entry,
ext_vec,
n_ext_vec),
dict_table_is_comp(index->table),
dict_table_zip_size(index->table))) {
ulint i;
......@@ -560,12 +562,11 @@ dtuple_convert_big_rec(
if (ifield->fixed_len
|| dfield->len == UNIV_SQL_NULL
|| dfield->len <= REC_1BYTE_OFFS_LIMIT + 1) {
|| dfield->len <= BTR_EXTERN_FIELD_REF_SIZE * 2) {
goto skip_field;
}
savings = dfield->len - (REC_1BYTE_OFFS_LIMIT + 1
- BTR_EXTERN_FIELD_REF_SIZE);
savings = dfield->len - BTR_EXTERN_FIELD_REF_SIZE;
/* Check that there would be savings */
if (longest >= savings) {
......@@ -609,26 +610,14 @@ dtuple_convert_big_rec(
ifield = dict_index_get_nth_field(index, longest_i);
vector->fields[n_fields].field_no = longest_i;
vector->fields[n_fields].len
= dfield->len - (REC_1BYTE_OFFS_LIMIT + 1
- BTR_EXTERN_FIELD_REF_SIZE);
vector->fields[n_fields].len = dfield->len;
vector->fields[n_fields].data
= mem_heap_alloc(heap, vector->fields[n_fields].len);
/* Copy data (from the end of field) to big rec vector */
memcpy(vector->fields[n_fields].data,
((byte*)dfield->data) + dfield->len
- vector->fields[n_fields].len,
vector->fields[n_fields].len);
dfield->len -= vector->fields[n_fields].len
- BTR_EXTERN_FIELD_REF_SIZE;
vector->fields[n_fields].data = dfield->data;
/* Set the extern field reference in dfield to zero */
memset(((byte*)dfield->data)
+ dfield->len - BTR_EXTERN_FIELD_REF_SIZE,
0, BTR_EXTERN_FIELD_REF_SIZE);
dfield->len = BTR_EXTERN_FIELD_REF_SIZE;
dfield->data = mem_heap_alloc(heap, BTR_EXTERN_FIELD_REF_SIZE);
memset(dfield->data, 0, BTR_EXTERN_FIELD_REF_SIZE);
n_fields++;
ut_ad(n_fields < dtuple_get_n_fields(entry));
}
......@@ -657,13 +646,8 @@ dtuple_convert_back_big_rec(
dfield = dtuple_get_nth_field(entry,
vector->fields[i].field_no);
/* Copy data from big rec vector */
memcpy((byte*) dfield->data
+ dfield->len - BTR_EXTERN_FIELD_REF_SIZE,
vector->fields[i].data, vector->fields[i].len);
dfield->len += vector->fields[i].len
- BTR_EXTERN_FIELD_REF_SIZE;
dfield->data = vector->fields[i].data;
dfield->len = vector->fields[i].len;
}
mem_heap_free(vector->heap);
......
......@@ -1367,7 +1367,8 @@ ibuf_rec_get_volume(
mem_heap_t* heap = mem_heap_create(500);
dtuple_t* entry = ibuf_build_entry_from_ibuf_rec(
ibuf_rec, heap, &dummy_index);
volume = rec_get_converted_size(dummy_index, entry);
volume = rec_get_converted_size(dummy_index, entry,
NULL, 0);
ibuf_dummy_index_free(dummy_index);
mem_heap_free(heap);
return(volume + page_dir_calc_reserved_space(1));
......@@ -1400,7 +1401,7 @@ ibuf_rec_get_volume(
}
}
return(data_size + rec_get_converted_extra_size(data_size, n_fields)
return(data_size + rec_get_converted_extra_size(data_size, n_fields, 0)
+ page_dir_calc_reserved_space(1));
}
......@@ -2654,7 +2655,7 @@ ibuf_insert_low(
ibuf_enter();
}
entry_size = rec_get_converted_size(index, entry);
entry_size = rec_get_converted_size(index, entry, NULL, 0);
heap = mem_heap_create(512);
......@@ -2838,7 +2839,7 @@ ibuf_insert(
ut_a(!dict_index_is_clust(index));
ut_a(!dict_table_zip_size(index->table));
if (rec_get_converted_size(index, entry)
if (rec_get_converted_size(index, entry, NULL, 0)
>= (page_get_free_space_of_empty(dict_table_is_comp(index->table))
/ 2)) {
return(FALSE);
......@@ -2958,7 +2959,7 @@ ibuf_insert_to_index_page(
(ulong) page_get_max_insert_size(
page, 1),
(ulong) rec_get_converted_size(
index, entry));
index, entry, NULL, 0));
fputs("InnoDB: Cannot insert index record ",
stderr);
dtuple_print(stderr, entry);
......
......@@ -542,8 +542,9 @@ ulint
btr_push_update_extern_fields(
/*==========================*/
/* out: number of values stored in ext_vect */
ulint* ext_vect,/* in: array of ulints, must be preallocated
to have space for all fields in rec */
ulint* ext_vect,/* out: array of ulints, must be preallocated
to have twice the space for all fields
in rec */
const ulint* offsets,/* in: array returned by rec_get_offsets() */
upd_t* update);/* in: update vector or NULL */
......
......@@ -134,7 +134,7 @@ page_cur_tuple_insert(
dtuple_t* tuple, /* in: pointer to a data tuple */
dict_index_t* index, /* in: record descriptor */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
ulint n_ext, /* in: number of elements in ext */
mtr_t* mtr); /* in: mini-transaction handle, or NULL */
/***************************************************************
Inserts a record next to page cursor. Returns pointer to inserted record if
......@@ -168,8 +168,6 @@ page_cur_insert_rec_low(
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr); /* in: mini-transaction handle, or NULL */
/*****************************************************************
Copies records from page to a newly created page, from a given record onward,
......
......@@ -185,12 +185,13 @@ page_cur_tuple_insert(
dtuple_t* tuple, /* in: pointer to a data tuple */
dict_index_t* index, /* in: record descriptor */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
ulint n_ext, /* in: number of elements in ext */
mtr_t* mtr) /* in: mini-transaction handle, or NULL */
{
mem_heap_t* heap;
ulint* offsets;
ulint size = rec_get_converted_size(index, tuple);
ulint size
= rec_get_converted_size(index, tuple, ext, n_ext);
rec_t* rec;
heap = mem_heap_create(size
......@@ -198,11 +199,11 @@ page_cur_tuple_insert(
+ dtuple_get_n_fields(tuple))
* sizeof *offsets);
rec = rec_convert_dtuple_to_rec(mem_heap_alloc(heap, size),
index, tuple);
index, tuple, ext, n_ext);
offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
rec = page_cur_insert_rec_low(cursor, page_zip,
index, rec, offsets, ext, n_ext, mtr);
index, rec, offsets, mtr);
mem_heap_free(heap);
return(rec);
}
......@@ -225,6 +226,6 @@ page_cur_rec_insert(
mtr_t* mtr) /* in: mini-transaction handle, or NULL */
{
return(page_cur_insert_rec_low(cursor, page_zip,
index, rec, offsets, NULL, 0, mtr));
index, rec, offsets, mtr));
}
......@@ -462,18 +462,6 @@ rec_offs_n_extern(
/* out: number of externally stored fields */
const ulint* offsets);/* in: array returned by rec_get_offsets() */
/***************************************************************
Sets TRUE the extern storage bits of fields mentioned in an array. */
void
rec_set_field_extern_bits(
/*======================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index),
or NULL */
const ulint* vec, /* in: array of field numbers */
ulint n_fields);/* in: number of fields numbers */
/***************************************************************
This is used to modify the value of an already existing field in a record.
The previous value must have exactly the same size as the new value. If len
is UNIV_SQL_NULL then the field is treated as an SQL null for old-style
......@@ -608,7 +596,10 @@ rec_convert_dtuple_to_rec(
byte* buf, /* in: start address of the
physical record */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple);/* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext); /* in: number of elements in ext */
/**************************************************************
Returns the extra size of an old-style physical record if we know its
data size and number of fields. */
......@@ -618,7 +609,8 @@ rec_get_converted_extra_size(
/*=========================*/
/* out: extra size */
ulint data_size, /* in: data size */
ulint n_fields) /* in: number of fields */
ulint n_fields, /* in: number of fields */
ulint n_ext) /* in: number of externally stored columns */
__attribute__((const));
/**************************************************************
The following function returns the size of a data tuple when converted to
......@@ -629,7 +621,9 @@ rec_get_converted_size(
/*===================*/
/* out: size */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple);/* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext); /* in: number of elements in ext */
/******************************************************************
Copies the first n fields of a physical record to a data tuple.
The fields are copied to the memory heap. */
......
......@@ -1497,9 +1497,10 @@ rec_get_converted_extra_size(
/*=========================*/
/* out: extra size */
ulint data_size, /* in: data size */
ulint n_fields) /* in: number of fields */
ulint n_fields, /* in: number of fields */
ulint n_ext) /* in: number of externally stored columns */
{
if (data_size <= REC_1BYTE_OFFS_LIMIT) {
if (!n_ext && data_size <= REC_1BYTE_OFFS_LIMIT) {
return(REC_N_OLD_EXTRA_BYTES + n_fields);
}
......@@ -1516,7 +1517,9 @@ rec_get_converted_size_new(
/*=======================*/
/* out: size */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple);/* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext); /* in: number of elements in ext */
/**************************************************************
The following function returns the size of a data tuple when converted to
a physical record. */
......@@ -1526,7 +1529,9 @@ rec_get_converted_size(
/*===================*/
/* out: size */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple) /* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext) /* in: number of elements in ext */
{
ulint data_size;
ulint extra_size;
......@@ -1543,13 +1548,13 @@ rec_get_converted_size(
: dict_index_get_n_fields(index)));
if (dict_table_is_comp(index->table)) {
return(rec_get_converted_size_new(index, dtuple));
return(rec_get_converted_size_new(index, dtuple, ext, n_ext));
}
data_size = dtuple_get_data_size(dtuple);
extra_size = rec_get_converted_extra_size(
data_size, dtuple_get_n_fields(dtuple));
data_size, dtuple_get_n_fields(dtuple), n_ext);
return(data_size + extra_size);
}
......
......@@ -902,8 +902,6 @@ page_cur_insert_rec_low(
dict_index_t* index, /* in: record descriptor */
rec_t* rec, /* in: pointer to a physical record */
ulint* offsets,/* in/out: rec_get_offsets(rec, index) */
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext, /* in: number of elements in vec */
mtr_t* mtr) /* in: mini-transaction handle, or NULL */
{
byte* insert_buf = NULL;
......@@ -1025,12 +1023,6 @@ page_cur_insert_rec_low(
insert_rec = rec_copy(insert_buf, rec, offsets);
rec_offs_make_valid(insert_rec, index, offsets);
/* Set the "extern storage" flags */
if (UNIV_UNLIKELY(n_ext)) {
rec_set_field_extern_bits(insert_rec, index, offsets,
ext, n_ext);
}
/* 4. Insert the record in the linked list of records */
current_rec = cursor->rec;
ut_ad(current_rec != insert_rec);
......
......@@ -360,7 +360,8 @@ page_create_low(
heap_top = page + PAGE_DATA;
infimum_rec = rec_convert_dtuple_to_rec(heap_top, index, tuple);
infimum_rec = rec_convert_dtuple_to_rec(heap_top, index,
tuple, NULL, 0);
if (UNIV_LIKELY(comp)) {
ut_a(infimum_rec == page + PAGE_NEW_INFIMUM);
......@@ -389,7 +390,8 @@ page_create_low(
dtype_set(dfield_get_type(field),
DATA_VARCHAR, DATA_ENGLISH | DATA_NOT_NULL, comp ? 8 : 9);
supremum_rec = rec_convert_dtuple_to_rec(heap_top, index, tuple);
supremum_rec = rec_convert_dtuple_to_rec(heap_top, index,
tuple, NULL, 0);
if (UNIV_LIKELY(comp)) {
ut_a(supremum_rec == page + PAGE_NEW_SUPREMUM);
......
......@@ -832,8 +832,7 @@ page_zip_compress_clust(
} else if (rec_offs_nth_extern(offsets, i)) {
src = rec_get_nth_field(rec, offsets, i, &len);
ut_ad(dict_index_is_clust(index));
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
src += len - BTR_EXTERN_FIELD_REF_SIZE;
c_stream->avail_in = src
......@@ -1601,7 +1600,8 @@ page_zip_apply_log(
} else if (rec_offs_nth_extern(offsets, i)) {
dst = rec_get_nth_field(rec, offsets,
i, &len);
ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE);
ut_ad(len
>= BTR_EXTERN_FIELD_REF_SIZE);
len += dst - next_out
- BTR_EXTERN_FIELD_REF_SIZE;
......@@ -2021,7 +2021,7 @@ page_zip_decompress_clust(
+ DATA_ROLL_PTR_LEN;
} else if (rec_offs_nth_extern(offsets, i)) {
dst = rec_get_nth_field(rec, offsets, i, &len);
ut_ad(len > BTR_EXTERN_FIELD_REF_SIZE);
ut_ad(len >= BTR_EXTERN_FIELD_REF_SIZE);
dst += len - BTR_EXTERN_FIELD_REF_SIZE;
d_stream->avail_out = dst - d_stream->next_out;
......
......@@ -624,11 +624,14 @@ rec_get_converted_size_new(
/*=======================*/
/* out: size */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple) /* in: data tuple */
dtuple_t* dtuple,
const ulint* ext, /* in: array of extern field numbers */
ulint n_ext) /* in: number of elements in ext */
{
ulint size = REC_N_NEW_EXTRA_BYTES
+ (index->n_nullable + 7) / 8;
ulint i;
ulint j;
ulint n_fields;
ut_ad(index && dtuple);
ut_ad(dict_table_is_comp(index->table));
......@@ -649,12 +652,12 @@ rec_get_converted_size_new(
/* infimum or supremum record, 8 bytes */
return(size + 8); /* no extra data needed */
default:
ut_a(0);
ut_error;
return(ULINT_UNDEFINED);
}
/* read the lengths of fields 0..n */
for (i = 0; i < n_fields; i++) {
for (i = j = 0; i < n_fields; i++) {
dict_field_t* field;
ulint len;
const dict_col_t* col;
......@@ -674,6 +677,9 @@ rec_get_converted_size_new(
ut_ad(!field->fixed_len || len == field->fixed_len);
if (field->fixed_len) {
} else if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
size += 2;
} else if (len < 128
|| (col->len < 256 && col->mtype != DATA_BLOB)) {
size++;
......@@ -683,6 +689,8 @@ rec_get_converted_size_new(
size += len;
}
ut_ad(j == n_ext);
return(size);
}
......@@ -724,162 +732,6 @@ rec_set_nth_field_null_bit(
rec_2_set_field_end_info(rec, i, info);
}
/**********************************************************
Sets the extern bit in nth field of rec. */
UNIV_INLINE
void
rec_offs_set_nth_extern(
/*====================*/
ulint* offsets,/* in: array returned by rec_get_offsets() */
ulint n) /* in: nth field */
{
rec_offs_base(offsets)[1 + n] |= REC_OFFS_EXTERNAL;
}
/***************************************************************
Sets the ith field extern storage bit of an old-style record. */
static
void
rec_set_nth_field_extern_bit_old(
/*=============================*/
rec_t* rec, /* in: old-style record */
ulint i) /* in: ith field */
{
ulint info;
ut_a(i < rec_get_n_fields_old(rec));
info = rec_2_get_field_end_info(rec, i);
info |= REC_2BYTE_EXTERN_MASK;
rec_2_set_field_end_info(rec, i, info);
}
/***************************************************************
Sets the extern storage bits of a new-style record. */
static
void
rec_set_field_extern_bits_new(
/*==========================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index),
or NULL */
const ulint* ext, /* in: array of field numbers */
ulint n_ext) /* in: number of elements in ext */
{
byte* nulls = rec - (REC_N_NEW_EXTRA_BYTES + 1);
byte* lens = nulls - (index->n_nullable + 7) / 8;
ulint i;
ulint n_fields;
ulint null_mask = 1;
ut_ad(rec && index);
ut_ad(dict_table_is_comp(index->table));
ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
n_fields = dict_index_get_n_fields(index);
ut_ad(n_ext <= n_fields);
ut_ad(n_ext);
/* read the lengths of fields 0..n */
for (i = 0; i < n_fields; i++) {
ibool flag_external = FALSE;
const dict_field_t* field;
const dict_col_t* col;
{
ulint j = 0;
do {
ut_ad(ext[j] < n_fields);
flag_external = (i == ext[j++]);
} while (!flag_external && j < n_ext);
}
field = dict_index_get_nth_field(index, i);
col = dict_field_get_col(field);
if (!(col->prtype & DATA_NOT_NULL)) {
if (UNIV_UNLIKELY(!(byte) null_mask)) {
nulls--;
null_mask = 1;
}
if (*nulls & null_mask) {
null_mask <<= 1;
/* NULL fields cannot be external. */
ut_ad(!flag_external);
continue;
}
null_mask <<= 1;
}
if (field->fixed_len) {
/* fixed-length fields cannot be external
(Fixed-length fields longer than
DICT_MAX_INDEX_COL_LEN will be treated as
variable-length ones in dict_index_add_col().) */
ut_ad(!flag_external);
continue;
}
lens--;
if (col->len > 255 || col->mtype == DATA_BLOB) {
ulint len = lens[1];
if (len & 0x80) { /* 1exxxxxx: 2-byte length */
if (flag_external) {
/* set the extern bit */
len |= 0x40;
lens[1] = (byte) len;
if (offsets) {
rec_offs_set_nth_extern(
offsets, i);
}
}
lens--;
} else {
/* short fields cannot be external */
ut_ad(!flag_external);
}
} else {
/* short fields cannot be external */
ut_ad(!flag_external);
}
}
}
/***************************************************************
Sets TRUE the extern storage bits of fields mentioned in an array. */
void
rec_set_field_extern_bits(
/*======================*/
rec_t* rec, /* in: record */
dict_index_t* index, /* in: record descriptor */
ulint* offsets,/* in/out: rec_get_offsets(rec, index),
or NULL */
const ulint* vec, /* in: array of field numbers */
ulint n_fields)/* in: number of fields numbers */
{
ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
if (dict_table_is_comp(index->table)) {
rec_set_field_extern_bits_new(rec, index, offsets,
vec, n_fields);
} else {
ulint i;
ut_a(!rec_get_1byte_offs_flag(rec));
for (i = 0; i < n_fields; i++) {
rec_set_nth_field_extern_bit_old(rec, vec[i]);
if (offsets) {
rec_offs_set_nth_extern(offsets, vec[i]);
}
}
}
}
/***************************************************************
Sets an old-style record field to SQL null.
The physical size of the field is not changed. */
......@@ -909,7 +761,10 @@ rec_convert_dtuple_to_rec_old(
/* out: pointer to the origin of
physical record */
byte* buf, /* in: start address of the physical record */
dtuple_t* dtuple)/* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext) /* in: number of externally stored columns */
{
dfield_t* field;
ulint n_fields;
......@@ -932,7 +787,7 @@ rec_convert_dtuple_to_rec_old(
/* Calculate the offset of the origin in the physical record */
rec = buf + rec_get_converted_extra_size(data_size, n_fields);
rec = buf + rec_get_converted_extra_size(data_size, n_fields, n_ext);
#ifdef UNIV_DEBUG
/* Suppress Valgrind warnings of ut_ad()
in mach_write_to_1(), mach_write_to_2() et al. */
......@@ -949,7 +804,7 @@ rec_convert_dtuple_to_rec_old(
end_offset = 0;
if (data_size <= REC_1BYTE_OFFS_LIMIT) {
if (!n_ext && data_size <= REC_1BYTE_OFFS_LIMIT) {
rec_set_1byte_offs_flag(rec, TRUE);
......@@ -979,9 +834,11 @@ rec_convert_dtuple_to_rec_old(
rec_1_set_field_end_info(rec, i, ored_offset);
}
} else {
ulint j;
rec_set_1byte_offs_flag(rec, FALSE);
for (i = 0; i < n_fields; i++) {
for (i = j = 0; i < n_fields; i++) {
field = dtuple_get_nth_field(dtuple, i);
......@@ -1002,10 +859,17 @@ rec_convert_dtuple_to_rec_old(
end_offset += len;
ored_offset = end_offset;
if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
ored_offset |= REC_2BYTE_EXTERN_MASK;
}
}
rec_2_set_field_end_info(rec, i, ored_offset);
}
ut_ad(j == n_ext);
}
return(rec);
......@@ -1022,7 +886,10 @@ rec_convert_dtuple_to_rec_new(
of physical record */
byte* buf, /* in: start address of the physical record */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple) /* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext) /* in: number of elements in ext */
{
dfield_t* field;
dtype_t* type;
......@@ -1032,6 +899,7 @@ rec_convert_dtuple_to_rec_new(
byte* lens;
ulint len;
ulint i;
ulint j;
ulint n_node_ptr_field;
ulint fixed_len;
ulint null_mask = 1;
......@@ -1059,6 +927,7 @@ rec_convert_dtuple_to_rec_new(
case REC_STATUS_SUPREMUM:
ut_ad(n_fields == 1);
n_node_ptr_field = ULINT_UNDEFINED;
ut_d(j = 0);
goto init;
default:
ut_error;
......@@ -1069,7 +938,7 @@ rec_convert_dtuple_to_rec_new(
We must loop over all fields to do this. */
rec += (index->n_nullable + 7) / 8;
for (i = 0; i < n_fields; i++) {
for (i = j = 0; i < n_fields; i++) {
if (UNIV_UNLIKELY(i == n_node_ptr_field)) {
#ifdef UNIV_DEBUG
field = dtuple_get_nth_field(dtuple, i);
......@@ -1100,11 +969,15 @@ rec_convert_dtuple_to_rec_new(
&& (dtype_get_len(type) >= 256
|| dtype_get_mtype(type) == DATA_BLOB)) {
rec++;
} else if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
rec++;
}
}
}
init:
ut_ad(j == n_ext);
end = rec;
nulls = rec - (REC_N_NEW_EXTRA_BYTES + 1);
lens = nulls - (index->n_nullable + 7) / 8;
......@@ -1116,7 +989,7 @@ rec_convert_dtuple_to_rec_new(
/* Store the data and the offsets */
for (i = 0; i < n_fields; i++) {
for (i = j = 0; i < n_fields; i++) {
field = dtuple_get_nth_field(dtuple, i);
type = dfield_get_type(field);
len = dfield_get_len(field);
......@@ -1157,13 +1030,17 @@ rec_convert_dtuple_to_rec_new(
} else {
ut_ad(len <= dtype_get_len(type)
|| dtype_get_mtype(type) == DATA_BLOB);
if (len < 128
if (UNIV_UNLIKELY(j < n_ext) && i == ext[j]) {
j++;
ut_ad(len < REC_MAX_INDEX_COL_LEN);
*lens-- = (byte) (len >> 8) | 0xc0;
*lens-- = (byte) len;
} else if (len < 128
|| (dtype_get_len(type) < 256
&& dtype_get_mtype(type) != DATA_BLOB)) {
&& dtype_get_mtype(type)
!= DATA_BLOB)) {
*lens-- = (byte) len;
} else {
/* the extern bits will be set later */
ut_ad(len < 16384);
*lens-- = (byte) (len >> 8) | 0x80;
*lens-- = (byte) len;
......@@ -1174,6 +1051,8 @@ rec_convert_dtuple_to_rec_new(
end += len;
}
ut_ad(j == n_ext);
return(rec);
}
......@@ -1189,7 +1068,10 @@ rec_convert_dtuple_to_rec(
byte* buf, /* in: start address of the
physical record */
dict_index_t* index, /* in: record descriptor */
dtuple_t* dtuple) /* in: data tuple */
dtuple_t* dtuple, /* in: data tuple */
const ulint* ext, /* in: array of extern field numbers,
in ascending order */
ulint n_ext) /* in: number of elements in ext */
{
rec_t* rec;
......@@ -1198,9 +1080,10 @@ rec_convert_dtuple_to_rec(
ut_ad(dtuple_check_typed(dtuple));
if (dict_table_is_comp(index->table)) {
rec = rec_convert_dtuple_to_rec_new(buf, index, dtuple);
rec = rec_convert_dtuple_to_rec_new(buf, index, dtuple,
ext, n_ext);
} else {
rec = rec_convert_dtuple_to_rec_old(buf, dtuple);
rec = rec_convert_dtuple_to_rec_old(buf, dtuple, ext, n_ext);
}
#ifdef UNIV_DEBUG
......
......@@ -1275,7 +1275,7 @@ row_upd_store_row(
&node->ext, node->heap);
if (UNIV_LIKELY_NULL(node->ext)) {
node->ext_vec = mem_heap_alloc(node->heap, sizeof(ulint)
* node->ext->n_ext);
* 2 * node->ext->n_ext);
node->n_ext_vec = btr_push_update_extern_fields(
node->ext_vec, offsets,
node->is_delete ? NULL : node->update);
......
......@@ -1404,34 +1404,29 @@ trx_undo_prev_version_build(
}
if (row_upd_changes_field_size_or_external(index, offsets, update)) {
ulint* ext_vect;
ulint n_ext_vect;
ulint* ext;
ulint n_ext;
/* We have to set the appropriate extern storage bits in the
old version of the record: the extern bits in rec for those
fields that update does NOT update, as well as the the bits for
those fields that update updates to become externally stored
fields. Store the info to ext_vect: */
fields. Store the info to ext: */
ext_vect = mem_alloc(sizeof(ulint)
ext = mem_alloc(sizeof(ulint) * 2
* rec_offs_n_fields(offsets));
n_ext_vect = btr_push_update_extern_fields(ext_vect, offsets,
update);
n_ext = btr_push_update_extern_fields(ext, offsets, update);
entry = row_rec_to_index_entry(ROW_COPY_DATA, index, rec,
heap);
row_upd_index_replace_new_col_vals(entry, index, update, heap);
buf = mem_heap_alloc(heap,
rec_get_converted_size(index, entry));
buf = mem_heap_alloc(heap, rec_get_converted_size(
index, entry, ext, n_ext));
*old_vers = rec_convert_dtuple_to_rec(buf, index, entry);
*old_vers = rec_convert_dtuple_to_rec(buf, index, entry,
ext, n_ext);
/* Now set the extern bits in the old version of the record */
if (n_ext_vect) {
rec_set_field_extern_bits(*old_vers, index, NULL,
ext_vect, n_ext_vect);
}
mem_free(ext_vect);
mem_free(ext);
} else {
buf = mem_heap_alloc(heap, rec_offs_size(offsets));
*old_vers = rec_copy(buf, rec, offsets);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment