Commit 5e51ce91 authored by unknown's avatar unknown

Merge desktop.sanja.is.com.ua:/home/bell/mysql/bk/mysql-maria

into  desktop.sanja.is.com.ua:/home/bell/mysql/bk/work-maria-callback


storage/maria/Makefile.am:
  Auto merged
storage/maria/ma_blockrec.c:
  Auto merged
storage/maria/ma_check.c:
  Auto merged
storage/maria/ma_create.c:
  Auto merged
storage/maria/ma_loghandler.h:
  Auto merged
storage/maria/ma_open.c:
  Auto merged
storage/maria/ma_page.c:
  Auto merged
storage/maria/ma_pagecache.c:
  Auto merged
storage/maria/maria_chk.c:
  Auto merged
storage/maria/maria_def.h:
  Auto merged
storage/maria/unittest/ma_test_loghandler-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  Auto merged
storage/maria/ma_bitmap.c:
  Merge.
storage/maria/ma_loghandler.c:
  Merge.
storage/maria/unittest/Makefile.am:
  Merge.
parents ff512a81 fce0e6a0
...@@ -122,7 +122,8 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \ ...@@ -122,7 +122,8 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \ ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_control_file.c ma_loghandler.c \ ma_sp_key.c ma_control_file.c ma_loghandler.c \
ma_pagecache.c ma_pagecaches.c \ ma_pagecache.c ma_pagecaches.c \
ma_checkpoint.c ma_recovery.c ma_commit.c ma_checkpoint.c ma_recovery.c ma_commit.c \
ma_pagecrc.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
SUFFIXES = .sh SUFFIXES = .sh
......
...@@ -127,11 +127,6 @@ ...@@ -127,11 +127,6 @@
#define FULL_HEAD_PAGE 4 #define FULL_HEAD_PAGE 4
#define FULL_TAIL_PAGE 7 #define FULL_TAIL_PAGE 7
/* If we don't have page checksum enabled, the bitmap page ends with this */
uchar maria_bitmap_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 254};
uchar maria_normal_page_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 255};
/*#define WRONG_BITMAP_FLUSH 1*/ /*define only for provoking bugs*/ /*#define WRONG_BITMAP_FLUSH 1*/ /*define only for provoking bugs*/
#undef WRONG_BITMAP_FLUSH #undef WRONG_BITMAP_FLUSH
...@@ -147,6 +142,7 @@ static inline my_bool write_changed_bitmap(MARIA_SHARE *share, ...@@ -147,6 +142,7 @@ static inline my_bool write_changed_bitmap(MARIA_SHARE *share,
{ {
DBUG_ENTER("write_changed_bitmap"); DBUG_ENTER("write_changed_bitmap");
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
DBUG_ASSERT(bitmap->file.write_callback != 0);
DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable)); DBUG_PRINT("info", ("bitmap->non_flushable: %u", bitmap->non_flushable));
if ((bitmap->non_flushable == 0) if ((bitmap->non_flushable == 0)
#ifdef WRONG_BITMAP_FLUSH #ifdef WRONG_BITMAP_FLUSH
...@@ -223,7 +219,11 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file) ...@@ -223,7 +219,11 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
bitmap->file.file= file; bitmap->file.file= file;
bitmap->block_size= share->block_size; bitmap->block_size= share->block_size;
/* Size needs to be alligned on 6 */ pagecache_file_init(bitmap->file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
/* Size needs to be aligned on 6 */
aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6; aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6;
bitmap->total_size= aligned_bit_blocks * 6; bitmap->total_size= aligned_bit_blocks * 6;
/* /*
...@@ -438,8 +438,6 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share) ...@@ -438,8 +438,6 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share)
if (bitmap->map) /* Not in create */ if (bitmap->map) /* Not in create */
{ {
bzero(bitmap->map, bitmap->block_size); bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->changed= 1; bitmap->changed= 1;
bitmap->page= 0; bitmap->page= 0;
bitmap->used_size= bitmap->total_size; bitmap->used_size= bitmap->total_size;
...@@ -764,8 +762,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -764,8 +762,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
*/ */
share->state.state.data_file_length= end_of_page; share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size); bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->used_size= 0; bitmap->used_size= 0;
#ifndef DBUG_OFF #ifndef DBUG_OFF
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
...@@ -775,7 +771,7 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -775,7 +771,7 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
bitmap->used_size= bitmap->total_size; bitmap->used_size= bitmap->total_size;
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size); DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
res= pagecache_read(share->pagecache, res= pagecache_read(share->pagecache,
(PAGECACHE_FILE*)&bitmap->file, page, 0, &bitmap->file, page, 0,
(uchar*) bitmap->map, (uchar*) bitmap->map,
PAGECACHE_PLAIN_PAGE, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL; PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL;
...@@ -789,15 +785,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share, ...@@ -789,15 +785,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
when running without any checksums. when running without any checksums.
*/ */
if (!res && !(share->options & HA_OPTION_PAGE_CHECKSUM) &&
!memcmp(bitmap->map + bitmap->block_size -
sizeof(maria_normal_page_marker),
maria_normal_page_marker,
sizeof(maria_normal_page_marker)))
{
res= 1;
my_errno= HA_ERR_WRONG_IN_RECORD; /* File crashed */
}
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (!res) if (!res)
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size); memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
...@@ -2478,17 +2465,18 @@ int _ma_bitmap_create_first(MARIA_SHARE *share) ...@@ -2478,17 +2465,18 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{ {
uint block_size= share->bitmap.block_size; uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file; File file= share->bitmap.file.file;
char marker[sizeof(maria_bitmap_marker)]; char marker[CRC_SIZE];
if (share->options & HA_OPTION_PAGE_CHECKSUM) /*
bzero(marker, sizeof(marker)); Next write operation of the page will write correct CRC
else if it is needed
bmove(marker, maria_bitmap_marker, sizeof(marker)); */
int4store(marker, MARIA_NO_CRC_BITMAP_PAGE);
if (my_chsize(file, block_size - sizeof(maria_bitmap_marker), if (my_chsize(file, block_size - sizeof(marker),
0, MYF(MY_WME)) || 0, MYF(MY_WME)) ||
my_pwrite(file, marker, sizeof(maria_bitmap_marker), my_pwrite(file, marker, sizeof(marker),
block_size - sizeof(maria_bitmap_marker), block_size - sizeof(marker),
MYF(MY_NABP | MY_WME))) MYF(MY_NABP | MY_WME)))
return 1; return 1;
share->state.state.data_file_length= block_size; share->state.state.data_file_length= block_size;
......
...@@ -1242,9 +1242,7 @@ static void make_empty_page(MARIA_HA *info, uchar *buff, uint page_type) ...@@ -1242,9 +1242,7 @@ static void make_empty_page(MARIA_HA *info, uchar *buff, uint page_type)
buff[DIR_FREE_OFFSET]= END_OF_DIR_FREE_LIST; buff[DIR_FREE_OFFSET]= END_OF_DIR_FREE_LIST;
int2store(buff + block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE, int2store(buff + block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE,
PAGE_HEADER_SIZE); PAGE_HEADER_SIZE);
if (!(info->s->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -1555,10 +1553,6 @@ static my_bool write_full_pages(MARIA_HA *info, ...@@ -1555,10 +1553,6 @@ static my_bool write_full_pages(MARIA_HA *info,
(data_size - copy_length) + PAGE_SUFFIX_SIZE); (data_size - copy_length) + PAGE_SUFFIX_SIZE);
#endif #endif
if (!(info->s->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
DBUG_ASSERT(share->pagecache->block_size == block_size); DBUG_ASSERT(share->pagecache->block_size == block_size);
if (pagecache_write(share->pagecache, if (pagecache_write(share->pagecache,
&info->dfile, page, 0, &info->dfile, page, 0,
......
...@@ -2727,6 +2727,20 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name) ...@@ -2727,6 +2727,20 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name)
} /* maria_sort_index */ } /* maria_sort_index */
/**
@brief put CRC on the page
@param buff reference on the page buffer.
@param pos position of the page in the file.
@param length length of the page
*/
static void put_crc(char *buff, my_off_t pos, MARIA_SHARE *share)
{
maria_page_crc_set_index(buff, pos / share->block_size, (uchar*) share);
}
/* Sort records recursive using one index */ /* Sort records recursive using one index */
static int sort_one_index(HA_CHECK *param, MARIA_HA *info, static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
...@@ -2804,6 +2818,7 @@ static int sort_one_index(HA_CHECK *param, MARIA_HA *info, ...@@ -2804,6 +2818,7 @@ static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
/* Fill block with zero and write it to the new index file */ /* Fill block with zero and write it to the new index file */
length= _ma_get_page_used(share, buff); length= _ma_get_page_used(share, buff);
bzero((uchar*) buff+length,keyinfo->block_length-length); bzero((uchar*) buff+length,keyinfo->block_length-length);
put_crc(buff, new_page_pos, share);
if (my_pwrite(new_file,(uchar*) buff,(uint) keyinfo->block_length, if (my_pwrite(new_file,(uchar*) buff,(uint) keyinfo->block_length,
new_page_pos,MYF(MY_NABP | MY_WAIT_IF_FULL))) new_page_pos,MYF(MY_NABP | MY_WAIT_IF_FULL)))
{ {
...@@ -4862,9 +4877,13 @@ static int sort_insert_key(MARIA_SORT_PARAM *sort_param, ...@@ -4862,9 +4877,13 @@ static int sort_insert_key(MARIA_SORT_PARAM *sort_param,
DFLT_INIT_HITS, anc_buff)) DFLT_INIT_HITS, anc_buff))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
else if (my_pwrite(share->kfile.file, anc_buff, else
(uint) keyinfo->block_length,filepos, param->myf_rw)) {
put_crc(anc_buff, filepos, share);
if (my_pwrite(share->kfile.file, anc_buff,
(uint) keyinfo->block_length, filepos, param->myf_rw))
DBUG_RETURN(1); DBUG_RETURN(1);
}
DBUG_DUMP("buff", anc_buff, _ma_get_page_used(share, anc_buff)); DBUG_DUMP("buff", anc_buff, _ma_get_page_used(share, anc_buff));
/* Write separator-key to block in next level */ /* Write separator-key to block in next level */
...@@ -4982,9 +5001,13 @@ int _ma_flush_pending_blocks(MARIA_SORT_PARAM *sort_param) ...@@ -4982,9 +5001,13 @@ int _ma_flush_pending_blocks(MARIA_SORT_PARAM *sort_param)
DFLT_INIT_HITS, key_block->buff)) DFLT_INIT_HITS, key_block->buff))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
else if (my_pwrite(info->s->kfile.file, key_block->buff, else
{
put_crc(key_block->buff, filepos, info->s);
if (my_pwrite(info->s->kfile.file, key_block->buff,
(uint) keyinfo->block_length,filepos, myf_rw)) (uint) keyinfo->block_length,filepos, myf_rw))
DBUG_RETURN(1); DBUG_RETURN(1);
}
DBUG_DUMP("buff",key_block->buff,length); DBUG_DUMP("buff",key_block->buff,length);
nod_flag=1; nod_flag=1;
} }
...@@ -5571,6 +5594,14 @@ my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file) ...@@ -5571,6 +5594,14 @@ my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
DBUG_RETURN(1); DBUG_RETURN(1);
new_info= sort_info->new_info; new_info= sort_info->new_info;
pagecache_file_init(new_info->s->bitmap.file, &maria_page_crc_check_bitmap,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), new_info->s);
pagecache_file_init(new_info->dfile, &maria_page_crc_check_data,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), new_info->s);
change_data_file_descriptor(new_info, new_file); change_data_file_descriptor(new_info, new_file);
maria_lock_database(new_info, F_EXTRA_LCK); maria_lock_database(new_info, F_EXTRA_LCK);
if ((sort_info->param->testflag & T_UNPACK) && if ((sort_info->param->testflag & T_UNPACK) &&
......
...@@ -1049,6 +1049,10 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1049,6 +1049,10 @@ int maria_create(const char *name, enum data_file_type datafile_type,
DROP+CREATE happened (applying REDOs to the wrong table). DROP+CREATE happened (applying REDOs to the wrong table).
*/ */
share.kfile.file= file; share.kfile.file= file;
pagecache_file_init(share.kfile, &maria_page_crc_check_index,
(share.options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), &share);
if (_ma_update_create_rename_lsn_sub(&share, lsn, FALSE)) if (_ma_update_create_rename_lsn_sub(&share, lsn, FALSE))
goto err; goto err;
my_free(log_data, MYF(0)); my_free(log_data, MYF(0));
...@@ -1241,6 +1245,10 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile) ...@@ -1241,6 +1245,10 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
{ {
share->bitmap.block_size= share->base.block_size; share->bitmap.block_size= share->base.block_size;
share->bitmap.file.file = dfile; share->bitmap.file.file = dfile;
pagecache_file_init(share->bitmap.file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
return _ma_bitmap_create_first(share); return _ma_bitmap_create_first(share);
} }
/* /*
......
...@@ -28,6 +28,21 @@ ...@@ -28,6 +28,21 @@
/* number of opened log files in the pagecache (should be at least 2) */ /* number of opened log files in the pagecache (should be at least 2) */
#define OPENED_FILES_NUM 3 #define OPENED_FILES_NUM 3
#define CACHED_FILES_NUM 5
#define CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT 7
#if CACHED_FILES_NUM > CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT
#include <hash.h>
#include <m_ctype.h>
#endif
/* transaction log file descriptor */
typedef struct st_translog_file
{
uint32 number;
PAGECACHE_FILE handler;
my_bool was_recovered;
my_bool is_sync;
} TRANSLOG_FILE;
/* records buffer size (should be TRANSLOG_PAGE_SIZE * n) */ /* records buffer size (should be TRANSLOG_PAGE_SIZE * n) */
#define TRANSLOG_WRITE_BUFFER (1024*1024) #define TRANSLOG_WRITE_BUFFER (1024*1024)
...@@ -72,7 +87,7 @@ struct st_translog_buffer ...@@ -72,7 +87,7 @@ struct st_translog_buffer
*/ */
translog_size_t size; translog_size_t size;
/* File handler for this buffer */ /* File handler for this buffer */
File file; TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */ /* Threads which are waiting for buffer filling/freeing */
pthread_cond_t waiting_filling_buffer; pthread_cond_t waiting_filling_buffer;
/* Number of records which are in copy progress */ /* Number of records which are in copy progress */
...@@ -181,8 +196,13 @@ struct st_translog_descriptor ...@@ -181,8 +196,13 @@ struct st_translog_descriptor
char directory[FN_REFLEN]; char directory[FN_REFLEN];
/* *** Current state of the log handler *** */ /* *** Current state of the log handler *** */
/* Current and (OPENED_FILES_NUM-1) last logs number in the page cache */ /* list of opened files */
File log_file_num[OPENED_FILES_NUM]; DYNAMIC_ARRAY open_files;
/* min/max number of file in the array */
uint32 max_file, min_file;
/* the opened files list guard */
rw_lock_t open_files_lock;
/* /*
File descriptor of the directory where we store log files for syncing File descriptor of the directory where we store log files for syncing
it. it.
...@@ -274,9 +294,16 @@ static MARIA_SHARE **id_to_share= NULL; ...@@ -274,9 +294,16 @@ static MARIA_SHARE **id_to_share= NULL;
/* lock for id_to_share */ /* lock for id_to_share */
static my_atomic_rwlock_t LOCK_id_to_share; static my_atomic_rwlock_t LOCK_id_to_share;
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr); static my_bool translog_dummy_callback(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
static my_bool translog_page_validator(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner); static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner);
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected);
/* /*
Initialize log_record_type_descriptors Initialize log_record_type_descriptors
...@@ -789,6 +816,53 @@ static File open_logfile_by_number_no_cache(uint32 file_no) ...@@ -789,6 +816,53 @@ static File open_logfile_by_number_no_cache(uint32 file_no)
} }
/**
@brief get file descriptor by given number using cache
@param file_no Number of the log we want to open
retval # file descriptor
*/
static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no)
{
TRANSLOG_FILE *file;
DBUG_ENTER("get_logfile_by_number");
rw_rdlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
DBUG_ASSERT(log_descriptor.max_file >= file_no);
DBUG_ASSERT(log_descriptor.min_file <= file_no);
DBUG_ASSERT(log_descriptor.max_file - file_no <
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files,
log_descriptor.max_file - file_no, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
DBUG_PRINT("info", ("File 0x%lx File no: %lu, File handler: %d",
(ulong)file, (ulong)file_no,
(file ? file->handler.file : -1)));
DBUG_ASSERT(!file || file->number == file_no);
DBUG_RETURN(file);
}
/**
@brief get current file descriptor
retval # file descriptor
*/
static TRANSLOG_FILE *get_current_logfile()
{
TRANSLOG_FILE *file;
rw_rdlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
return (file);
}
uchar NEAR maria_trans_file_magic[]= uchar NEAR maria_trans_file_magic[]=
{ (uchar) 254, (uchar) 254, (uchar) 11, '\001', 'M', 'A', 'R', 'I', 'A', { (uchar) 254, (uchar) 254, (uchar) 11, '\001', 'M', 'A', 'R', 'I', 'A',
'L', 'O', 'G' }; 'L', 'O', 'G' };
...@@ -813,8 +887,10 @@ uchar NEAR maria_trans_file_magic[]= ...@@ -813,8 +887,10 @@ uchar NEAR maria_trans_file_magic[]=
static my_bool translog_write_file_header() static my_bool translog_write_file_header()
{ {
TRANSLOG_FILE *file;
ulonglong timestamp; ulonglong timestamp;
uchar page_buff[TRANSLOG_PAGE_SIZE], *page= page_buff; uchar page_buff[TRANSLOG_PAGE_SIZE], *page= page_buff;
my_bool rc;
DBUG_ENTER("translog_write_file_header"); DBUG_ENTER("translog_write_file_header");
/* file tag */ /* file tag */
...@@ -843,8 +919,16 @@ static my_bool translog_write_file_header() ...@@ -843,8 +919,16 @@ static my_bool translog_write_file_header()
page+= LSN_STORE_SIZE; page+= LSN_STORE_SIZE;
memset(page, TRANSLOG_FILLER, sizeof(page_buff) - (page- page_buff)); memset(page, TRANSLOG_FILLER, sizeof(page_buff) - (page- page_buff));
DBUG_RETURN(my_pwrite(log_descriptor.log_file_num[0], page_buff, file= get_current_logfile();
sizeof(page_buff), 0, log_write_flags) != 0); rc= my_pwrite(file->handler.file, page_buff, sizeof(page_buff), 0,
log_write_flags) != 0;
/*
Dropping the flag in such way can make false alarm: signalling than the
file in not sync when it is sync, but the situation is quite rare and
protections with mutexes give much more overhead to the whole engine
*/
file->is_sync= 0;
DBUG_RETURN(rc);
} }
/* /*
...@@ -931,6 +1015,15 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file) ...@@ -931,6 +1015,15 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file)
desc->file_number= uint3korr(ptr); desc->file_number= uint3korr(ptr);
ptr+=3; ptr+=3;
desc->max_lsn= lsn_korr(ptr); desc->max_lsn= lsn_korr(ptr);
DBUG_PRINT("info", ("timestamp: %llu maria ver: %lu mysql ver: %lu "
"server id %lu page size %u file number %lu "
"max lsn: (%lu,0x%lx)",
(ulonglong) desc->timestamp,
(ulong) desc->maria_version,
(ulong) desc->mysql_version,
(ulong) desc->server_id,
desc->page_size, (ulong) desc->file_number,
LSN_IN_PARTS(desc->max_lsn)));
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1167,7 +1260,7 @@ LSN translog_get_file_max_lsn_stored(uint32 file) ...@@ -1167,7 +1260,7 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
DBUG_PRINT("error", ("Can't read file header")); DBUG_PRINT("error", ("Can't read file header"));
DBUG_RETURN(LSN_ERROR); DBUG_RETURN(LSN_ERROR);
} }
DBUG_PRINT("error", ("Max lsn: (%lu,0x%lx)", DBUG_PRINT("info", ("Max lsn: (%lu,0x%lx)",
LSN_IN_PARTS(info.max_lsn))); LSN_IN_PARTS(info.max_lsn)));
DBUG_RETURN(info.max_lsn); DBUG_RETURN(info.max_lsn);
} }
...@@ -1190,7 +1283,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer) ...@@ -1190,7 +1283,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
DBUG_ENTER("translog_buffer_init"); DBUG_ENTER("translog_buffer_init");
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->last_lsn= LSN_IMPOSSIBLE;
/* This Buffer File */ /* This Buffer File */
buffer->file= -1; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
/* cache for current log */ /* cache for current log */
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER); memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
...@@ -1211,33 +1304,51 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer) ...@@ -1211,33 +1304,51 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/* /*
Close transaction log file by descriptor @brief close transaction log file by descriptor
SYNOPSIS @param file pagegecache file descriptor reference
translog_close_log_file()
file file descriptor
RETURN @return Operation status
0 OK @retval 0 OK
1 Error @retval 1 Error
*/ */
static my_bool translog_close_log_file(File file) static my_bool translog_close_log_file(TRANSLOG_FILE *file)
{ {
int rc; int rc= 0;
PAGECACHE_FILE fl; flush_pagecache_blocks(log_descriptor.pagecache, &file->handler,
fl.file= file; FLUSH_RELEASE);
flush_pagecache_blocks(log_descriptor.pagecache, &fl, FLUSH_RELEASE);
/* /*
Sync file when we close it Sync file when we close it
TODO: sync only we have changed the log TODO: sync only we have changed the log
*/ */
rc= my_sync(file, MYF(MY_WME)); if (!file->is_sync)
rc|= my_close(file, MYF(MY_WME)); rc= my_sync(file->handler.file, MYF(MY_WME));
rc|= my_close(file->handler.file, MYF(MY_WME));
my_free(file, MYF(0));
return test(rc); return test(rc);
} }
/**
@brief Initializes TRANSLOG_FILE structure
@param file reference on the file to initialize
@param number file number
@param is_sync is file synced on disk
*/
static void translog_file_init(TRANSLOG_FILE *file, uint32 number,
my_bool is_sync)
{
pagecache_file_init(file->handler, &translog_page_validator,
&translog_dummy_callback, file);
file->number= number;
file->was_recovered= 0;
file->is_sync= is_sync;
}
/** /**
@brief Create and fill header of new file. @brief Create and fill header of new file.
...@@ -1252,47 +1363,57 @@ static my_bool translog_close_log_file(File file) ...@@ -1252,47 +1363,57 @@ static my_bool translog_close_log_file(File file)
static my_bool translog_create_new_file() static my_bool translog_create_new_file()
{ {
int i; TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
TRANSLOG_FILE *old= get_current_logfile();
uint32 file_no= LSN_FILE_NO(log_descriptor.horizon); uint32 file_no= LSN_FILE_NO(log_descriptor.horizon);
DBUG_ENTER("translog_create_new_file"); DBUG_ENTER("translog_create_new_file");
if (file == NULL)
goto error;
/* /*
Writes max_lsn to the file header before finishing it (there is no need Writes max_lsn to the file header before finishing it (there is no need
to lock file header buffer because it is still unfinished file, so only to lock file header buffer because it is still unfinished file, so only
one thread can finish the file and nobody interested of LSN of current one thread can finish the file and nobody interested of LSN of current
(unfinished) file, because no one can purge it). (unfinished) file, because no one can purge it).
*/ */
if (translog_max_lsn_to_header(log_descriptor.log_file_num[0], if (translog_max_lsn_to_header(old->handler.file, log_descriptor.max_lsn))
log_descriptor.max_lsn)) goto error;
{
translog_stop_writing(); rw_wrlock(&log_descriptor.open_files_lock);
DBUG_RETURN(1); DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
} log_descriptor.open_files.elements);
DBUG_ASSERT(file_no == log_descriptor.max_file + 1);
log_descriptor.max_lsn= LSN_IMPOSSIBLE; if (allocate_dynamic(&log_descriptor.open_files,
log_descriptor.max_file - log_descriptor.min_file + 2))
if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 && goto error_lock;
translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM - if ((file->handler.file=
1])) create_logfile_by_number_no_cache(file_no)) == -1)
{ goto error_lock;
translog_stop_writing(); translog_file_init(file, file_no, 0);
/*
This should not be possible because we have not writing something /* this call just expand the array */
after last sync insert_dynamic(&log_descriptor.open_files, (uchar*)&file);
*/ log_descriptor.max_file++;
DBUG_ASSERT(0); {
DBUG_RETURN(1); char *start= (char*) dynamic_element(&log_descriptor.open_files, 0,
} TRANSLOG_FILE**);
for (i= OPENED_FILES_NUM - 1; i > 0; i--) memmove(start + sizeof(TRANSLOG_FILE*), start,
log_descriptor.log_file_num[i]= log_descriptor.log_file_num[i - 1]; sizeof(TRANSLOG_FILE*) *
(log_descriptor.max_file - log_descriptor.min_file + 1 - 1));
if ((log_descriptor.log_file_num[0]= }
create_logfile_by_number_no_cache(file_no)) == -1 || /* can't fail we because we expanded array */
translog_write_file_header()) set_dynamic(&log_descriptor.open_files, (uchar*)&file, 0);
{ DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
translog_stop_writing(); log_descriptor.open_files.elements);
rw_unlock(&log_descriptor.open_files_lock);
DBUG_PRINT("info", ("file_no: %lu", (ulong)file_no));
if (translog_write_file_header())
DBUG_RETURN(1); DBUG_RETURN(1);
}
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no, if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no,
CONTROL_FILE_UPDATE_ONLY_LOGNO)) CONTROL_FILE_UPDATE_ONLY_LOGNO))
...@@ -1302,6 +1423,12 @@ static my_bool translog_create_new_file() ...@@ -1302,6 +1423,12 @@ static my_bool translog_create_new_file()
} }
DBUG_RETURN(0); DBUG_RETURN(0);
error_lock:
rw_unlock(&log_descriptor.open_files_lock);
error:
translog_stop_writing();
DBUG_RETURN(1);
} }
...@@ -1393,7 +1520,7 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon, ...@@ -1393,7 +1520,7 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
int4store(ptr, 0x11223344); int4store(ptr, 0x11223344);
#endif #endif
/* CRC will be put when page is finished */ /* CRC will be put when page is finished */
ptr+= CRC_LENGTH; ptr+= CRC_SIZE;
} }
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION) if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
{ {
...@@ -1600,7 +1727,7 @@ static void translog_wait_for_writers(struct st_translog_buffer *buffer) ...@@ -1600,7 +1727,7 @@ static void translog_wait_for_writers(struct st_translog_buffer *buffer)
{ {
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx", DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer)); (uint) buffer->buffer_no, (ulong) buffer));
DBUG_ASSERT(buffer->file != -1); DBUG_ASSERT(buffer->file != NULL);
pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex); pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex);
DBUG_PRINT("info", ("wait for writers done buffer: #%u 0x%lx", DBUG_PRINT("info", ("wait for writers done buffer: #%u 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer)); (uint) buffer->buffer_no, (ulong) buffer));
...@@ -1626,14 +1753,15 @@ static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer) ...@@ -1626,14 +1753,15 @@ static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer)
{ {
DBUG_ENTER("translog_wait_for_buffer_free"); DBUG_ENTER("translog_wait_for_buffer_free");
DBUG_PRINT("enter", ("Buffer: #%u 0x%lx copies in progress: %u " DBUG_PRINT("enter", ("Buffer: #%u 0x%lx copies in progress: %u "
"File: %d size: 0x%lu", "File: %d size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
(int) buffer->copy_to_buffer_in_progress, (int) buffer->copy_to_buffer_in_progress,
buffer->file, (ulong) buffer->size)); (buffer->file ? buffer->file->handler.file : -1),
(ulong) buffer->size));
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
while (buffer->file != -1) while (buffer->file != NULL)
{ {
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx", DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer)); (uint) buffer->buffer_no, (ulong) buffer));
...@@ -1687,20 +1815,22 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, ...@@ -1687,20 +1815,22 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
{ {
DBUG_ENTER("translog_start_buffer"); DBUG_ENTER("translog_start_buffer");
DBUG_PRINT("enter", DBUG_PRINT("enter",
("Assign buffer: #%u (0x%lx) to file: %d offset: 0x%lx(%lu)", ("Assign buffer: #%u (0x%lx) offset: 0x%lx(%lu)",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
log_descriptor.log_file_num[0],
(ulong) LSN_OFFSET(log_descriptor.horizon), (ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon))); (ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no); DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->last_lsn= LSN_IMPOSSIBLE;
buffer->offset= log_descriptor.horizon; buffer->offset= log_descriptor.horizon;
buffer->next_buffer_offset= LSN_IMPOSSIBLE; buffer->next_buffer_offset= LSN_IMPOSSIBLE;
buffer->file= log_descriptor.log_file_num[0]; buffer->file= get_current_logfile();
buffer->overlay= 0; buffer->overlay= 0;
buffer->size= 0; buffer->size= 0;
translog_cursor_init(cursor, buffer, buffer_no); translog_cursor_init(cursor, buffer, buffer_no);
DBUG_PRINT("info", ("init cursor #%u: 0x%lx chaser: %d Size: %lu (%lu)", DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
"chaser: %d Size: %lu (%lu)",
(long) (buffer->file ? buffer->file->number : 0),
(buffer->file ? buffer->file->handler.file : -1),
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer, (uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size, cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer))); (ulong) (cursor->ptr - cursor->buffer->buffer)));
...@@ -1747,7 +1877,7 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -1747,7 +1877,7 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_wait_for_buffer_free(new_buffer); translog_wait_for_buffer_free(new_buffer);
} }
else else
DBUG_ASSERT(new_buffer->file != 0); DBUG_ASSERT(new_buffer->file != NULL);
if (new_file) if (new_file)
{ {
...@@ -2084,17 +2214,17 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset) ...@@ -2084,17 +2214,17 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{ {
uint32 i, pg; uint32 i, pg;
PAGECACHE_FILE file; TRANSLOG_FILE *file;
DBUG_ENTER("translog_buffer_flush"); DBUG_ENTER("translog_buffer_flush");
DBUG_ASSERT(buffer->file != NULL);
DBUG_PRINT("enter", DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", ("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
buffer->file, buffer->file->handler.file,
LSN_IN_PARTS(buffer->offset), LSN_IN_PARTS(buffer->offset),
(ulong) buffer->size)); (ulong) buffer->size));
translog_buffer_lock_assert_owner(buffer); translog_buffer_lock_assert_owner(buffer);
DBUG_ASSERT(buffer->file != -1);
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
...@@ -2108,11 +2238,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2108,11 +2238,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
*/ */
struct st_translog_buffer *overlay= buffer->overlay; struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset; TRANSLOG_ADDRESS buffer_offset= buffer->offset;
File file= buffer->file; TRANSLOG_FILE *fl= buffer->file;
translog_buffer_unlock(buffer); translog_buffer_unlock(buffer);
translog_buffer_lock(overlay); translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */ /* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == file && if (buffer->overlay->file == fl &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size, cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0) buffer_offset) > 0)
{ {
...@@ -2120,7 +2250,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2120,7 +2250,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
} }
translog_buffer_unlock(overlay); translog_buffer_unlock(overlay);
translog_buffer_lock(buffer); translog_buffer_lock(buffer);
if (buffer->file != -1 && buffer_offset == buffer->offset) if (buffer->file != NULL && buffer_offset == buffer->offset)
{ {
/* /*
This means that somebody else flushed the buffer while we was This means that somebody else flushed the buffer while we was
...@@ -2149,13 +2279,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2149,13 +2279,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN) if (translog_status != TRANSLOG_OK && translog_status != TRANSLOG_SHUTDOWN)
DBUG_RETURN(1); DBUG_RETURN(1);
if (pagecache_inject(log_descriptor.pagecache, if (pagecache_inject(log_descriptor.pagecache,
&file, pg, 3, &file->handler, pg, 3,
buffer->buffer + i, buffer->buffer + i,
PAGECACHE_PLAIN_PAGE, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, 0, PAGECACHE_PIN_LEFT_UNPINNED, 0,
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE))
&translog_page_validator, (uchar*) &data))
{ {
DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache", DBUG_PRINT("error", ("Can't write page (%lu,0x%lx) to pagecache",
(ulong) buffer->file, (ulong) buffer->file,
...@@ -2164,18 +2293,25 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2164,18 +2293,25 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
if (my_pwrite(buffer->file, (char*) buffer->buffer, file->is_sync= 0;
if (my_pwrite(file->handler.file, (char*) buffer->buffer,
buffer->size, LSN_OFFSET(buffer->offset), buffer->size, LSN_OFFSET(buffer->offset),
log_write_flags)) log_write_flags))
{ {
DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu " DBUG_PRINT("error", ("Can't write buffer (%lu,0x%lx) size %lu "
"to the disk (%d)", "to the disk (%d)",
(ulong) buffer->file, (ulong) file->handler.file,
(ulong) LSN_OFFSET(buffer->offset), (ulong) LSN_OFFSET(buffer->offset),
(ulong) buffer->size, errno)); (ulong) buffer->size, errno));
translog_stop_writing(); translog_stop_writing();
DBUG_RETURN(1); DBUG_RETURN(1);
} }
/*
Dropping the flag in such way can make false alarm: signalling than the
file in not sync when it is sync, but the situation is quite rare and
protections with mutexes give much more overhead to the whole engine
*/
file->is_sync= 0;
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */ if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_disk(buffer->last_lsn, translog_set_sent_to_disk(buffer->last_lsn,
...@@ -2183,7 +2319,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2183,7 +2319,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
else else
translog_set_only_in_buffers(buffer->next_buffer_offset); translog_set_only_in_buffers(buffer->next_buffer_offset);
/* Free buffer */ /* Free buffer */
buffer->file= -1; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
pthread_cond_broadcast(&buffer->waiting_filling_buffer); pthread_cond_broadcast(&buffer->waiting_filling_buffer);
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -2258,39 +2394,59 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset) ...@@ -2258,39 +2394,59 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset)
} }
/* /**
Log page validator @brief Dummy write callback.
*/
SYNOPSIS static my_bool
translog_page_validator() translog_dummy_callback(__attribute__((unused)) uchar *page,
page_addr The page to check __attribute__((unused)) pgcache_page_no_t page_no,
data data, need for validation (address in this case) __attribute__((unused)) uchar* data_ptr)
{
return 0;
}
RETURN
0 OK /**
1 Error @brief Log page validator (read callback)
@param page The page data to check
@param page_no The page number (<offset>/<page length>)
@param data_ptr Read callback data pointer (pointer to TRANSLOG_FILE)
@todo: add turning loghandler to read-only mode after merging with
that patch.
@retval 0 OK
@retval 1 Error
*/ */
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
static my_bool translog_page_validator(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr)
{ {
uint this_page_page_overhead; uint this_page_page_overhead;
uint flags; uint flags;
uchar *page= (uchar*) page_addr, *page_pos; uchar *page_pos;
TRANSLOG_VALIDATOR_DATA *data= (TRANSLOG_VALIDATOR_DATA *) data_ptr; TRANSLOG_FILE *data= (TRANSLOG_FILE *) data_ptr;
TRANSLOG_ADDRESS addr= *(data->addr); #ifndef DBUG_OFF
uint32 offset= page_no * TRANSLOG_PAGE_SIZE;
#endif
DBUG_ENTER("translog_page_validator"); DBUG_ENTER("translog_page_validator");
data->was_recovered= 0; data->was_recovered= 0;
if (uint3korr(page) != LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE || if (uint3korr(page) != page_no ||
uint3korr(page + 3) != LSN_FILE_NO(addr)) uint3korr(page + 3) != data->number)
{ {
DBUG_PRINT("error", ("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"page address written in the page is incorrect: " "page address written in the page is incorrect: "
"File %lu instead of %lu or page %lu instead of %lu", "File %lu instead of %lu or page %lu instead of %lu",
LSN_IN_PARTS(addr), (ulong) data->number, (ulong) offset,
(ulong) uint3korr(page + 3), (ulong) LSN_FILE_NO(addr), (ulong) uint3korr(page + 3), (ulong) data->number,
(ulong) uint3korr(page), (ulong) uint3korr(page),
(ulong) LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE)); (ulong) page_no));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
flags= (uint)(page[TRANSLOG_PAGE_FLAGS]); flags= (uint)(page[TRANSLOG_PAGE_FLAGS]);
...@@ -2300,7 +2456,8 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -2300,7 +2456,8 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
{ {
DBUG_PRINT("error", ("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"Garbage in the page flags field detected : %x", "Garbage in the page flags field detected : %x",
LSN_IN_PARTS(addr), (uint) flags)); (ulong) data->number, (ulong) offset,
(uint) flags));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
page_pos= page + (3 + 3 + 1); page_pos= page + (3 + 3 + 1);
...@@ -2313,11 +2470,11 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -2313,11 +2470,11 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
{ {
DBUG_PRINT("error", ("Page (%lu,0x%lx): " DBUG_PRINT("error", ("Page (%lu,0x%lx): "
"CRC mismatch: calculated: %lx on the page %lx", "CRC mismatch: calculated: %lx on the page %lx",
LSN_IN_PARTS(addr), (ulong) data->number, (ulong) offset,
(ulong) crc, (ulong) uint4korr(page_pos))); (ulong) crc, (ulong) uint4korr(page_pos)));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
page_pos+= CRC_LENGTH; /* Skip crc */ page_pos+= CRC_SIZE; /* Skip crc */
} }
if (flags & TRANSLOG_SECTOR_PROTECTION) if (flags & TRANSLOG_SECTOR_PROTECTION)
{ {
...@@ -2440,8 +2597,8 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2440,8 +2597,8 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
PAGECACHE_BLOCK_LINK **direct_link) PAGECACHE_BLOCK_LINK **direct_link)
{ {
TRANSLOG_ADDRESS addr= *(data->addr), in_buffers; TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
uint cache_index;
uint32 file_no= LSN_FILE_NO(addr); uint32 file_no= LSN_FILE_NO(addr);
TRANSLOG_FILE *file;
DBUG_ENTER("translog_get_page"); DBUG_ENTER("translog_get_page");
DBUG_PRINT("enter", ("File: %lu Offset: %lu(0x%lx)", DBUG_PRINT("enter", ("File: %lu Offset: %lu(0x%lx)",
(ulong) file_no, (ulong) file_no,
...@@ -2477,7 +2634,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2477,7 +2634,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
if the page is in the buffer and it is the last version of the if the page is in the buffer and it is the last version of the
page (in case of division the page by buffer flush) page (in case of division the page by buffer flush)
*/ */
if (curr_buffer->file != -1 && if (curr_buffer->file != NULL &&
cmp_translog_addr(addr, curr_buffer->offset) >= 0 && cmp_translog_addr(addr, curr_buffer->offset) >= 0 &&
cmp_translog_addr(addr, cmp_translog_addr(addr,
(curr_buffer->next_buffer_offset ? (curr_buffer->next_buffer_offset ?
...@@ -2487,10 +2644,21 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2487,10 +2644,21 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
int is_last_unfinished_page; int is_last_unfinished_page;
uint last_protected_sector= 0; uint last_protected_sector= 0;
uchar *from, *table= NULL; uchar *from, *table= NULL;
TRANSLOG_FILE file_copy;
translog_wait_for_writers(curr_buffer); translog_wait_for_writers(curr_buffer);
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset); from= curr_buffer->buffer + (addr - curr_buffer->offset);
memcpy(buffer, from, TRANSLOG_PAGE_SIZE); memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
/*
We can use copy then in translog_page_validator() because it
do not put it permanently somewhere.
We have to use copy because after releasing log lock we can't
guaranty that the file still be present (in real life it will be
present but theoretically possible that it will be released
already from last files cache);
*/
file_copy= *(curr_buffer->file);
file_copy.handler.callback_data= (uchar*) &file_copy;
is_last_unfinished_page= ((log_descriptor.bc.buffer == is_last_unfinished_page= ((log_descriptor.bc.buffer ==
curr_buffer) && curr_buffer) &&
(log_descriptor.bc.ptr >= from) && (log_descriptor.bc.ptr >= from) &&
...@@ -2538,10 +2706,12 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2538,10 +2706,12 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
This IF should be true because we use in-memory data which This IF should be true because we use in-memory data which
supposed to be correct. supposed to be correct.
*/ */
if (translog_page_validator((uchar*) buffer, (uchar*) data)) if (translog_page_validator((uchar*) buffer,
LSN_OFFSET(addr),
(uchar*) &file_copy))
{ {
DBUG_ASSERT(0);
buffer= NULL; buffer= NULL;
DBUG_ASSERT(FALSE);
} }
} }
DBUG_RETURN(buffer); DBUG_RETURN(buffer);
...@@ -2557,21 +2727,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2557,21 +2727,9 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
} }
translog_unlock(); translog_unlock();
} }
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - file_no) < file= get_logfile_by_number(file_no);
OPENED_FILES_NUM)
{
PAGECACHE_FILE file;
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(file_no)) == -1)
DBUG_RETURN(NULL);
}
file.file= log_descriptor.log_file_num[cache_index];
buffer= buffer=
(uchar*) pagecache_valid_read(log_descriptor.pagecache, &file, (uchar*) pagecache_valid_read(log_descriptor.pagecache, &file->handler,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE, LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (direct_link ? NULL : (char*) buffer), 3, (direct_link ? NULL : (char*) buffer),
PAGECACHE_PLAIN_PAGE, PAGECACHE_PLAIN_PAGE,
...@@ -2583,27 +2741,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2583,27 +2741,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx", DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link, (ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL))); (ulong)(direct_link ? *direct_link : NULL)));
} data->was_recovered= file->was_recovered;
else
{
/*
TODO: WE KEEP THE LAST OPENED_FILES_NUM FILES IN THE LOG CACHE, NOT
THE LAST USED FILES. THIS WILL BE A NOTABLE PROBLEM IF WE ARE
FOLLOWING AN UNDO CHAIN THAT GOES OVER MANY OLD LOG FILES. WE WILL
PROBABLY NEED SPECIAL HANDLING OF THIS OR HAVE A FILO FOR THE LOG
FILES.
*/
File file= open_logfile_by_number_no_cache(file_no);
if (file == -1)
DBUG_RETURN(NULL);
if (my_pread(file, (char*) buffer, TRANSLOG_PAGE_SIZE,
LSN_OFFSET(addr), MYF(MY_FNABP | MY_WME)))
buffer= NULL;
else if (translog_page_validator((uchar*) buffer, (uchar*) data))
buffer= NULL;
my_close(file, MYF(MY_WME));
}
DBUG_RETURN(buffer); DBUG_RETURN(buffer);
} }
...@@ -2825,6 +2963,47 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr) ...@@ -2825,6 +2963,47 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/**
@brief Check log files presence
@retval 0 no log files.
@retval 1 there is at least 1 log file in the directory
*/
my_bool translog_is_log_files()
{
MY_DIR *dirp;
uint i;
my_bool rc= FALSE;
/* Finds and removes transaction log files */
if (!(dirp = my_dir(log_descriptor.directory, MYF(MY_DONT_SORT))))
return 1;
for (i= 0; i < dirp->number_off_files; i++)
{
char *file= dirp->dir_entry[i].name;
if (strncmp(file, "maria_log.", 10) == 0 &&
file[10] >= '0' && file[10] <= '9' &&
file[11] >= '0' && file[11] <= '9' &&
file[12] >= '0' && file[12] <= '9' &&
file[13] >= '0' && file[13] <= '9' &&
file[14] >= '0' && file[14] <= '9' &&
file[15] >= '0' && file[15] <= '9' &&
file[16] >= '0' && file[16] <= '9' &&
file[17] >= '0' && file[17] <= '9' &&
file[18] == '\0')
{
rc= TRUE;
break;
}
}
my_dirend(dirp);
return FALSE;
}
/** /**
@brief Initialize transaction log @brief Initialize transaction log
...@@ -2855,6 +3034,7 @@ my_bool translog_init_with_table(const char *directory, ...@@ -2855,6 +3034,7 @@ my_bool translog_init_with_table(const char *directory,
int i; int i;
int old_log_was_recovered= 0, logs_found= 0; int old_log_was_recovered= 0, logs_found= 0;
uint old_flags= flags; uint old_flags= flags;
uint32 start_file_num= 1;
TRANSLOG_ADDRESS sure_page, last_page, last_valid_page; TRANSLOG_ADDRESS sure_page, last_page, last_valid_page;
my_bool version_changed= 0; my_bool version_changed= 0;
DBUG_ENTER("translog_init_with_table"); DBUG_ENTER("translog_init_with_table");
...@@ -2877,6 +3057,10 @@ my_bool translog_init_with_table(const char *directory, ...@@ -2877,6 +3057,10 @@ my_bool translog_init_with_table(const char *directory,
MY_MUTEX_INIT_FAST) || MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.log_flush_lock, pthread_mutex_init(&log_descriptor.log_flush_lock,
MY_MUTEX_INIT_FAST) || MY_MUTEX_INIT_FAST) ||
my_rwlock_init(&log_descriptor.open_files_lock,
NULL) ||
my_init_dynamic_array(&log_descriptor.open_files,
sizeof(TRANSLOG_FILE*), 10, 10) ||
my_init_dynamic_array(&log_descriptor.unfinished_files, my_init_dynamic_array(&log_descriptor.unfinished_files,
sizeof(struct st_file_counter), sizeof(struct st_file_counter),
10, 10)) 10, 10))
...@@ -2918,7 +3102,7 @@ my_bool translog_init_with_table(const char *directory, ...@@ -2918,7 +3102,7 @@ my_bool translog_init_with_table(const char *directory,
{ {
page_overhead[i]= 7; page_overhead[i]= 7;
if (i & TRANSLOG_PAGE_CRC) if (i & TRANSLOG_PAGE_CRC)
page_overhead[i]+= CRC_LENGTH; page_overhead[i]+= CRC_SIZE;
if (i & TRANSLOG_SECTOR_PROTECTION) if (i & TRANSLOG_SECTOR_PROTECTION)
page_overhead[i]+= TRANSLOG_PAGE_SIZE / page_overhead[i]+= TRANSLOG_PAGE_SIZE /
DISK_DRIVE_SECTOR_SIZE; DISK_DRIVE_SECTOR_SIZE;
...@@ -2939,14 +3123,21 @@ my_bool translog_init_with_table(const char *directory, ...@@ -2939,14 +3123,21 @@ my_bool translog_init_with_table(const char *directory,
log_descriptor.buffer_capacity_chunk_2, log_descriptor.buffer_capacity_chunk_2,
log_descriptor.half_buffer_capacity_chunk_2)); log_descriptor.half_buffer_capacity_chunk_2));
/* *** Current state of the log handler *** */ /*
last_logno and last_checkpoint_lsn were set in
ma_control_file_create_or_open()
*/
logs_found= (last_logno != FILENO_IMPOSSIBLE);
/* Init log handler file handlers cache */
for (i= 0; i < OPENED_FILES_NUM; i++)
log_descriptor.log_file_num[i]= -1;
/* just to init it somehow */ /* Just to init it somehow (hack for bootstrap)*/
{
TRANSLOG_FILE *file= 0;
log_descriptor.min_file = log_descriptor.max_file= 1;
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0); translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
pop_dynamic(&log_descriptor.open_files);
}
/* Buffers for log writing */ /* Buffers for log writing */
for (i= 0; i < TRANSLOG_BUFFERS_NO; i++) for (i= 0; i < TRANSLOG_BUFFERS_NO; i++)
...@@ -2994,8 +3185,17 @@ my_bool translog_init_with_table(const char *directory, ...@@ -2994,8 +3185,17 @@ my_bool translog_init_with_table(const char *directory,
/* Set horizon to the beginning of the last file first */ /* Set horizon to the beginning of the last file first */
log_descriptor.horizon= last_page= MAKE_LSN(last_logno, 0); log_descriptor.horizon= last_page= MAKE_LSN(last_logno, 0);
if (translog_get_last_page_addr(&last_page, &pageok)) if (translog_get_last_page_addr(&last_page, &pageok))
{
if (!translog_is_log_files())
{
/* files was deleted, just start from the next log number */
start_file_num= last_logno + 1;
logs_found= 0;
}
else
DBUG_RETURN(1); DBUG_RETURN(1);
if (LSN_OFFSET(last_page) == 0) }
else if (LSN_OFFSET(last_page) == 0)
{ {
if (LSN_FILE_NO(last_page) == 1) if (LSN_FILE_NO(last_page) == 1)
{ {
...@@ -3009,6 +3209,52 @@ my_bool translog_init_with_table(const char *directory, ...@@ -3009,6 +3209,52 @@ my_bool translog_init_with_table(const char *directory,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
} }
if (logs_found)
{
uint32 i;
log_descriptor.min_file= translog_first_file(log_descriptor.horizon, 1);
log_descriptor.max_file= last_logno;
/* Open all files */
if (allocate_dynamic(&log_descriptor.open_files,
log_descriptor.max_file -
log_descriptor.min_file + 1))
DBUG_RETURN(1);
for (i = log_descriptor.max_file; i >= log_descriptor.min_file; i--)
{
/*
We can't allocate all file together because they will be freed
one by one
*/
TRANSLOG_FILE *file= (TRANSLOG_FILE *)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
if (file == NULL ||
(file->handler.file=
open_logfile_by_number_no_cache(i)) < 0)
{
int j;
for (j= i - log_descriptor.min_file - 1; j > 0; j--)
{
TRANSLOG_FILE *el=
*dynamic_element(&log_descriptor.open_files, j,
TRANSLOG_FILE **);
my_close(el->handler.file, MYF(MY_WME));
my_free(el, MYF(0));
}
if (file)
{
free(file);
DBUG_RETURN(1);
}
else
DBUG_RETURN(1);
}
translog_file_init(file, i, 1);
/* we allocated space so it can't fail */
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
}
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
}
} }
else if (readonly) else if (readonly)
{ {
...@@ -3150,7 +3396,15 @@ my_bool translog_init_with_table(const char *directory, ...@@ -3150,7 +3396,15 @@ my_bool translog_init_with_table(const char *directory,
if (!old_log_was_recovered && old_flags == flags) if (!old_log_was_recovered && old_flags == flags)
{ {
LOGHANDLER_FILE_INFO info; LOGHANDLER_FILE_INFO info;
if (translog_read_file_header(&info, log_descriptor.log_file_num[0])) /*
Accessing &log_descriptor.open_files without mutex is safe
because it is initialization
*/
if (translog_read_file_header(&info,
(*dynamic_element(&log_descriptor.
open_files,
0, TRANSLOG_FILE **))->
handler.file))
DBUG_RETURN(1); DBUG_RETURN(1);
version_changed= (info.maria_version != TRANSLOG_VERSION_ID); version_changed= (info.maria_version != TRANSLOG_VERSION_ID);
} }
...@@ -3159,15 +3413,26 @@ my_bool translog_init_with_table(const char *directory, ...@@ -3159,15 +3413,26 @@ my_bool translog_init_with_table(const char *directory,
logs_found, old_log_was_recovered)); logs_found, old_log_was_recovered));
if (!logs_found) if (!logs_found)
{ {
TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
DBUG_PRINT("info", ("The log is not found => we will create new log")); DBUG_PRINT("info", ("The log is not found => we will create new log"));
if (file == NULL)
DBUG_RETURN(1);
/* Start new log system from scratch */ /* Start new log system from scratch */
/* Used space */ log_descriptor.horizon= MAKE_LSN(start_file_num,
log_descriptor.horizon= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* header page */ TRANSLOG_PAGE_SIZE); /* header page */
/* Current logs file number in page cache */ if ((file->handler.file=
if ((log_descriptor.log_file_num[0]= create_logfile_by_number_no_cache(start_file_num)) == -1)
create_logfile_by_number_no_cache(1)) == -1 || DBUG_RETURN(1);
translog_write_file_header()) translog_file_init(file, start_file_num, 0);
if (insert_dynamic(&log_descriptor.open_files, (uchar*)&file))
DBUG_RETURN(1); DBUG_RETURN(1);
log_descriptor.min_file= log_descriptor.max_file= start_file_num;
if (translog_write_file_header())
DBUG_RETURN(1);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, 1, if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, 1,
CONTROL_FILE_UPDATE_ONLY_LOGNO)) CONTROL_FILE_UPDATE_ONLY_LOGNO))
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -3375,10 +3640,10 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer) ...@@ -3375,10 +3640,10 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer)
DBUG_PRINT("enter", DBUG_PRINT("enter",
("Buffer #%u: 0x%lx file: %d offset: (%lu,0x%lx) size: %lu", ("Buffer #%u: 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
buffer->file, (buffer->file ? buffer->file->handler.file : -1),
LSN_IN_PARTS(buffer->offset), LSN_IN_PARTS(buffer->offset),
(ulong) buffer->size)); (ulong) buffer->size));
if (buffer->file != -1) if (buffer->file != NULL)
{ {
/* /*
We ignore errors here, because we can't do something about it We ignore errors here, because we can't do something about it
...@@ -3404,6 +3669,7 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer) ...@@ -3404,6 +3669,7 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer)
void translog_destroy() void translog_destroy()
{ {
TRANSLOG_FILE **file;
uint i; uint i;
DBUG_ENTER("translog_destroy"); DBUG_ENTER("translog_destroy");
...@@ -3413,7 +3679,7 @@ void translog_destroy() ...@@ -3413,7 +3679,7 @@ void translog_destroy()
translog_status= (translog_status == TRANSLOG_READONLY ? translog_status= (translog_status == TRANSLOG_READONLY ?
TRANSLOG_UNINITED : TRANSLOG_UNINITED :
TRANSLOG_SHUTDOWN); TRANSLOG_SHUTDOWN);
if (log_descriptor.bc.buffer->file != -1) if (log_descriptor.bc.buffer->file != NULL)
translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc); translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc);
translog_unlock(); translog_unlock();
...@@ -3425,16 +3691,15 @@ void translog_destroy() ...@@ -3425,16 +3691,15 @@ void translog_destroy()
translog_status= TRANSLOG_UNINITED; translog_status= TRANSLOG_UNINITED;
/* close files */ /* close files */
for (i= 0; i < OPENED_FILES_NUM; i++) while ((file= (TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files)))
{ translog_close_log_file(*file);
if (log_descriptor.log_file_num[i] != -1)
translog_close_log_file(log_descriptor.log_file_num[i]);
}
pthread_mutex_destroy(&log_descriptor.sent_to_disk_lock); pthread_mutex_destroy(&log_descriptor.sent_to_disk_lock);
pthread_mutex_destroy(&log_descriptor.file_header_lock); pthread_mutex_destroy(&log_descriptor.file_header_lock);
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock); pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
pthread_mutex_destroy(&log_descriptor.purger_lock); pthread_mutex_destroy(&log_descriptor.purger_lock);
pthread_mutex_destroy(&log_descriptor.log_flush_lock); pthread_mutex_destroy(&log_descriptor.log_flush_lock);
rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files); delete_dynamic(&log_descriptor.unfinished_files);
my_close(log_descriptor.directory_fd, MYF(MY_WME)); my_close(log_descriptor.directory_fd, MYF(MY_WME));
...@@ -4963,15 +5228,20 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4963,15 +5228,20 @@ translog_write_variable_record_mgroup(LSN *lsn,
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn), if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
*lsn, FALSE)) *lsn, FALSE))
goto err; goto err;
translog_mark_file_finished(file_of_the_first_group);
translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups); delete_dynamic(&groups);
DBUG_RETURN(rc); DBUG_RETURN(rc);
err_unlock: err_unlock:
translog_unlock(); translog_unlock();
err: err:
translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups); delete_dynamic(&groups);
DBUG_RETURN(1); DBUG_RETURN(1);
} }
...@@ -5613,7 +5883,7 @@ my_bool translog_scanner_init(LSN lsn, ...@@ -5613,7 +5883,7 @@ my_bool translog_scanner_init(LSN lsn,
LSN_IN_PARTS(scanner->horizon))); LSN_IN_PARTS(scanner->horizon)));
/* lsn < horizon */ /* lsn < horizon */
DBUG_ASSERT(lsn < scanner->horizon); DBUG_ASSERT(lsn <= scanner->horizon);
scanner->page_addr= lsn; scanner->page_addr= lsn;
scanner->page_addr-= scanner->page_offset; /*decrease offset */ scanner->page_addr-= scanner->page_offset; /*decrease offset */
...@@ -6647,7 +6917,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -6647,7 +6917,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
TRANSLOG_ADDRESS flush_horizon; TRANSLOG_ADDRESS flush_horizon;
int rc= 0; int rc= 0;
/* We can't have more different files then buffers */ /* We can't have more different files then buffers */
File file_handlers[TRANSLOG_BUFFERS_NO]; TRANSLOG_FILE *file_handlers[TRANSLOG_BUFFERS_NO];
int current_file_handler= -1; int current_file_handler= -1;
uint32 prev_file= 0; uint32 prev_file= 0;
my_bool full_circle= 0; my_bool full_circle= 0;
...@@ -6691,7 +6961,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -6691,7 +6961,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
translog_buffer_lock(buffer); translog_buffer_lock(buffer);
translog_buffer_unlock(buffer_unlock); translog_buffer_unlock(buffer_unlock);
buffer_unlock= buffer; buffer_unlock= buffer;
if (buffer->file != -1) if (buffer->file != NULL)
{ {
buffer_unlock= NULL; buffer_unlock= NULL;
if (buffer_start == buffer_no) if (buffer_start == buffer_no)
...@@ -6709,27 +6979,14 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -6709,27 +6979,14 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
if (prev_file != LSN_FILE_NO(buffer->offset)) if (prev_file != LSN_FILE_NO(buffer->offset))
{ {
uint cache_index; TRANSLOG_FILE *file;
uint32 fn= LSN_FILE_NO(buffer->offset); uint32 fn= LSN_FILE_NO(buffer->offset);
prev_file= fn; prev_file= fn;
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - fn) < file= get_logfile_by_number(fn);
OPENED_FILES_NUM) if (!file->is_sync)
{
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{ {
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(fn)) == -1)
{
/* We don't need translog_unlock() here */
translog_buffer_unlock(buffer);
rc= 1;
goto out;
}
}
current_file_handler++; current_file_handler++;
file_handlers[current_file_handler]= file_handlers[current_file_handler]= file;
log_descriptor.log_file_num[cache_index];
} }
/* We sync file when we are closing it => do nothing if file closed */ /* We sync file when we are closing it => do nothing if file closed */
} }
...@@ -6744,11 +7001,12 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -6744,11 +7001,12 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
translog_unlock(); translog_unlock();
{ {
File *handler= file_handlers; TRANSLOG_FILE **cur= file_handlers;
File *end= file_handlers + current_file_handler; TRANSLOG_FILE **end= file_handlers + current_file_handler;
for (; handler <= end; handler++) for (; cur <= end; cur++)
{ {
if (my_sync(*handler, MYF(MY_WME))) (*cur)->is_sync= 1;
if (my_sync((*cur)->handler.file, MYF(MY_WME)))
{ {
rc= 1; rc= 1;
translog_stop_writing(); translog_stop_writing();
...@@ -6938,12 +7196,6 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected) ...@@ -6938,12 +7196,6 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
max_file= LSN_FILE_NO(horizon); max_file= LSN_FILE_NO(horizon);
if (MAKE_LSN(1, TRANSLOG_PAGE_SIZE) >= horizon)
{
/* there is no first page yet */
DBUG_RETURN(0);
}
/* binary search for last file */ /* binary search for last file */
while (min_file != max_file && min_file != (max_file - 1)) while (min_file != max_file && min_file != (max_file - 1))
{ {
...@@ -6960,6 +7212,8 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected) ...@@ -6960,6 +7212,8 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
log_descriptor.min_file_number= max_file; log_descriptor.min_file_number= max_file;
if (!is_protected) if (!is_protected)
pthread_mutex_unlock(&log_descriptor.purger_lock); pthread_mutex_unlock(&log_descriptor.purger_lock);
DBUG_PRINT("info", ("first file :%lu", (ulong) max_file));
DBUG_ASSERT(max_file >= 1);
DBUG_RETURN(max_file); DBUG_RETURN(max_file);
} }
...@@ -7170,7 +7424,30 @@ my_bool translog_purge(TRANSLOG_ADDRESS low) ...@@ -7170,7 +7424,30 @@ my_bool translog_purge(TRANSLOG_ADDRESS low)
} }
if (cmp_translog_addr(lsn, low) >= 0) if (cmp_translog_addr(lsn, low) >= 0)
break; break;
DBUG_PRINT("info", ("purge file %lu", (ulong) i)); DBUG_PRINT("info", ("purge file %lu", (ulong) i));
/* remove file descriptor from the cache */
/*
log_descriptor.min_file can be changed only here during execution
and the function is serialized, so we can access it without problems
*/
if (i >= log_descriptor.min_file)
{
TRANSLOG_FILE *file;
rw_wrlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
DBUG_ASSERT(log_descriptor.min_file == i);
file= *((TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files));
DBUG_PRINT("info", ("Files : %d", log_descriptor.open_files.elements));
DBUG_ASSERT(i == file->number);
log_descriptor.min_file++;
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
rw_unlock(&log_descriptor.open_files_lock);
translog_close_log_file(file);
}
if (log_purge_type == TRANSLOG_PURGE_IMMIDIATE) if (log_purge_type == TRANSLOG_PURGE_IMMIDIATE)
{ {
char path[FN_REFLEN], *file_name; char path[FN_REFLEN], *file_name;
......
...@@ -59,7 +59,6 @@ struct st_maria_handler; ...@@ -59,7 +59,6 @@ struct st_maria_handler;
/* Length of CRC at end of pages */ /* Length of CRC at end of pages */
#define ROW_EXTENT_PAGE_SIZE 5 #define ROW_EXTENT_PAGE_SIZE 5
#define ROW_EXTENT_COUNT_SIZE 2 #define ROW_EXTENT_COUNT_SIZE 2
#define CRC_LENGTH 4
/* Size of file id in logs */ /* Size of file id in logs */
#define FILEID_STORE_SIZE 2 #define FILEID_STORE_SIZE 2
/* Size of page reference in log */ /* Size of page reference in log */
......
...@@ -151,6 +151,10 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode, ...@@ -151,6 +151,10 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
info.errkey= -1; info.errkey= -1;
info.page_changed=1; info.page_changed=1;
info.keyread_buff= info.buff + share->base.max_key_block_length; info.keyread_buff= info.buff + share->base.max_key_block_length;
pagecache_file_init(info.dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
bitmap_init(&info.changed_fields, changed_fields_bitmap, bitmap_init(&info.changed_fields, changed_fields_bitmap,
share->base.fields, 0); share->base.fields, 0);
if ((*share->init)(&info)) if ((*share->init)(&info))
...@@ -714,6 +718,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -714,6 +718,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
errpos= 5; errpos= 5;
share->kfile.file= kfile; share->kfile.file= kfile;
pagecache_file_init(share->kfile, &maria_page_crc_check_index,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), share);
share->this_process=(ulong) getpid(); share->this_process=(ulong) getpid();
share->last_process= share->state.process; share->last_process= share->state.process;
share->base.key_parts=key_parts; share->base.key_parts=key_parts;
...@@ -1533,6 +1541,14 @@ int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share, ...@@ -1533,6 +1541,14 @@ int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share,
info->dfile.file= share->bitmap.file.file= info->dfile.file= share->bitmap.file.file=
my_open(share->data_file_name, share->mode | O_SHARE, my_open(share->data_file_name, share->mode | O_SHARE,
MYF(MY_WME)); MYF(MY_WME));
pagecache_file_init(share->bitmap.file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
return info->dfile.file >= 0 ? 0 : 1; return info->dfile.file >= 0 ? 0 : 1;
} }
...@@ -1547,6 +1563,10 @@ int _ma_open_keyfile(MARIA_SHARE *share) ...@@ -1547,6 +1563,10 @@ int _ma_open_keyfile(MARIA_SHARE *share)
share->kfile.file= my_open(share->unique_file_name, share->kfile.file= my_open(share->unique_file_name,
share->mode | O_SHARE, share->mode | O_SHARE,
MYF(MY_WME)); MYF(MY_WME));
pagecache_file_init(share->kfile, &maria_page_crc_check_index,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), share);
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
return (share->kfile.file < 0); return (share->kfile.file < 0);
} }
......
...@@ -137,9 +137,6 @@ int _ma_write_keypage(register MARIA_HA *info, ...@@ -137,9 +137,6 @@ int _ma_write_keypage(register MARIA_HA *info,
} }
#endif #endif
DBUG_ASSERT(share->pagecache->block_size == block_size); DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(share->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
res= pagecache_write(share->pagecache, res= pagecache_write(share->pagecache,
&share->kfile, page / block_size, &share->kfile, page / block_size,
...@@ -248,7 +245,7 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read) ...@@ -248,7 +245,7 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
lock_method, pin_method, lock_method, pin_method,
PAGECACHE_WRITE_DELAY, &page_link.link, PAGECACHE_WRITE_DELAY, &page_link.link,
LSN_IMPOSSIBLE, LSN_IMPOSSIBLE,
0, share->keypage_header+8, 0, 0)) 0, share->keypage_header + 8))
result= 1; result= 1;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY #ifdef IDENTICAL_PAGES_AFTER_RECOVERY
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/* /*
These functions handle page cacheing for Maria tables. These functions handle page caching for Maria tables.
One cache can handle many files. One cache can handle many files.
It must contain buffers of the same blocksize. It must contain buffers of the same blocksize.
...@@ -612,8 +612,18 @@ static uint pagecache_fwrite(PAGECACHE *pagecache, ...@@ -612,8 +612,18 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
/* TODO: integrate with page format */ /* TODO: integrate with page format */
lsn= lsn_korr(buffer + PAGE_LSN_OFFSET); lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
DBUG_ASSERT(LSN_VALID(lsn)); DBUG_ASSERT(LSN_VALID(lsn));
translog_flush(lsn); if (translog_flush(lsn))
DBUG_RETURN(1);
}
DBUG_PRINT("info", ("write_callback: 0x%lx data: 0x%lx",
(ulong) filedesc->write_callback,
(ulong) filedesc->callback_data));
if ((filedesc->write_callback)(buffer, pageno, filedesc->callback_data))
{
DBUG_PRINT("error", ("write callback problem"));
DBUG_RETURN(1);
} }
DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size, DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
(pageno)<<(pagecache->shift), flags)); (pageno)<<(pagecache->shift), flags));
} }
...@@ -2402,8 +2412,6 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, ...@@ -2402,8 +2412,6 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
pagecache pointer to a page cache data structure pagecache pointer to a page cache data structure
block block to which buffer the data is to be read block block to which buffer the data is to be read
primary <-> the current thread will read the data primary <-> the current thread will read the data
validator validator of read from the disk data
validator_data pointer to the data need by the validator
RETURN VALUE RETURN VALUE
None None
...@@ -2417,9 +2425,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache, ...@@ -2417,9 +2425,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
static void read_block(PAGECACHE *pagecache, static void read_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block, PAGECACHE_BLOCK_LINK *block,
my_bool primary, my_bool primary)
pagecache_disk_read_validator validator,
uchar* validator_data)
{ {
/* On entry cache_lock is locked */ /* On entry cache_lock is locked */
...@@ -2452,9 +2458,18 @@ static void read_block(PAGECACHE *pagecache, ...@@ -2452,9 +2458,18 @@ static void read_block(PAGECACHE *pagecache,
else else
block->status= PCBLOCK_READ; block->status= PCBLOCK_READ;
if (validator != NULL && DBUG_PRINT("info", ("read_callback: 0x%lx data: 0x%lx",
(*validator)(block->buffer, validator_data)) (ulong) block->hash_link->file.read_callback,
(ulong) block->hash_link->file.callback_data));
if ((*block->hash_link->file.read_callback)(block->buffer,
block->hash_link->pageno,
block->hash_link->
file.callback_data))
{
DBUG_PRINT("error", ("read callback problem"));
block->status|= PCBLOCK_ERROR; block->status|= PCBLOCK_ERROR;
}
DBUG_PRINT("read_block", DBUG_PRINT("read_block",
("primary request: new page in cache")); ("primary request: new page in cache"));
...@@ -2885,25 +2900,20 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache, ...@@ -2885,25 +2900,20 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
/* /*
Read a block of data from a cached file into a buffer; @brief Read a block of data from a cached file into a buffer;
SYNOPSIS @param pagecache pointer to a page cache data structure
pagecache_valid_read() @param file handler for the file for the block of data to be read
pagecache pointer to a page cache data structure @param pageno number of the block of data in the file
file handler for the file for the block of data to be read @param level determines the weight of the data
pageno number of the block of data in the file @param buff buffer to where the data must be placed
level determines the weight of the data @param type type of the page
buff buffer to where the data must be placed @param lock lock change
type type of the page @param link link to the page if we pin it
lock lock change
link link to the page if we pin it
validator validator of read from the disk data
validator_data pointer to the data need by the validator
RETURN VALUE @return address from where the data is placed if successful, 0 - otherwise.
Returns address from where the data is placed if successful, 0 - otherwise.
Pin will be chosen according to lock parameter (see lock_to_pin) @note Pin will be chosen according to lock parameter (see lock_to_pin)
*/ */
static enum pagecache_page_pin lock_to_pin[2][8]= static enum pagecache_page_pin lock_to_pin[2][8]=
{ {
...@@ -2929,16 +2939,14 @@ static enum pagecache_page_pin lock_to_pin[2][8]= ...@@ -2929,16 +2939,14 @@ static enum pagecache_page_pin lock_to_pin[2][8]=
} }
}; };
uchar *pagecache_valid_read(PAGECACHE *pagecache, uchar *pagecache_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
pgcache_page_no_t pageno, pgcache_page_no_t pageno,
uint level, uint level,
uchar *buff, uchar *buff,
enum pagecache_page_type type, enum pagecache_page_type type,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **page_link, PAGECACHE_BLOCK_LINK **page_link)
pagecache_disk_read_validator validator,
uchar* validator_data)
{ {
int error= 0; int error= 0;
enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock]; enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
...@@ -2995,8 +3003,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -2995,8 +3003,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
DBUG_PRINT("info", ("read block 0x%lx", (ulong)block)); DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
/* The requested page is to be read into the block buffer */ /* The requested page is to be read into the block buffer */
read_block(pagecache, block, read_block(pagecache, block,
(my_bool)(page_st == PAGE_TO_BE_READ), (my_bool)(page_st == PAGE_TO_BE_READ));
validator, validator_data);
DBUG_PRINT("info", ("read is done")); DBUG_PRINT("info", ("read is done"));
} }
...@@ -3313,9 +3320,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -3313,9 +3320,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
enum pagecache_write_mode write_mode, enum pagecache_write_mode write_mode,
PAGECACHE_BLOCK_LINK **page_link, PAGECACHE_BLOCK_LINK **page_link,
LSN first_REDO_LSN_for_page, LSN first_REDO_LSN_for_page,
uint offset, uint size, uint offset, uint size)
pagecache_disk_read_validator validator,
uchar* validator_data)
{ {
PAGECACHE_BLOCK_LINK *block= NULL; PAGECACHE_BLOCK_LINK *block= NULL;
PAGECACHE_BLOCK_LINK *fake_link; PAGECACHE_BLOCK_LINK *fake_link;
...@@ -3416,12 +3421,20 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -3416,12 +3421,20 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
memcpy(block->buffer + offset, buff, size); memcpy(block->buffer + offset, buff, size);
block->status= PCBLOCK_READ; block->status= PCBLOCK_READ;
/* /*
The validator can change the page content (removing page The read_callback can change the page content (removing page
protection) so it have to be called protection) so it have to be called
*/ */
if (validator != NULL && DBUG_PRINT("info", ("read_callback: 0x%lx data: 0x%lx",
(*validator)(block->buffer, validator_data)) (ulong) block->hash_link->file.read_callback,
(ulong) block->hash_link->file.callback_data));
if ((*block->hash_link->file.read_callback)(block->buffer,
block->hash_link->pageno,
block->hash_link->
file.callback_data))
{
DBUG_PRINT("error", ("read callback problem"));
block->status|= PCBLOCK_ERROR; block->status|= PCBLOCK_ERROR;
}
KEYCACHE_DBUG_PRINT("key_cache_insert", KEYCACHE_DBUG_PRINT("key_cache_insert",
("Page injection")); ("Page injection"));
#ifdef THREAD #ifdef THREAD
...@@ -3433,7 +3446,6 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -3433,7 +3446,6 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
} }
else else
{ {
DBUG_ASSERT(validator == 0 && validator_data == 0);
if (! (block->status & PCBLOCK_CHANGED)) if (! (block->status & PCBLOCK_CHANGED))
link_to_changed_list(pagecache, block); link_to_changed_list(pagecache, block);
......
...@@ -74,15 +74,20 @@ enum pagecache_write_mode ...@@ -74,15 +74,20 @@ enum pagecache_write_mode
PAGECACHE_WRITE_DONE PAGECACHE_WRITE_DONE
}; };
/* page number for maria */
typedef uint32 pgcache_page_no_t;
/* file descriptor for Maria */ /* file descriptor for Maria */
typedef struct st_pagecache_file typedef struct st_pagecache_file
{ {
File file; File file;
my_bool (*read_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
my_bool (*write_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
uchar *callback_data;
} PAGECACHE_FILE; } PAGECACHE_FILE;
/* page number for maria */
typedef uint32 pgcache_page_no_t;
/* declare structures that is used by st_pagecache */ /* declare structures that is used by st_pagecache */
struct st_pagecache_block_link; struct st_pagecache_block_link;
...@@ -94,8 +99,6 @@ typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK; ...@@ -94,8 +99,6 @@ typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK;
#include <wqueue.h> #include <wqueue.h>
typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
#define PAGECACHE_CHANGED_BLOCKS_HASH 128 /* must be power of 2 */ #define PAGECACHE_CHANGED_BLOCKS_HASH 128 /* must be power of 2 */
#define PAGECACHE_PRIORITY_LOW 0 #define PAGECACHE_PRIORITY_LOW 0
#define PAGECACHE_PRIORITY_DEFAULT 3 #define PAGECACHE_PRIORITY_DEFAULT 3
...@@ -192,26 +195,21 @@ extern ulong resize_pagecache(PAGECACHE *pagecache, ...@@ -192,26 +195,21 @@ extern ulong resize_pagecache(PAGECACHE *pagecache,
extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit, extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
uint age_threshold); uint age_threshold);
#define pagecache_read(P,F,N,L,B,T,K,I) \ extern uchar *pagecache_read(PAGECACHE *pagecache,
pagecache_valid_read(P,F,N,L,B,T,K,I,0,0)
extern uchar *pagecache_valid_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
pgcache_page_no_t pageno, pgcache_page_no_t pageno,
uint level, uint level,
uchar *buff, uchar *buff,
enum pagecache_page_type type, enum pagecache_page_type type,
enum pagecache_page_lock lock, enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **link, PAGECACHE_BLOCK_LINK **link);
pagecache_disk_read_validator validator,
uchar* validator_data);
#define pagecache_write(P,F,N,L,B,T,O,I,M,K,R) \ #define pagecache_write(P,F,N,L,B,T,O,I,M,K,R) \
pagecache_write_part(P,F,N,L,B,T,O,I,M,K,R,0,(P)->block_size,0,0) pagecache_write_part(P,F,N,L,B,T,O,I,M,K,R,0,(P)->block_size)
#define pagecache_inject(P,F,N,L,B,T,O,I,K,R,V,D) \ #define pagecache_inject(P,F,N,L,B,T,O,I,K,R) \
pagecache_write_part(P,F,N,L,B,T,O,I,PAGECACHE_WRITE_DONE, \ pagecache_write_part(P,F,N,L,B,T,O,I,PAGECACHE_WRITE_DONE, \
K,R,0,(P)->block_size,V,D) K,R,0,(P)->block_size)
extern my_bool pagecache_write_part(PAGECACHE *pagecache, extern my_bool pagecache_write_part(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
...@@ -225,9 +223,7 @@ extern my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -225,9 +223,7 @@ extern my_bool pagecache_write_part(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK **link, PAGECACHE_BLOCK_LINK **link,
LSN first_REDO_LSN_for_page, LSN first_REDO_LSN_for_page,
uint offset, uint offset,
uint size, uint size);
pagecache_disk_read_validator validator,
uchar* validator_data);
extern void pagecache_unlock(PAGECACHE *pagecache, extern void pagecache_unlock(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
pgcache_page_no_t pageno, pgcache_page_no_t pageno,
...@@ -261,6 +257,11 @@ extern void pagecache_unpin_by_link(PAGECACHE *pagecache, ...@@ -261,6 +257,11 @@ extern void pagecache_unpin_by_link(PAGECACHE *pagecache,
/* PCFLUSH_ERROR and PCFLUSH_PINNED. */ /* PCFLUSH_ERROR and PCFLUSH_PINNED. */
#define PCFLUSH_PINNED_AND_ERROR (PCFLUSH_ERROR|PCFLUSH_PINNED) #define PCFLUSH_PINNED_AND_ERROR (PCFLUSH_ERROR|PCFLUSH_PINNED)
#define pagecache_file_init(F,RC,WC,D) \
do{ \
(F).read_callback= (RC); (F).write_callback= (WC); \
(F).callback_data= (uchar*)(D); \
} while(0)
#define flush_pagecache_blocks(A,B,C) \ #define flush_pagecache_blocks(A,B,C) \
flush_pagecache_blocks_with_filter(A,B,C,NULL,NULL) flush_pagecache_blocks_with_filter(A,B,C,NULL,NULL)
......
...@@ -99,12 +99,19 @@ int maria_panic(enum ha_panic_function flag) ...@@ -99,12 +99,19 @@ int maria_panic(enum ha_panic_function flag)
{ /* Open closed files */ { /* Open closed files */
char name_buff[FN_REFLEN]; char name_buff[FN_REFLEN];
if (info->s->kfile.file < 0) if (info->s->kfile.file < 0)
{
if ((info->s->kfile.file= my_open(fn_format(name_buff, if ((info->s->kfile.file= my_open(fn_format(name_buff,
info->filename, "", info->filename, "",
N_NAME_IEXT,4), N_NAME_IEXT,4),
info->mode, info->mode,
MYF(MY_WME))) < 0) MYF(MY_WME))) < 0)
error = my_errno; error = my_errno;
pagecache_file_init(info->s->kfile, &maria_page_crc_check_index,
(info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), info->s);
}
if (info->dfile.file < 0) if (info->dfile.file < 0)
{ {
if ((info->dfile.file= my_open(fn_format(name_buff, info->filename, if ((info->dfile.file= my_open(fn_format(name_buff, info->filename,
...@@ -112,6 +119,10 @@ int maria_panic(enum ha_panic_function flag) ...@@ -112,6 +119,10 @@ int maria_panic(enum ha_panic_function flag)
info->mode, info->mode,
MYF(MY_WME))) < 0) MYF(MY_WME))) < 0)
error = my_errno; error = my_errno;
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal:
&maria_page_filler_set_normal), share);
info->rec_cache.file= info->dfile.file; info->rec_cache.file= info->dfile.file;
} }
} }
......
...@@ -1670,6 +1670,10 @@ static int maria_sort_records(HA_CHECK *param, ...@@ -1670,6 +1670,10 @@ static int maria_sort_records(HA_CHECK *param,
VOID(my_close(info->dfile.file, MYF(MY_WME))); VOID(my_close(info->dfile.file, MYF(MY_WME)));
param->out_flag|=O_NEW_DATA; /* Data in new file */ param->out_flag|=O_NEW_DATA; /* Data in new file */
info->dfile.file= new_file; /* Use new datafile */ info->dfile.file= new_file; /* Use new datafile */
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
info->state->del=0; info->state->del=0;
info->state->empty=0; info->state->empty=0;
share->state.dellink= HA_OFFSET_ERROR; share->state.dellink= HA_OFFSET_ERROR;
......
...@@ -46,6 +46,8 @@ struct st_transaction; ...@@ -46,6 +46,8 @@ struct st_transaction;
/* undef map from my_nosys; We need test-if-disk full */ /* undef map from my_nosys; We need test-if-disk full */
#undef my_write #undef my_write
#define CRC_SIZE 4
typedef struct st_maria_status_info typedef struct st_maria_status_info
{ {
ha_rows records; /* Rows in table */ ha_rows records; /* Rows in table */
...@@ -574,9 +576,19 @@ struct st_maria_handler ...@@ -574,9 +576,19 @@ struct st_maria_handler
#define _ma_store_keypage_flag(share,x,flag) x[(share)->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_FLAG_SIZE]= (flag) #define _ma_store_keypage_flag(share,x,flag) x[(share)->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_FLAG_SIZE]= (flag)
/*
TODO: write int4store_aligned as *((uint32 *) (T))= (uint32) (A) for
architectures where it is possible
*/
#define int4store_aligned(A,B) int4store((A),(B))
#define maria_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \ #define maria_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \
DBUG_PRINT("error", ("Marked table crashed")); \ DBUG_PRINT("error", ("Marked table crashed")); \
}while(0) }while(0)
#define maria_mark_crashed_share(x) \
do{(x)->state.changed|= STATE_CRASHED; \
DBUG_PRINT("error", ("Marked table crashed")); \
}while(0)
#define maria_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \ #define maria_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \
STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \ STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \
(x)->update|= HA_STATE_CHANGED; \ (x)->update|= HA_STATE_CHANGED; \
...@@ -1040,4 +1052,27 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info, ...@@ -1040,4 +1052,27 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
{ if (((S)->now_transactional= (S)->base.born_transactional)) \ { if (((S)->now_transactional= (S)->base.born_transactional)) \
(S)->page_type= PAGECACHE_LSN_PAGE; } (S)->page_type= PAGECACHE_LSN_PAGE; }
#define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff
#define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe
extern my_bool maria_page_crc_set_index(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_set_normal(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_bitmap(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_data(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_index(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_filler_set_bitmap(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_filler_set_normal(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern PAGECACHE *maria_log_pagecache; extern PAGECACHE *maria_log_pagecache;
...@@ -47,7 +47,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -47,7 +47,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_first_lsn-t \ ma_test_loghandler_first_lsn-t \
ma_test_loghandler_max_lsn-t \ ma_test_loghandler_max_lsn-t \
ma_test_loghandler_purge-t \ ma_test_loghandler_purge-t \
ma_test_loghandler_readonly-t ma_test_loghandler_readonly-t\
ma_test_loghandler_nologs-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
...@@ -61,6 +62,7 @@ ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_l ...@@ -61,6 +62,7 @@ ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_l
ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_readonly_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c ma_test_loghandler_readonly_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_readonly_t_CPPFLAGS = -DREADONLY_TEST ma_test_loghandler_readonly_t_CPPFLAGS = -DREADONLY_TEST
ma_test_loghandler_nologs_t_SOURCES = ma_test_loghandler_nologs-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_pagecache_single_src = ma_pagecache_single.c test_file.c test_file.h ma_pagecache_single_src = ma_pagecache_single.c test_file.c test_file.h
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c test_file.h ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c test_file.h
......
...@@ -37,9 +37,13 @@ my_bool maria_log_remove() ...@@ -37,9 +37,13 @@ my_bool maria_log_remove()
if (fn_format(file_name, file, if (fn_format(file_name, file,
maria_data_root, "", MYF(MY_WME)) == NullS || maria_data_root, "", MYF(MY_WME)) == NullS ||
my_delete(file_name, MYF(MY_WME)) != 0) my_delete(file_name, MYF(MY_WME)) != 0)
{
my_dirend(dirp);
return 1; return 1;
} }
} }
}
my_dirend(dirp);
return 0; return 0;
} }
...@@ -57,6 +57,18 @@ static uint flush_divider= 1000; ...@@ -57,6 +57,18 @@ static uint flush_divider= 1000;
#endif /*TEST_READERS*/ #endif /*TEST_READERS*/
#endif /*TEST_HIGH_CONCURENCY*/ #endif /*TEST_HIGH_CONCURENCY*/
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
/* /*
Get pseudo-random length of the field in (0;limit) Get pseudo-random length of the field in (0;limit)
...@@ -321,6 +333,7 @@ int main(int argc __attribute__((unused)), ...@@ -321,6 +333,7 @@ int main(int argc __attribute__((unused)),
errno); errno);
exit(1); exit(1);
} }
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
DBUG_PRINT("info", ("file1: %d", file1.file)); DBUG_PRINT("info", ("file1: %d", file1.file));
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0) if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{ {
......
...@@ -60,6 +60,19 @@ static struct file_desc simple_delete_flush_test_file[]= ...@@ -60,6 +60,19 @@ static struct file_desc simple_delete_flush_test_file[]=
{ 0, 0} { 0, 0}
}; };
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
/* /*
Recreate and reopen a file for test Recreate and reopen a file for test
...@@ -496,7 +509,6 @@ int main(int argc __attribute__((unused)), ...@@ -496,7 +509,6 @@ int main(int argc __attribute__((unused)),
#endif #endif
DBUG_ENTER("main"); DBUG_ENTER("main");
DBUG_PRINT("info", ("Main thread: %s\n", my_thread_name())); DBUG_PRINT("info", ("Main thread: %s\n", my_thread_name()));
if ((tmp_file= my_open(file2_name, O_CREAT | O_TRUNC | O_RDWR, if ((tmp_file= my_open(file2_name, O_CREAT | O_TRUNC | O_RDWR,
MYF(MY_WME))) < 0) MYF(MY_WME))) < 0)
exit(1); exit(1);
...@@ -508,6 +520,7 @@ int main(int argc __attribute__((unused)), ...@@ -508,6 +520,7 @@ int main(int argc __attribute__((unused)),
errno); errno);
exit(1); exit(1);
} }
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
my_close(tmp_file, MYF(0)); my_close(tmp_file, MYF(0));
my_delete(file2_name, MYF(0)); my_delete(file2_name, MYF(0));
......
...@@ -180,7 +180,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -180,7 +180,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -70,7 +70,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -70,7 +70,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -64,7 +64,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -64,7 +64,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -189,7 +189,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -189,7 +189,6 @@ int main(int argc __attribute__((unused)), char *argv[])
0, 0, &translog_example_table_init)) 0, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
...@@ -354,7 +353,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -354,7 +353,6 @@ int main(int argc __attribute__((unused)), char *argv[])
0, READONLY, &translog_example_table_init)) 0, READONLY, &translog_example_table_init))
{ {
fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno); fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
......
...@@ -287,7 +287,6 @@ int main(int argc __attribute__((unused)), ...@@ -287,7 +287,6 @@ int main(int argc __attribute__((unused)),
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -72,7 +72,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -72,7 +72,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -20,6 +20,19 @@ static char *first_translog_file= (char*)"maria_log.00000001"; ...@@ -20,6 +20,19 @@ static char *first_translog_file= (char*)"maria_log.00000001";
static char *file1_name= (char*)"page_cache_test_file_1"; static char *file1_name= (char*)"page_cache_test_file_1";
static PAGECACHE_FILE file1; static PAGECACHE_FILE file1;
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
int main(int argc __attribute__((unused)), char *argv[]) int main(int argc __attribute__((unused)), char *argv[])
{ {
uint pagen; uint pagen;
...@@ -72,7 +85,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -72,7 +85,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
...@@ -112,6 +124,7 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -112,6 +124,7 @@ int main(int argc __attribute__((unused)), char *argv[])
errno); errno);
exit(1); exit(1);
} }
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0) if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{ {
fprintf(stderr, "Got error during file1 chmod() (errno: %d)\n", fprintf(stderr, "Got error during file1 chmod() (errno: %d)\n",
......
...@@ -67,7 +67,6 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -67,7 +67,6 @@ int main(int argc __attribute__((unused)), char *argv[])
LOG_FLAGS, 0, &translog_example_table_init)) LOG_FLAGS, 0, &translog_example_table_init))
{ {
fprintf(stderr, "Can't init loghandler (%d)\n", errno); fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1); exit(1);
} }
/* Suppressing of automatic record writing */ /* Suppressing of automatic record writing */
......
...@@ -30,11 +30,6 @@ int test_file(PAGECACHE_FILE file, char *file_name, ...@@ -30,11 +30,6 @@ int test_file(PAGECACHE_FILE file, char *file_name,
int step= 0; int step= 0;
int res= 1; /* ok */ int res= 1; /* ok */
if (my_sync(file.file, MYF(MY_WME | MY_IGNORE_BADFD)))
{
diag("Got error during syncing file\n");
exit(1);
}
if ((stat= my_stat(file_name, &stat_buff, MYF(0))) == NULL) if ((stat= my_stat(file_name, &stat_buff, MYF(0))) == NULL)
{ {
diag("Can't stat() %s (errno: %d)\n", file_name, errno); diag("Can't stat() %s (errno: %d)\n", file_name, errno);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment