Commit 986b3d5a authored by marko's avatar marko

branches/zip: Fix a critical bug in fast index creation that could

corrupt the created indexes.

row_merge(): Make "half" an in/out parameter. Determine the offset of
half the output file. Copy the last blocks record-by-record instead of
block-by-block, so that the records can be counted. Check that the
input and output have matching n_rec.

row_merge_sort(): Do not assume that two blocks of size N are merged
into a block of size 2*N. The output block can be shorter than the
input if the last page of each input block is almost empty. Use an
accurate termination condition, based on the "half" computed by
row_merge().

row_merge_read(), row_merge_write(), row_merge_blocks(): Add debug output.

merge_file_t, row_merge_file_create(): Add n_rec, the number of records
in the merge file.

row_merge_read_clustered_index(): Update n_rec.

row_merge_blocks(): Update and check n_rec.

row_merge_blocks_copy(): New function, for copying the last blocks in
row_merge().  Update and check n_rec.

This bug was discovered with a user-supplied test case that creates an
index where the initial temporary file is 249 one-megabyte blocks and
the merged files become smaller. In the test, possible merge record
sizes are 10, 18, and 26 bytes.

rb://150 approved by Sunny Bains.  This addresses Issue #320.
parent 716e0769
2009-08-27 The InnoDB Team
* row/row0merge.c:
Fix a bug in the merge sort that can corrupt indexes in fast index
creation. Add some consistency checks. Check that the number of
records remains constant in every merge sort pass.
2009-08-27 The InnoDB Team
* buf/buf0buf.c, buf/buf0lru.c, buf/buf0rea.c,
......
......@@ -60,9 +60,19 @@ Completed by Sunny Bains and Marko Makela
#ifdef UNIV_DEBUG
/** Set these in order ot enable debug printout. */
/* @{ */
/** Log the outcome of each row_merge_cmp() call, comparing records. */
static ibool row_merge_print_cmp;
/** Log each record read from temporary file. */
static ibool row_merge_print_read;
/** Log each record write to temporary file. */
static ibool row_merge_print_write;
/** Log each row_merge_blocks() call, merging two blocks of records to
a bigger one. */
static ibool row_merge_print_block;
/** Log each block read from temporary file. */
static ibool row_merge_print_block_read;
/** Log each block read from temporary file. */
static ibool row_merge_print_block_write;
/* @} */
#endif /* UNIV_DEBUG */
......@@ -110,7 +120,8 @@ typedef struct row_merge_buf_struct row_merge_buf_t;
/** Information about temporary files used in merge sort */
struct merge_file_struct {
int fd; /*!< file descriptor */
ulint offset; /*!< file offset */
ulint offset; /*!< file offset (end of file) */
ib_uint64_t n_rec; /*!< number of records in the file */
};
/** Information about temporary files used in merge sort */
......@@ -682,6 +693,13 @@ row_merge_read(
ib_uint64_t ofs = ((ib_uint64_t) offset) * sizeof *buf;
ibool success;
#ifdef UNIV_DEBUG
if (row_merge_print_block_read) {
fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
fd, (ulong) offset);
}
#endif /* UNIV_DEBUG */
success = os_file_read_no_error_handling(OS_FILE_FROM_FD(fd), buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
......@@ -709,6 +727,13 @@ row_merge_write(
ib_uint64_t ofs = ((ib_uint64_t) offset)
* sizeof(row_merge_block_t);
#ifdef UNIV_DEBUG
if (row_merge_print_block_write) {
fprintf(stderr, "row_merge_write fd=%d ofs=%lu\n",
fd, (ulong) offset);
}
#endif /* UNIV_DEBUG */
return(UNIV_LIKELY(os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf,
(ulint) (ofs & 0xFFFFFFFF),
(ulint) (ofs >> 32),
......@@ -718,7 +743,7 @@ row_merge_write(
/********************************************************************//**
Read a merge record.
@return pointer to next record, or NULL on I/O error or end of list */
static
static __attribute__((nonnull))
const byte*
row_merge_read_rec(
/*===============*/
......@@ -1070,7 +1095,7 @@ row_merge_cmp(
Reads clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
@return DB_SUCCESS or error */
static
static __attribute__((nonnull))
ulint
row_merge_read_clustered_index(
/*===========================*/
......@@ -1233,6 +1258,7 @@ row_merge_read_clustered_index(
if (UNIV_LIKELY
(row && row_merge_buf_add(buf, row, ext))) {
file->n_rec++;
continue;
}
......@@ -1274,15 +1300,20 @@ row_merge_read_clustered_index(
UNIV_MEM_INVALID(block[0], sizeof block[0]);
merge_buf[i] = row_merge_buf_empty(buf);
/* Try writing the record again, now that
the buffer has been written out and emptied. */
if (UNIV_LIKELY(row != NULL)) {
/* Try writing the record again, now
that the buffer has been written out
and emptied. */
if (UNIV_UNLIKELY
(row && !row_merge_buf_add(buf, row, ext))) {
(!row_merge_buf_add(buf, row, ext))) {
/* An empty buffer should have enough
room for at least one record. */
ut_error;
}
file->n_rec++;
}
}
mem_heap_empty(row_heap);
......@@ -1320,7 +1351,7 @@ row_merge_read_clustered_index(
b2 = row_merge_write_rec(&block[2], &buf[2], b2, \
of->fd, &of->offset, \
mrec##N, offsets##N); \
if (UNIV_UNLIKELY(!b2)) { \
if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) { \
goto corrupt; \
} \
b##N = row_merge_read_rec(&block[N], &buf[N], \
......@@ -1336,14 +1367,14 @@ row_merge_read_clustered_index(
} while (0)
/*************************************************************//**
Merge two blocks of linked lists on disk and write a bigger block.
Merge two blocks of records on disk and write a bigger block.
@return DB_SUCCESS or error code */
static
ulint
row_merge_blocks(
/*=============*/
const dict_index_t* index, /*!< in: index being created */
merge_file_t* file, /*!< in/out: file containing
const merge_file_t* file, /*!< in: file containing
index entries */
row_merge_block_t* block, /*!< in/out: 3 buffers */
ulint* foffs0, /*!< in/out: offset of first
......@@ -1366,6 +1397,17 @@ row_merge_blocks(
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* offsets of mrec1 */
#ifdef UNIV_DEBUG
if (row_merge_print_block) {
fprintf(stderr,
"row_merge_blocks fd=%d ofs=%lu + fd=%d ofs=%lu"
" = fd=%d ofs=%lu\n",
file->fd, (ulong) *foffs0,
file->fd, (ulong) *foffs1,
of->fd, (ulong) of->offset);
}
#endif /* UNIV_DEBUG */
heap = row_merge_heap_create(index, &offsets0, &offsets1);
/* Write a record and read the next record. Split the output
......@@ -1437,17 +1479,88 @@ row_merge_blocks(
return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
/*************************************************************//**
Copy a block of index entries.
@return TRUE on success, FALSE on failure */
static __attribute__((nonnull))
ibool
row_merge_blocks_copy(
/*==================*/
const dict_index_t* index, /*!< in: index being created */
const merge_file_t* file, /*!< in: input file */
row_merge_block_t* block, /*!< in/out: 3 buffers */
ulint* foffs0, /*!< in/out: input file offset */
merge_file_t* of) /*!< in/out: output file */
{
mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
mrec_buf_t buf[3]; /*!< buffer for handling
split mrec in block[] */
const byte* b0; /*!< pointer to block[0] */
byte* b2; /*!< pointer to block[2] */
const mrec_t* mrec0; /*!< merge rec, points to block[0] */
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* dummy offsets */
#ifdef UNIV_DEBUG
if (row_merge_print_block) {
fprintf(stderr,
"row_merge_blocks_copy fd=%d ofs=%lu"
" = fd=%d ofs=%lu\n",
file->fd, (ulong) foffs0,
of->fd, (ulong) of->offset);
}
#endif /* UNIV_DEBUG */
heap = row_merge_heap_create(index, &offsets0, &offsets1);
/* Write a record and read the next record. Split the output
file in two halves, which can be merged on the following pass. */
if (!row_merge_read(file->fd, *foffs0, &block[0])) {
corrupt:
mem_heap_free(heap);
return(FALSE);
}
b0 = block[0];
b2 = block[2];
b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
foffs0, &mrec0, offsets0);
if (UNIV_UNLIKELY(!b0 && mrec0)) {
goto corrupt;
}
if (mrec0) {
/* append all mrec0 to output */
for (;;) {
ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
}
}
done0:
/* The file offset points to the beginning of the last page
that has been read. Update it to point to the next block. */
(*foffs0)++;
mem_heap_free(heap);
return(row_merge_write_eof(&block[2], b2, of->fd, &of->offset)
!= NULL);
}
/*************************************************************//**
Merge disk files.
@return DB_SUCCESS or error code */
static
static __attribute__((nonnull))
ulint
row_merge(
/*======*/
const dict_index_t* index, /*!< in: index being created */
merge_file_t* file, /*!< in/out: file containing
index entries */
ulint half, /*!< in: half the file */
ulint* half, /*!< in/out: half the file */
row_merge_block_t* block, /*!< in/out: 3 buffers */
int* tmpfd, /*!< in/out: temporary file handle */
TABLE* table) /*!< in/out: MySQL table, for
......@@ -1458,43 +1571,76 @@ row_merge(
ulint foffs1; /*!< second input offset */
ulint error; /*!< error code */
merge_file_t of; /*!< output file */
const ulint ihalf = *half;
/*!< half the input file */
ulint ohalf; /*!< half the output file */
UNIV_MEM_ASSERT_W(block[0], 3 * sizeof block[0]);
ut_ad(half > 0);
ut_ad(ihalf > 0);
ut_ad(ihalf < file->offset);
of.fd = *tmpfd;
of.offset = 0;
of.n_rec = 0;
/* Merge blocks to the output file. */
ohalf = 0;
foffs0 = 0;
foffs1 = half;
foffs1 = ihalf;
for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
ulint ahalf; /*!< arithmetic half the input file */
for (; foffs0 < half && foffs1 < file->offset; foffs0++, foffs1++) {
error = row_merge_blocks(index, file, block,
&foffs0, &foffs1, &of, table);
if (error != DB_SUCCESS) {
return(error);
}
/* Record the offset of the output file when
approximately half the output has been generated. In
this way, the next invocation of row_merge() will
spend most of the time in this loop. The initial
estimate is ohalf==0. */
ahalf = file->offset / 2;
ut_ad(ohalf <= of.offset);
/* Improve the estimate until reaching half the input
file size, or we can not get any closer to it. All
comparands should be non-negative when !(ohalf < ahalf)
because ohalf <= of.offset. */
if (ohalf < ahalf || of.offset - ahalf < ohalf - ahalf) {
ohalf = of.offset;
}
}
/* Copy the last block, if there is one. */
while (foffs0 < half) {
if (!row_merge_read(file->fd, foffs0++, block)
|| !row_merge_write(of.fd, of.offset++, block)) {
/* Copy the last blocks, if there are any. */
while (foffs0 < ihalf) {
if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
return(DB_CORRUPTION);
}
}
ut_ad(foffs0 == ihalf);
while (foffs1 < file->offset) {
if (!row_merge_read(file->fd, foffs1++, block)
|| !row_merge_write(of.fd, of.offset++, block)) {
if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
return(DB_CORRUPTION);
}
}
ut_ad(foffs1 == file->offset);
if (UNIV_UNLIKELY(of.n_rec != file->n_rec)) {
return(DB_CORRUPTION);
}
/* Swap file descriptors for the next pass. */
*tmpfd = file->fd;
*file = of;
*half = ohalf;
UNIV_MEM_INVALID(block[0], 3 * sizeof block[0]);
......@@ -1517,20 +1663,17 @@ row_merge_sort(
reporting erroneous key value
if applicable */
{
ulint blksz; /*!< block size */
ulint half = file->offset / 2;
for (blksz = 1; blksz < file->offset; blksz *= 2) {
ulint half;
do {
ulint error;
ut_ad(ut_is_2pow(blksz));
half = ut_2pow_round((file->offset + (blksz - 1)) / 2, blksz);
error = row_merge(index, file, half, block, tmpfd, table);
error = row_merge(index, file, &half, block, tmpfd, table);
if (error != DB_SUCCESS) {
return(error);
}
}
} while (half < file->offset && half > 0);
return(DB_SUCCESS);
}
......@@ -1909,6 +2052,7 @@ row_merge_file_create(
{
merge_file->fd = innobase_mysql_tmpfile();
merge_file->offset = 0;
merge_file->n_rec = 0;
}
/*********************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment