Commit b52a30e0 authored by marko

branches/zip: When storing a longer prefix of an externally stored column
to the undo log, also store the original length of the column, so that the
changes will be correctly undone in transaction rollback or when fetching
previous versions of the row.

innodb-zip.test: New file, for tests of the compression.

upd_field_t: Add orig_len, the original length of new_val.

btr_push_update_extern_fields(): Restore the original prefix of the column.
Add the parameter heap where memory will be allocated if necessary.

trx_undo_rec_get_col_val(): Add the output parameter orig_len.

trx_undo_page_report_modify_ext(): New function: Write an externally
stored column to the undo log.  The function is called only from
trx_undo_page_report_modify(), and it is the only caller of
trx_undo_page_fetch_ext().

trx_undo_update_rec_get_update(): Read the original length of the column
prefix to upd_field->orig_len.
parent 5ee41a15
......@@ -2168,7 +2168,7 @@ btr_cur_pessimistic_update(
ut_ad(!page_is_comp(page) || !rec_get_node_ptr_flag(rec));
offsets = rec_get_offsets(rec, index, offsets, ULINT_UNDEFINED, heap);
n_ext += btr_push_update_extern_fields(new_entry, update);
n_ext += btr_push_update_extern_fields(new_entry, update, *heap);
if (page_zip_rec_needs_ext(rec_get_converted_size(index, new_entry,
n_ext),
......@@ -3555,7 +3555,8 @@ btr_push_update_extern_fields(
/*==========================*/
/* out: number of flagged external columns */
dtuple_t* tuple, /* in/out: data tuple */
const upd_t* update) /* in: update vector */
const upd_t* update, /* in: update vector */
mem_heap_t* heap) /* in: memory heap */
{
ulint n_pushed = 0;
ulint n;
......@@ -3576,6 +3577,48 @@ btr_push_update_extern_fields(
dfield_set_ext(field);
n_pushed++;
}
switch (uf->orig_len) {
byte* data;
ulint len;
byte* buf;
case 0:
break;
case BTR_EXTERN_FIELD_REF_SIZE:
/* Restore the original locally stored
part of the column. In the undo log,
InnoDB writes a longer prefix of externally
stored columns, so that column prefixes
in secondary indexes can be reconstructed. */
dfield_set_data(field, dfield_get_data(field)
+ dfield_get_len(field)
- BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
dfield_set_ext(field);
break;
default:
/* Reconstruct the original locally
stored part of the column. The data
will have to be copied. */
ut_a(uf->orig_len > BTR_EXTERN_FIELD_REF_SIZE);
data = dfield_get_data(field);
len = dfield_get_len(field);
buf = mem_heap_alloc(heap, uf->orig_len);
/* Copy the locally stored prefix. */
memcpy(buf, data,
uf->orig_len
- BTR_EXTERN_FIELD_REF_SIZE);
/* Copy the BLOB pointer. */
memcpy(buf + uf->orig_len
- BTR_EXTERN_FIELD_REF_SIZE,
data + len - BTR_EXTERN_FIELD_REF_SIZE,
BTR_EXTERN_FIELD_REF_SIZE);
dfield_set_data(field, buf, uf->orig_len);
dfield_set_ext(field);
}
}
}
......
......@@ -3817,6 +3817,7 @@ calc_row_difference(
}
ufield->exp = NULL;
ufield->orig_len = 0;
ufield->field_no = dict_col_get_clust_pos(
&prebuilt->table->cols[i], clust_index);
n_changed++;
......
......@@ -569,7 +569,8 @@ btr_push_update_extern_fields(
/*==========================*/
/* out: number of flagged external columns */
dtuple_t* tuple, /* in/out: data tuple */
const upd_t* update) /* in: update vector */
const upd_t* update, /* in: update vector */
mem_heap_t* heap) /* in: memory heap */
__attribute__((nonnull));
/*######################################################################*/
......
......@@ -342,11 +342,14 @@ row_upd_index_parse(
/* Update vector field */
struct upd_field_struct{
ulint field_no; /* field number in an index, usually
unsigned field_no:16; /* field number in an index, usually
the clustered index, but in updating
a secondary index record in btr0cur.c
this is the position in the secondary
index */
unsigned orig_len:16; /* original length of the locally
stored part of an externally stored
column, or 0 */
que_node_t* exp; /* expression for calculating a new
value: it refers to column values and
constants in the symbol table of the
......
......@@ -81,6 +81,7 @@ upd_field_set_field_no(
trx_t* trx) /* in: transaction */
{
upd_field->field_no = field_no;
upd_field->orig_len = 0;
if (UNIV_UNLIKELY(field_no >= dict_index_get_n_fields(index))) {
fprintf(stderr,
......@@ -93,7 +94,7 @@ upd_field_set_field_no(
}
dict_col_copy_type(dict_index_get_nth_col(index, field_no),
(dtype_t*) dfield_get_type(&(upd_field->new_val)));
dfield_get_type(&upd_field->new_val));
}
/*************************************************************************
......
--innodb_file_per_table
\ No newline at end of file
create table t1(a int not null, b text, index(b(10))) engine=innodb
key_block_size=1;
insert into t1 values (1,1);
commit;
begin;
update t1 set b=repeat('B',100);
select a,left(b,40),b=1 is_equal from t1;
a left(b,40) is_equal
1 1 1
rollback;
select a,left(b,40),b=1 is_equal from t1;
a left(b,40) is_equal
1 1 1
drop table t1;
# Test of undo logging for a table created with key_block_size=1:
# one connection updates column b inside a transaction and rolls back,
# while a second connection reads the row before and after the rollback.

# Presumably skips the test when InnoDB is not available -- confirm
# against include/have_innodb.inc.
-- source include/have_innodb.inc
# Column b has a 10-character prefix index, b(10).
create table t1(a int not null, b text, index(b(10))) engine=innodb
key_block_size=1;
# NOTE(review): in MySQL, '+' is arithmetic addition, not string
# concatenation.  The recorded result shows the insert below expanding to
# "insert into t1 values (1,1)", so the 5000-character value is apparently
# never stored.  concat() was presumably intended -- TODO confirm.
let $b=`select '1abcdefghijklmnopqrstuvwxyz'+repeat('A',5000)`;
eval insert into t1 values (1,$b);
commit;
# Two extra connections: "a" modifies the row, "b" reads it.
connect (a,localhost,root,,);
connect (b,localhost,root,,);
connection a;
begin;
update t1 set b=repeat('B',100);
# Read from connection b while the update in connection a is uncommitted;
# is_equal compares b against the originally inserted value.
connection b;
eval select a,left(b,40),b=$b is_equal from t1;
connection a;
rollback;
# Read again after the rollback; the recorded result expects the same row.
connection b;
eval select a,left(b,40),b=$b is_equal from t1;
# Clean up the extra connections and the table.
connection default;
disconnect a;
disconnect b;
drop table t1;
......@@ -1005,12 +1005,14 @@ row_ins_foreign_check_on_constraint(
update->n_fields = foreign->n_fields;
for (i = 0; i < foreign->n_fields; i++) {
(update->fields + i)->field_no
= dict_table_get_nth_col_pos(
upd_field_t* ufield = &update->fields[i];
ufield->field_no = dict_table_get_nth_col_pos(
table,
dict_index_get_nth_col_no(index, i));
(update->fields + i)->exp = NULL;
dfield_set_null(&update->fields[i].new_val);
ufield->orig_len = 0;
ufield->exp = NULL;
dfield_set_null(&ufield->new_val);
}
}
......
......@@ -649,16 +649,19 @@ row_upd_index_parse(
update->info_bits = info_bits;
for (i = 0; i < n_fields; i++) {
ulint field_no;
upd_field = upd_get_nth_field(update, i);
new_val = &(upd_field->new_val);
ptr = mach_parse_compressed(ptr, end_ptr,
&(upd_field->field_no));
ptr = mach_parse_compressed(ptr, end_ptr, &field_no);
if (ptr == NULL) {
return(NULL);
}
upd_field->field_no = field_no;
ptr = mach_parse_compressed(ptr, end_ptr, &len);
if (ptr == NULL) {
......
......@@ -311,16 +311,38 @@ trx_undo_rec_get_col_val(
reading these values */
byte* ptr, /* in: pointer to remaining part of undo log record */
byte** field, /* out: pointer to stored field */
ulint* len) /* out: length of the field, or UNIV_SQL_NULL */
ulint* len, /* out: length of the field, or UNIV_SQL_NULL */
ulint* orig_len)/* out: original length of the locally
stored part of an externally stored column, or 0 */
{
*len = mach_read_compressed(ptr);
ptr += mach_get_compressed_size(*len);
*orig_len = 0;
switch (*len) {
case UNIV_SQL_NULL:
*field = NULL;
break;
case UNIV_EXTERN_STORAGE_FIELD:
*orig_len = mach_read_compressed(ptr);
ptr += mach_get_compressed_size(*orig_len);
*len = mach_read_compressed(ptr);
ptr += mach_get_compressed_size(*len);
*field = ptr;
ptr += *len;
if (*len != UNIV_SQL_NULL) {
ut_ad(*orig_len >= BTR_EXTERN_FIELD_REF_SIZE);
ut_ad(*len > *orig_len);
ut_ad(*len >= REC_MAX_INDEX_COL_LEN
+ BTR_EXTERN_FIELD_REF_SIZE);
*len += UNIV_EXTERN_STORAGE_FIELD;
break;
default:
*field = ptr;
if (*len >= UNIV_EXTERN_STORAGE_FIELD) {
ptr += (*len - UNIV_EXTERN_STORAGE_FIELD);
ptr += *len - UNIV_EXTERN_STORAGE_FIELD;
} else {
ptr += *len;
}
......@@ -348,9 +370,6 @@ trx_undo_rec_get_row_ref(
mem_heap_t* heap) /* in: memory heap from which the memory
needed is allocated */
{
dfield_t* dfield;
byte* field;
ulint len;
ulint ref_len;
ulint i;
......@@ -364,9 +383,14 @@ trx_undo_rec_get_row_ref(
dict_index_copy_types(*ref, index, ref_len);
for (i = 0; i < ref_len; i++) {
dfield_t* dfield;
byte* field;
ulint len;
ulint orig_len;
dfield = dtuple_get_nth_field(*ref, i);
ptr = trx_undo_rec_get_col_val(ptr, &field, &len);
ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
dfield_set_data(dfield, field, len);
}
......@@ -386,8 +410,6 @@ trx_undo_rec_skip_row_ref(
record, at the start of the row reference */
dict_index_t* index) /* in: clustered index */
{
byte* field;
ulint len;
ulint ref_len;
ulint i;
......@@ -397,7 +419,11 @@ trx_undo_rec_skip_row_ref(
ref_len = dict_index_get_n_unique(index);
for (i = 0; i < ref_len; i++) {
ptr = trx_undo_rec_get_col_val(ptr, &field, &len);
byte* field;
ulint len;
ulint orig_len;
ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
}
return(ptr);
......@@ -433,6 +459,47 @@ trx_undo_page_fetch_ext(
return(ext_buf);
}
/**************************************************************************
Writes the length information of an externally stored column to the undo
log, optionally switching the caller's field over to a longer prefix of
the column obtained from trx_undo_page_fetch_ext(). */
static
byte*
trx_undo_page_report_modify_ext(
/*============================*/
				/* out: undo log position after the
				length information written here */
	byte*		ptr,	/* in: undo log position,
				at least 15 bytes must be available */
	byte*		ext_buf,/* in: a buffer of
				REC_MAX_INDEX_COL_LEN
				+ BTR_EXTERN_FIELD_REF_SIZE,
				or NULL when should not fetch
				a longer prefix */
	ulint		zip_size,/* compressed page size in bytes,
				or 0 for uncompressed BLOB */
	const byte**	field,	/* in/out: the locally stored part of
				the externally stored column */
	ulint*		len)	/* in/out: length of field, in bytes */
{
	ulint	local_len;

	if (ext_buf == NULL) {
		/* No longer prefix is wanted: a single compressed
		integer carries both the externally-stored flag and
		the length; *field and *len are left untouched. */

		return(ptr + mach_write_compressed(
			       ptr, UNIV_EXTERN_STORAGE_FIELD + *len));
	}

	/* A longer prefix is wanted.  The log then gets three
	compressed integers: the bare marker, the original length of
	the locally stored part, and the length of the prefix that
	replaces it. */

	local_len = *len;

	ptr += mach_write_compressed(ptr, UNIV_EXTERN_STORAGE_FIELD);
	ptr += mach_write_compressed(ptr, local_len);

	/* The fetch is given len by reference, so *len is re-read
	afterwards instead of reusing the value saved above. */
	*field = trx_undo_page_fetch_ext(ext_buf, zip_size, *field, len);

	ptr += mach_write_compressed(ptr, *len);

	return(ptr);
}
/**************************************************************************
Reports in the undo log of an update or delete marking of a clustered index
record. */
......@@ -586,30 +653,20 @@ trx_undo_page_report_modify(
/* Save the old value of field */
field = rec_get_nth_field(rec, offsets, pos, &flen);
if (trx_undo_left(undo_page, ptr) < 5) {
if (trx_undo_left(undo_page, ptr) < 15) {
return(0);
}
if (rec_offs_nth_extern(offsets, pos)) {
/* If an ordering field has external
storage, we will store a longer
prefix of the field. */
if (dict_index_get_nth_col(index,
pos)->ord_part) {
field = trx_undo_page_fetch_ext(
ext_buf,
dict_table_zip_size(table),
field, &flen);
}
/* If a field has external storage, we add
to flen the flag */
ptr += mach_write_compressed(
ptr = trx_undo_page_report_modify_ext(
ptr,
UNIV_EXTERN_STORAGE_FIELD + flen);
dict_index_get_nth_col(index, pos)
->ord_part
&& flen < REC_MAX_INDEX_COL_LEN
? ext_buf : NULL,
dict_table_zip_size(table),
&field, &flen);
/* Notify purge that it eventually has to
free the old externally stored field */
......@@ -672,7 +729,7 @@ trx_undo_page_report_modify(
ulint pos;
/* Write field number to undo log */
if (trx_undo_left(undo_page, ptr) < 5 + 5) {
if (trx_undo_left(undo_page, ptr) < 5 + 15) {
return(0);
}
......@@ -686,21 +743,12 @@ trx_undo_page_report_modify(
&flen);
if (rec_offs_nth_extern(offsets, pos)) {
/* If an ordering field has external
storage, we will store a longer
prefix of the field. */
field = trx_undo_page_fetch_ext(
ext_buf,
ptr = trx_undo_page_report_modify_ext(
ptr,
flen < REC_MAX_INDEX_COL_LEN
? ext_buf : NULL,
dict_table_zip_size(table),
field, &flen);
/* If a field has external
storage, we add to flen the flag */
ptr += mach_write_compressed(
ptr, flen
+ UNIV_EXTERN_STORAGE_FIELD);
&field, &flen);
} else {
ptr += mach_write_compressed(
ptr, flen);
......@@ -841,9 +889,6 @@ trx_undo_update_rec_get_update(
upd_t* update;
ulint n_fields;
byte* buf;
byte* field;
ulint len;
ulint field_no;
ulint i;
ut_a(dict_index_is_clust(index));
......@@ -882,6 +927,11 @@ trx_undo_update_rec_get_update(
for (i = 0; i < n_fields; i++) {
byte* field;
ulint len;
ulint field_no;
ulint orig_len;
ptr = trx_undo_update_rec_get_field_no(ptr, &field_no);
if (field_no >= dict_index_get_n_fields(index)) {
......@@ -903,19 +953,23 @@ trx_undo_update_rec_get_update(
return(NULL);
}
ptr = trx_undo_rec_get_col_val(ptr, &field, &len);
upd_field = upd_get_nth_field(update, i);
upd_field_set_field_no(upd_field, field_no, index, trx);
if (len != UNIV_SQL_NULL && len >= UNIV_EXTERN_STORAGE_FIELD) {
ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
len -= UNIV_EXTERN_STORAGE_FIELD;
upd_field->orig_len = orig_len;
if (len == UNIV_SQL_NULL) {
dfield_set_null(&upd_field->new_val);
} else if (len < UNIV_EXTERN_STORAGE_FIELD) {
dfield_set_data(&upd_field->new_val, field, len);
dfield_set_ext(&upd_field->new_val);
} else {
len -= UNIV_EXTERN_STORAGE_FIELD;
dfield_set_data(&upd_field->new_val, field, len);
dfield_set_ext(&upd_field->new_val);
}
}
......@@ -970,13 +1024,14 @@ trx_undo_rec_get_partial_row(
const dict_col_t* col;
ulint col_no;
ulint len;
ulint orig_len;
ptr = trx_undo_update_rec_get_field_no(ptr, &field_no);
col = dict_index_get_nth_col(index, field_no);
col_no = dict_col_get_no(col);
ptr = trx_undo_rec_get_col_val(ptr, &field, &len);
ptr = trx_undo_rec_get_col_val(ptr, &field, &len, &orig_len);
dfield = dtuple_get_nth_field(*row, col_no);
......@@ -1466,7 +1521,7 @@ trx_undo_prev_version_build(
entry = row_rec_to_index_entry(ROW_COPY_DATA, rec, index,
offsets, &n_ext, heap);
n_ext += btr_push_update_extern_fields(entry, update);
n_ext += btr_push_update_extern_fields(entry, update, heap);
/* The page containing the clustered index record
corresponding to entry is latched in mtr. Thus the
following call is safe. */
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment