Commit fce0e6a0 authored by unknown's avatar unknown

Pagecache callbacks support added.

Page CRC check based on pagecache support added.
Loghandler pagecache callbacks support added.
Loghandler filecache rewritten.
Support of deletting all logs added.


storage/maria/Makefile.am:
  New file with functions for CRC calculation.
storage/maria/ma_bitmap.c:
  Page CRC support.
storage/maria/ma_blockrec.c:
  Removed code replaced by pagecache callbacks.
storage/maria/ma_check.c:
  Page CRC support.
storage/maria/ma_create.c:
  Page CRC support.
storage/maria/ma_loghandler.c:
  Pagecache callbacks support.
  New file cache support.
  Removing log files support.
storage/maria/ma_loghandler.h:
  CRC_LENGTH replaced with CRC_SIZE
storage/maria/ma_open.c:
  Page CRC support.
storage/maria/ma_page.c:
  Page CRC support.
storage/maria/ma_pagecache.c:
  Pagecache callbacks support.
storage/maria/ma_pagecache.h:
  Pagecache callbacks support.
storage/maria/ma_panic.c:
  Page CRC support.
storage/maria/maria_chk.c:
  Page CRC support.
storage/maria/maria_def.h:
  Page CRC support.
storage/maria/unittest/Makefile.am:
  New test of removing logs.
storage/maria/unittest/ma_maria_log_cleanup.c:
  Memory leack fixed.
storage/maria/unittest/ma_pagecache_consist.c:
  Pagecache callbacks support.
storage/maria/unittest/ma_pagecache_single.c:
  Pagecache callbacks support.
storage/maria/unittest/ma_test_loghandler-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_first_lsn-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_max_lsn-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_multigroup-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_multithread-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_noflush-t.c:
  Fixed the test error processing.
storage/maria/unittest/ma_test_loghandler_pagecache-t.c:
  Pagecache callbacks support.
storage/maria/unittest/ma_test_loghandler_purge-t.c:
  Fixed the test error processing.
storage/maria/unittest/test_file.c:
  Removed unneeded sync.
parent eb14b3b0
......@@ -122,7 +122,8 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_control_file.c ma_loghandler.c \
ma_pagecache.c ma_pagecaches.c \
ma_checkpoint.c ma_recovery.c ma_commit.c
ma_checkpoint.c ma_recovery.c ma_commit.c \
ma_pagecrc.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
SUFFIXES = .sh
......
......@@ -127,12 +127,6 @@
#define FULL_HEAD_PAGE 4
#define FULL_TAIL_PAGE 7
/* If we don't have page checksum enabled, the bitmap page ends with this */
uchar maria_bitmap_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 254};
uchar maria_normal_page_marker[4]=
{(uchar) 255, (uchar) 255, (uchar) 255, (uchar) 255};
static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap,
ulonglong page);
......@@ -144,6 +138,7 @@ static inline my_bool write_changed_bitmap(MARIA_SHARE *share,
MARIA_FILE_BITMAP *bitmap)
{
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
DBUG_ASSERT(bitmap->file.write_callback != 0);
return (pagecache_write(share->pagecache,
&bitmap->file, bitmap->page, 0,
(uchar*) bitmap->map, PAGECACHE_PLAIN_PAGE,
......@@ -185,7 +180,11 @@ my_bool _ma_bitmap_init(MARIA_SHARE *share, File file)
bitmap->file.file= file;
bitmap->block_size= share->block_size;
/* Size needs to be alligned on 6 */
pagecache_file_init(bitmap->file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
/* Size needs to be aligned on 6 */
aligned_bit_blocks= (share->block_size - PAGE_SUFFIX_SIZE) / 6;
bitmap->total_size= aligned_bit_blocks * 6;
/*
......@@ -290,8 +289,6 @@ void _ma_bitmap_delete_all(MARIA_SHARE *share)
if (bitmap->map) /* Not in create */
{
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->changed= 1;
bitmap->page= 0;
bitmap->used_size= bitmap->total_size;
......@@ -616,8 +613,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
*/
share->state.state.data_file_length= end_of_page;
bzero(bitmap->map, bitmap->block_size);
memcpy(bitmap->map + bitmap->block_size - sizeof(maria_bitmap_marker),
maria_bitmap_marker, sizeof(maria_bitmap_marker));
bitmap->used_size= 0;
#ifndef DBUG_OFF
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
......@@ -627,9 +622,9 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
bitmap->used_size= bitmap->total_size;
DBUG_ASSERT(share->pagecache->block_size == bitmap->block_size);
res= pagecache_read(share->pagecache,
(PAGECACHE_FILE*)&bitmap->file, page, 0,
(uchar*) bitmap->map,
PAGECACHE_PLAIN_PAGE,
&bitmap->file, page, 0,
(uchar*) bitmap->map,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0) == NULL;
/*
......@@ -641,15 +636,6 @@ static my_bool _ma_read_bitmap_page(MARIA_SHARE *share,
when running without any checksums.
*/
if (!res && !(share->options & HA_OPTION_PAGE_CHECKSUM) &&
!memcmp(bitmap->map + bitmap->block_size -
sizeof(maria_normal_page_marker),
maria_normal_page_marker,
sizeof(maria_normal_page_marker)))
{
res= 1;
my_errno= HA_ERR_WRONG_IN_RECORD; /* File crashed */
}
#ifndef DBUG_OFF
if (!res)
memcpy(bitmap->map + bitmap->block_size, bitmap->map, bitmap->block_size);
......@@ -2257,17 +2243,18 @@ int _ma_bitmap_create_first(MARIA_SHARE *share)
{
uint block_size= share->bitmap.block_size;
File file= share->bitmap.file.file;
char marker[sizeof(maria_bitmap_marker)];
char marker[CRC_SIZE];
if (share->options & HA_OPTION_PAGE_CHECKSUM)
bzero(marker, sizeof(marker));
else
bmove(marker, maria_bitmap_marker, sizeof(marker));
/*
Next write operation of the page will write correct CRC
if it is needed
*/
int4store(marker, MARIA_NO_CRC_BITMAP_PAGE);
if (my_chsize(file, block_size - sizeof(maria_bitmap_marker),
if (my_chsize(file, block_size - sizeof(marker),
0, MYF(MY_WME)) ||
my_pwrite(file, marker, sizeof(maria_bitmap_marker),
block_size - sizeof(maria_bitmap_marker),
my_pwrite(file, marker, sizeof(marker),
block_size - sizeof(marker),
MYF(MY_NABP | MY_WME)))
return 1;
share->state.state.data_file_length= block_size;
......
......@@ -1242,9 +1242,7 @@ static void make_empty_page(MARIA_HA *info, uchar *buff, uint page_type)
buff[DIR_FREE_OFFSET]= END_OF_DIR_FREE_LIST;
int2store(buff + block_size - PAGE_SUFFIX_SIZE - DIR_ENTRY_SIZE,
PAGE_HEADER_SIZE);
if (!(info->s->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
DBUG_VOID_RETURN;
}
......@@ -1555,10 +1553,6 @@ static my_bool write_full_pages(MARIA_HA *info,
(data_size - copy_length) + PAGE_SUFFIX_SIZE);
#endif
if (!(info->s->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
DBUG_ASSERT(share->pagecache->block_size == block_size);
if (pagecache_write(share->pagecache,
&info->dfile, page, 0,
......
......@@ -2704,6 +2704,20 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, char *name)
} /* maria_sort_index */
/**
@brief put CRC on the page
@param buff reference on the page buffer.
@param pos position of the page in the file.
@param length length of the page
*/
static void put_crc(char *buff, my_off_t pos, MARIA_SHARE *share)
{
maria_page_crc_set_index(buff, pos / share->block_size, (uchar*) share);
}
/* Sort records recursive using one index */
static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
......@@ -2781,6 +2795,7 @@ static int sort_one_index(HA_CHECK *param, MARIA_HA *info,
/* Fill block with zero and write it to the new index file */
length= _ma_get_page_used(share, buff);
bzero((uchar*) buff+length,keyinfo->block_length-length);
put_crc(buff, new_page_pos, share);
if (my_pwrite(new_file,(uchar*) buff,(uint) keyinfo->block_length,
new_page_pos,MYF(MY_NABP | MY_WAIT_IF_FULL)))
{
......@@ -4869,9 +4884,13 @@ static int sort_insert_key(MARIA_SORT_PARAM *sort_param,
DFLT_INIT_HITS, anc_buff))
DBUG_RETURN(1);
}
else if (my_pwrite(share->kfile.file, anc_buff,
(uint) keyinfo->block_length,filepos, param->myf_rw))
DBUG_RETURN(1);
else
{
put_crc(anc_buff, filepos, share);
if (my_pwrite(share->kfile.file, anc_buff,
(uint) keyinfo->block_length, filepos, param->myf_rw))
DBUG_RETURN(1);
}
DBUG_DUMP("buff", anc_buff, _ma_get_page_used(share, anc_buff));
/* Write separator-key to block in next level */
......@@ -4989,9 +5008,13 @@ int _ma_flush_pending_blocks(MARIA_SORT_PARAM *sort_param)
DFLT_INIT_HITS, key_block->buff))
DBUG_RETURN(1);
}
else if (my_pwrite(info->s->kfile.file, key_block->buff,
(uint) keyinfo->block_length,filepos, myf_rw))
else
{
put_crc(key_block->buff, filepos, info->s);
if (my_pwrite(info->s->kfile.file, key_block->buff,
(uint) keyinfo->block_length,filepos, myf_rw))
DBUG_RETURN(1);
}
DBUG_DUMP("buff",key_block->buff,length);
nod_flag=1;
}
......@@ -5576,6 +5599,14 @@ my_bool create_new_data_handle(MARIA_SORT_PARAM *param, File new_file)
DBUG_RETURN(1);
new_info= sort_info->new_info;
pagecache_file_init(new_info->s->bitmap.file, &maria_page_crc_check_bitmap,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), new_info->s);
pagecache_file_init(new_info->dfile, &maria_page_crc_check_data,
(new_info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), new_info->s);
change_data_file_descriptor(new_info, new_file);
maria_lock_database(new_info, F_EXTRA_LCK);
if ((sort_info->param->testflag & T_UNPACK) &&
......
......@@ -1042,6 +1042,10 @@ int maria_create(const char *name, enum data_file_type datafile_type,
DROP+CREATE happened (applying REDOs to the wrong table).
*/
share.kfile.file= file;
pagecache_file_init(share.kfile, &maria_page_crc_check_index,
(share.options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), &share);
if (_ma_update_create_rename_lsn_sub(&share, lsn, FALSE))
goto err;
my_free(log_data, MYF(0));
......@@ -1234,6 +1238,10 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
{
share->bitmap.block_size= share->base.block_size;
share->bitmap.file.file = dfile;
pagecache_file_init(share->bitmap.file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
return _ma_bitmap_create_first(share);
}
/*
......
......@@ -28,6 +28,21 @@
/* number of opened log files in the pagecache (should be at least 2) */
#define OPENED_FILES_NUM 3
#define CACHED_FILES_NUM 5
#define CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT 7
#if CACHED_FILES_NUM > CACHED_FILES_NUM_DIRECT_SEARCH_LIMIT
#include <hash.h>
#include <m_ctype.h>
#endif
/* transaction log file descriptor */
typedef struct st_translog_file
{
uint32 number;
PAGECACHE_FILE handler;
my_bool was_recovered;
my_bool is_sync;
} TRANSLOG_FILE;
/* records buffer size (should be TRANSLOG_PAGE_SIZE * n) */
#define TRANSLOG_WRITE_BUFFER (1024*1024)
......@@ -81,7 +96,7 @@ struct st_translog_buffer
*/
translog_size_t size;
/* File handler for this buffer */
File file;
TRANSLOG_FILE *file;
/* Threads which are waiting for buffer filling/freeing */
WQUEUE waiting_filling_buffer;
/* Number of records which are in copy progress */
......@@ -170,8 +185,13 @@ struct st_translog_descriptor
char directory[FN_REFLEN];
/* *** Current state of the log handler *** */
/* Current and (OPENED_FILES_NUM-1) last logs number in the page cache */
File log_file_num[OPENED_FILES_NUM];
/* list of opened files */
DYNAMIC_ARRAY open_files;
/* min/max number of file in the array */
uint32 max_file, min_file;
/* the opened files list guard */
rw_lock_t open_files_lock;
/*
File descriptor of the directory where we store log files for syncing
it.
......@@ -258,9 +278,16 @@ static MARIA_SHARE **id_to_share= NULL;
/* lock for id_to_share */
static my_atomic_rwlock_t LOCK_id_to_share;
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr);
static my_bool translog_dummy_callback(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
static my_bool translog_page_validator(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
static my_bool translog_get_next_chunk(TRANSLOG_SCANNER_DATA *scanner);
static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected);
/*
Initialize log_record_type_descriptors
......@@ -756,6 +783,53 @@ static File open_logfile_by_number_no_cache(uint32 file_no)
}
/**
@brief get file descriptor by given number using cache
@param file_no Number of the log we want to open
retval # file descriptor
*/
static TRANSLOG_FILE *get_logfile_by_number(uint32 file_no)
{
TRANSLOG_FILE *file;
DBUG_ENTER("get_logfile_by_number");
rw_rdlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
DBUG_ASSERT(log_descriptor.max_file >= file_no);
DBUG_ASSERT(log_descriptor.min_file <= file_no);
DBUG_ASSERT(log_descriptor.max_file - file_no <
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files,
log_descriptor.max_file - file_no, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
DBUG_PRINT("info", ("File 0x%lx File no: %lu, File handler: %d",
(ulong)file, (ulong)file_no,
(file ? file->handler.file : -1)));
DBUG_ASSERT(!file || file->number == file_no);
DBUG_RETURN(file);
}
/**
@brief get current file descriptor
retval # file descriptor
*/
static TRANSLOG_FILE *get_current_logfile()
{
TRANSLOG_FILE *file;
rw_rdlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
file= *dynamic_element(&log_descriptor.open_files, 0, TRANSLOG_FILE **);
rw_unlock(&log_descriptor.open_files_lock);
return (file);
}
uchar NEAR maria_trans_file_magic[]=
{ (uchar) 254, (uchar) 254, (uchar) 11, '\001', 'M', 'A', 'R', 'I', 'A',
'L', 'O', 'G' };
......@@ -780,8 +854,10 @@ uchar NEAR maria_trans_file_magic[]=
static my_bool translog_write_file_header()
{
TRANSLOG_FILE *file;
ulonglong timestamp;
uchar page_buff[TRANSLOG_PAGE_SIZE], *page= page_buff;
my_bool rc;
DBUG_ENTER("translog_write_file_header");
/* file tag */
......@@ -810,8 +886,16 @@ static my_bool translog_write_file_header()
page+= LSN_STORE_SIZE;
memset(page, TRANSLOG_FILLER, sizeof(page_buff) - (page- page_buff));
DBUG_RETURN(my_pwrite(log_descriptor.log_file_num[0], page_buff,
sizeof(page_buff), 0, log_write_flags) != 0);
file= get_current_logfile();
rc= my_pwrite(file->handler.file, page_buff, sizeof(page_buff), 0,
log_write_flags) != 0;
/*
Dropping the flag in such way can make false alarm: signalling than the
file in not sync when it is sync, but the situation is quite rare and
protections with mutexes give much more overhead to the whole engine
*/
file->is_sync= 0;
DBUG_RETURN(rc);
}
/*
......@@ -898,6 +982,15 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc, File file)
desc->file_number= uint3korr(ptr);
ptr+=3;
desc->max_lsn= lsn_korr(ptr);
DBUG_PRINT("info", ("timestamp: %llu maria ver: %lu mysql ver: %lu "
"server id %lu page size %u file number %lu "
"max lsn: (%lu,0x%lx)",
(ulonglong) desc->timestamp,
(ulong) desc->maria_version,
(ulong) desc->mysql_version,
(ulong) desc->server_id,
desc->page_size, (ulong) desc->file_number,
LSN_IN_PARTS(desc->max_lsn)));
DBUG_RETURN(0);
}
......@@ -1130,7 +1223,7 @@ LSN translog_get_file_max_lsn_stored(uint32 file)
DBUG_PRINT("error", ("Can't read file header"));
DBUG_RETURN(LSN_ERROR);
}
DBUG_PRINT("error", ("Max lsn: (%lu,0x%lx)",
DBUG_PRINT("info", ("Max lsn: (%lu,0x%lx)",
LSN_IN_PARTS(info.max_lsn)));
DBUG_RETURN(info.max_lsn);
}
......@@ -1153,7 +1246,7 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
DBUG_ENTER("translog_buffer_init");
buffer->last_lsn= LSN_IMPOSSIBLE;
/* This Buffer File */
buffer->file= -1;
buffer->file= NULL;
buffer->overlay= 0;
/* cache for current log */
memset(buffer->buffer, TRANSLOG_FILLER, TRANSLOG_WRITE_BUFFER);
......@@ -1173,33 +1266,51 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/*
Close transaction log file by descriptor
@brief close transaction log file by descriptor
SYNOPSIS
translog_close_log_file()
file file descriptor
@param file pagegecache file descriptor reference
RETURN
0 OK
1 Error
@return Operation status
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_close_log_file(File file)
static my_bool translog_close_log_file(TRANSLOG_FILE *file)
{
int rc;
PAGECACHE_FILE fl;
fl.file= file;
flush_pagecache_blocks(log_descriptor.pagecache, &fl, FLUSH_RELEASE);
int rc= 0;
flush_pagecache_blocks(log_descriptor.pagecache, &file->handler,
FLUSH_RELEASE);
/*
Sync file when we close it
TODO: sync only we have changed the log
*/
rc= my_sync(file, MYF(MY_WME));
rc|= my_close(file, MYF(MY_WME));
if (!file->is_sync)
rc= my_sync(file->handler.file, MYF(MY_WME));
rc|= my_close(file->handler.file, MYF(MY_WME));
my_free(file, MYF(0));
return test(rc);
}
/**
@brief Initializes TRANSLOG_FILE structure
@param file reference on the file to initialize
@param number file number
@param is_sync is file synced on disk
*/
static void translog_file_init(TRANSLOG_FILE *file, uint32 number,
my_bool is_sync)
{
pagecache_file_init(file->handler, &translog_page_validator,
&translog_dummy_callback, file);
file->number= number;
file->was_recovered= 0;
file->is_sync= is_sync;
}
/**
@brief Create and fill header of new file.
......@@ -1214,30 +1325,50 @@ static my_bool translog_close_log_file(File file)
static my_bool translog_create_new_file()
{
int i;
TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
TRANSLOG_FILE *old= get_current_logfile();
uint32 file_no= LSN_FILE_NO(log_descriptor.horizon);
DBUG_ENTER("translog_create_new_file");
/*
Writes max_lsn to the file header before finishing it (there is no need
to lock file header buffer because it is still unfinished file, so only
one thread can finish the file and nobody interested of LSN of current
(unfinished) file, because no one can purge it).
*/
translog_max_lsn_to_header(log_descriptor.log_file_num[0],
log_descriptor.max_lsn);
log_descriptor.max_lsn= LSN_IMPOSSIBLE;
if (file == NULL)
DBUG_RETURN(1);
if (log_descriptor.log_file_num[OPENED_FILES_NUM - 1] != -1 &&
translog_close_log_file(log_descriptor.log_file_num[OPENED_FILES_NUM -
1]))
if (translog_max_lsn_to_header(old->handler.file, log_descriptor.max_lsn))
DBUG_RETURN(1);
for (i= OPENED_FILES_NUM - 1; i > 0; i--)
log_descriptor.log_file_num[i]= log_descriptor.log_file_num[i - 1];
if ((log_descriptor.log_file_num[0]=
create_logfile_by_number_no_cache(file_no)) == -1 ||
translog_write_file_header())
rw_wrlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
DBUG_ASSERT(file_no == log_descriptor.max_file + 1);
if (allocate_dynamic(&log_descriptor.open_files,
log_descriptor.max_file - log_descriptor.min_file + 2))
goto error;
if ((file->handler.file=
create_logfile_by_number_no_cache(file_no)) == -1)
goto error;
translog_file_init(file, file_no, 0);
/* this call just expand the array */
insert_dynamic(&log_descriptor.open_files, (uchar*)&file);
log_descriptor.max_file++;
{
char *start= (char*) dynamic_element(&log_descriptor.open_files, 0,
TRANSLOG_FILE**);
memmove(start + sizeof(TRANSLOG_FILE*), start,
sizeof(TRANSLOG_FILE*) *
(log_descriptor.max_file - log_descriptor.min_file + 1 - 1));
}
/* can't fail we because we expanded array */
set_dynamic(&log_descriptor.open_files, (uchar*)&file, 0);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
rw_unlock(&log_descriptor.open_files_lock);
DBUG_PRINT("info", ("file_no: %lu", (ulong)file_no));
if (translog_write_file_header())
DBUG_RETURN(1);
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, file_no,
......@@ -1245,6 +1376,10 @@ static my_bool translog_create_new_file()
DBUG_RETURN(1);
DBUG_RETURN(0);
error:
rw_unlock(&log_descriptor.open_files_lock);
DBUG_RETURN(1);
}
......@@ -1353,7 +1488,7 @@ static void translog_new_page_header(TRANSLOG_ADDRESS *horizon,
int4store(ptr, 0x11223344);
#endif
/* CRC will be put when page is finished */
ptr+= CRC_LENGTH;
ptr+= CRC_SIZE;
}
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
{
......@@ -1567,7 +1702,7 @@ static void translog_wait_for_writers(struct st_translog_buffer *buffer)
"mutex: 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer,
(ulong) &buffer->mutex));
DBUG_ASSERT(buffer->file != -1);
DBUG_ASSERT(buffer->file != NULL);
wqueue_add_and_wait(&buffer->waiting_filling_buffer, thread,
&buffer->mutex);
DBUG_PRINT("info", ("wait for writers done "
......@@ -1598,14 +1733,15 @@ static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer)
struct st_my_thread_var *thread= my_thread_var;
DBUG_ENTER("translog_wait_for_buffer_free");
DBUG_PRINT("enter", ("Buffer: #%u 0x%lx copies in progress: %u "
"File: %d size: 0x%lu",
"File: %d size: %lu",
(uint) buffer->buffer_no, (ulong) buffer,
(int) buffer->copy_to_buffer_in_progress,
buffer->file, (ulong) buffer->size));
(buffer->file ? buffer->file->handler.file : -1),
(ulong) buffer->size));
translog_wait_for_writers(buffer);
while (buffer->file != -1)
while (buffer->file != NULL)
{
DBUG_PRINT("info", ("wait for writers... "
"buffer: #%u 0x%lx "
......@@ -1666,20 +1802,22 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
{
DBUG_ENTER("translog_start_buffer");
DBUG_PRINT("enter",
("Assign buffer: #%u (0x%lx) to file: %d offset: 0x%lx(%lu)",
("Assign buffer: #%u (0x%lx) offset: 0x%lx(%lu)",
(uint) buffer->buffer_no, (ulong) buffer,
log_descriptor.log_file_num[0],
(ulong) LSN_OFFSET(log_descriptor.horizon),
(ulong) LSN_OFFSET(log_descriptor.horizon)));
DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->last_lsn= LSN_IMPOSSIBLE;
buffer->offset= log_descriptor.horizon;
buffer->next_buffer_offset= LSN_IMPOSSIBLE;
buffer->file= log_descriptor.log_file_num[0];
buffer->file= get_current_logfile();
buffer->overlay= 0;
buffer->size= 0;
translog_cursor_init(cursor, buffer, buffer_no);
DBUG_PRINT("info", ("init cursor #%u: 0x%lx chaser: %d Size: %lu (%lu)",
DBUG_PRINT("info", ("file: #%ld (%d) init cursor #%u: 0x%lx "
"chaser: %d Size: %lu (%lu)",
(long) (buffer->file ? buffer->file->number : 0),
(buffer->file ? buffer->file->handler.file : -1),
(uint) cursor->buffer->buffer_no, (ulong) cursor->buffer,
cursor->chaser, (ulong) cursor->buffer->size,
(ulong) (cursor->ptr - cursor->buffer->buffer)));
......@@ -1726,7 +1864,7 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_wait_for_buffer_free(new_buffer);
}
else
DBUG_ASSERT(new_buffer->file != 0);
DBUG_ASSERT(new_buffer->file != NULL);
if (new_file)
{
......@@ -2061,17 +2199,17 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{
uint32 i, pg;
PAGECACHE_FILE file;
TRANSLOG_FILE *file;
DBUG_ENTER("translog_buffer_flush");
DBUG_ASSERT(buffer->file != NULL);
DBUG_PRINT("enter",
("Buffer: #%u 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
(uint) buffer->buffer_no, (ulong) buffer,
buffer->file,
buffer->file->handler.file,
LSN_IN_PARTS(buffer->offset),
(ulong) buffer->size));
translog_buffer_lock_assert_owner(buffer);
DBUG_ASSERT(buffer->file != -1);
translog_wait_for_writers(buffer);
......@@ -2085,11 +2223,11 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
*/
struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
File file= buffer->file;
TRANSLOG_FILE *fl= buffer->file;
translog_buffer_unlock(buffer);
translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == file &&
if (buffer->overlay->file == fl &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0)
{
......@@ -2097,7 +2235,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
}
translog_buffer_unlock(overlay);
translog_buffer_lock(buffer);
if (buffer->file != -1 && buffer_offset == buffer->offset)
if (buffer->file != NULL && buffer_offset == buffer->offset)
{
/*
This means that somebody else flushed the buffer while we was
......@@ -2110,7 +2248,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
}
}
file.file= buffer->file;
file= buffer->file;
for (i= 0, pg= LSN_OFFSET(buffer->offset) / TRANSLOG_PAGE_SIZE;
i < buffer->size;
i+= TRANSLOG_PAGE_SIZE, pg++)
......@@ -2121,30 +2259,36 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (pagecache_inject(log_descriptor.pagecache,
&file, pg, 3,
&file->handler, pg, 3,
buffer->buffer + i,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, 0,
LSN_IMPOSSIBLE,
&translog_page_validator, (uchar*) &data))
LSN_IMPOSSIBLE))
{
UNRECOVERABLE_ERROR(("Can't write page (%lu,0x%lx) to pagecache",
(ulong) buffer->file,
(ulong) (LSN_OFFSET(buffer->offset)+ i)));
}
}
if (my_pwrite(buffer->file, (char*) buffer->buffer,
file->is_sync= 0;
if (my_pwrite(file->handler.file, (char*) buffer->buffer,
buffer->size, LSN_OFFSET(buffer->offset),
log_write_flags))
{
UNRECOVERABLE_ERROR(("Can't write buffer (%lu,0x%lx) size %lu "
"to the disk (%d)",
(ulong) buffer->file,
(ulong) file->handler.file,
(ulong) LSN_OFFSET(buffer->offset),
(ulong) buffer->size, errno));
DBUG_RETURN(1);
}
/*
Dropping the flag in such way can make false alarm: signalling than the
file in not sync when it is sync, but the situation is quite rare and
protections with mutexes give much more overhead to the whole engine
*/
file->is_sync= 0;
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_disk(buffer->last_lsn,
......@@ -2152,7 +2296,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
else
translog_set_only_in_buffers(buffer->next_buffer_offset);
/* Free buffer */
buffer->file= -1;
buffer->file= NULL;
buffer->overlay= 0;
if (buffer->waiting_filling_buffer.last_thread)
{
......@@ -2230,39 +2374,59 @@ static my_bool translog_recover_page_up_to_sector(uchar *page, uint16 offset)
}
/*
Log page validator
/**
@brief Dummy write callback.
*/
SYNOPSIS
translog_page_validator()
page_addr The page to check
data data, need for validation (address in this case)
static my_bool
translog_dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
RETURN
0 OK
1 Error
/**
@brief Log page validator (read callback)
@param page The page data to check
@param page_no The page number (<offset>/<page length>)
@param data_ptr Read callback data pointer (pointer to TRANSLOG_FILE)
@todo: add turning loghandler to read-only mode after merging with
that patch.
@retval 0 OK
@retval 1 Error
*/
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
static my_bool translog_page_validator(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr)
{
uint this_page_page_overhead;
uint flags;
uchar *page= (uchar*) page_addr, *page_pos;
TRANSLOG_VALIDATOR_DATA *data= (TRANSLOG_VALIDATOR_DATA *) data_ptr;
TRANSLOG_ADDRESS addr= *(data->addr);
uchar *page_pos;
TRANSLOG_FILE *data= (TRANSLOG_FILE *) data_ptr;
#ifndef DBUG_OFF
uint32 offset= page_no * TRANSLOG_PAGE_SIZE;
#endif
DBUG_ENTER("translog_page_validator");
data->was_recovered= 0;
if (uint3korr(page) != LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE ||
uint3korr(page + 3) != LSN_FILE_NO(addr))
if (uint3korr(page) != page_no ||
uint3korr(page + 3) != data->number)
{
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): "
"page address written in the page is incorrect: "
"File %lu instead of %lu or page %lu instead of %lu",
LSN_IN_PARTS(addr),
(ulong) uint3korr(page + 3), (ulong) LSN_FILE_NO(addr),
(ulong) data->number, (ulong) offset,
(ulong) uint3korr(page + 3), (ulong) data->number,
(ulong) uint3korr(page),
(ulong) LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE));
(ulong) page_no));
DBUG_RETURN(1);
}
flags= (uint)(page[TRANSLOG_PAGE_FLAGS]);
......@@ -2272,7 +2436,8 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
{
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): "
"Garbage in the page flags field detected : %x",
LSN_IN_PARTS(addr), (uint) flags));
(ulong) data->number, (ulong) offset,
(uint) flags));
DBUG_RETURN(1);
}
page_pos= page + (3 + 3 + 1);
......@@ -2285,11 +2450,11 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
{
UNRECOVERABLE_ERROR(("Page (%lu,0x%lx): "
"CRC mismatch: calculated: %lx on the page %lx",
LSN_IN_PARTS(addr),
(ulong) data->number, (ulong) offset,
(ulong) crc, (ulong) uint4korr(page_pos)));
DBUG_RETURN(1);
}
page_pos+= CRC_LENGTH; /* Skip crc */
page_pos+= CRC_SIZE; /* Skip crc */
}
if (flags & TRANSLOG_SECTOR_PROTECTION)
{
......@@ -2410,8 +2575,8 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
PAGECACHE_BLOCK_LINK **direct_link)
{
TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
uint cache_index;
uint32 file_no= LSN_FILE_NO(addr);
TRANSLOG_FILE *file;
DBUG_ENTER("translog_get_page");
DBUG_PRINT("enter", ("File: %lu Offset: %lu(0x%lx)",
(ulong) file_no,
......@@ -2447,7 +2612,7 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
if the page is in the buffer and it is the last version of the
page (in case of division the page by buffer flush)
*/
if (curr_buffer->file != -1 &&
if (curr_buffer->file != NULL &&
cmp_translog_addr(addr, curr_buffer->offset) >= 0 &&
cmp_translog_addr(addr,
(curr_buffer->next_buffer_offset ?
......@@ -2457,10 +2622,21 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
int is_last_unfinished_page;
uint last_protected_sector= 0;
uchar *from, *table= NULL;
TRANSLOG_FILE file_copy;
translog_wait_for_writers(curr_buffer);
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset);
memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
/*
We can use copy then in translog_page_validator() because it
do not put it permanently somewhere.
We have to use copy because after releasing log lock we can't
guaranty that the file still be present (in real life it will be
present but theoretically possible that it will be released
already from last files cache);
*/
file_copy= *(curr_buffer->file);
file_copy.handler.callback_data= (uchar*) &file_copy;
is_last_unfinished_page= ((log_descriptor.bc.buffer ==
curr_buffer) &&
(log_descriptor.bc.ptr >= from) &&
......@@ -2510,10 +2686,12 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
This IF should be true because we use in-memory data which
supposed to be correct.
*/
if (translog_page_validator((uchar*) buffer, (uchar*) data))
if (translog_page_validator((uchar*) buffer,
LSN_OFFSET(addr),
(uchar*) &file_copy))
{
DBUG_ASSERT(0);
buffer= NULL;
DBUG_ASSERT(FALSE);
}
}
DBUG_RETURN(buffer);
......@@ -2529,57 +2707,23 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
}
translog_unlock();
}
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - file_no) <
OPENED_FILES_NUM)
{
PAGECACHE_FILE file;
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(file_no)) == -1)
DBUG_RETURN(NULL);
}
file.file= log_descriptor.log_file_num[cache_index];
buffer=
(uchar*) (direct_link ?
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, NULL,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_READ, direct_link,
&translog_page_validator, (uchar*) data) :
pagecache_valid_read(log_descriptor.pagecache, &file,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (char*) buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, direct_link,
&translog_page_validator, (uchar*) data));
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL)));
}
else
{
/*
TODO: WE KEEP THE LAST OPENED_FILES_NUM FILES IN THE LOG CACHE, NOT
THE LAST USED FILES. THIS WILL BE A NOTABLE PROBLEM IF WE ARE
FOLLOWING AN UNDO CHAIN THAT GOES OVER MANY OLD LOG FILES. WE WILL
PROBABLY NEED SPECIAL HANDLING OF THIS OR HAVE A FILO FOR THE LOG
FILES.
*/
File file= open_logfile_by_number_no_cache(file_no);
if (file == -1)
DBUG_RETURN(NULL);
if (my_pread(file, (char*) buffer, TRANSLOG_PAGE_SIZE,
LSN_OFFSET(addr), MYF(MY_FNABP | MY_WME)))
buffer= NULL;
else if (translog_page_validator((uchar*) buffer, (uchar*) data))
buffer= NULL;
my_close(file, MYF(MY_WME));
}
file= get_logfile_by_number(file_no);
buffer=
(uchar*) (direct_link ?
pagecache_read(log_descriptor.pagecache, &file->handler,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, NULL,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_READ, direct_link) :
pagecache_read(log_descriptor.pagecache, &file->handler,
LSN_OFFSET(addr) / TRANSLOG_PAGE_SIZE,
3, (char*) buffer,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, direct_link));
DBUG_PRINT("info", ("Direct link is assigned to : 0x%lx * 0x%lx",
(ulong) direct_link,
(ulong)(direct_link ? *direct_link : NULL)));
data->was_recovered= file->was_recovered;
DBUG_RETURN(buffer);
}
......@@ -2794,6 +2938,47 @@ static my_bool translog_truncate_log(TRANSLOG_ADDRESS addr)
DBUG_RETURN(0);
}
/**
@brief Check log files presence
@retval 0 no log files.
@retval 1 there is at least 1 log file in the directory
*/
my_bool translog_is_log_files()
{
MY_DIR *dirp;
uint i;
my_bool rc= FALSE;
/* Finds and removes transaction log files */
if (!(dirp = my_dir(log_descriptor.directory, MYF(MY_DONT_SORT))))
return 1;
for (i= 0; i < dirp->number_off_files; i++)
{
char *file= dirp->dir_entry[i].name;
if (strncmp(file, "maria_log.", 10) == 0 &&
file[10] >= '0' && file[10] <= '9' &&
file[11] >= '0' && file[11] <= '9' &&
file[12] >= '0' && file[12] <= '9' &&
file[13] >= '0' && file[13] <= '9' &&
file[14] >= '0' && file[14] <= '9' &&
file[15] >= '0' && file[15] <= '9' &&
file[16] >= '0' && file[16] <= '9' &&
file[17] >= '0' && file[17] <= '9' &&
file[18] == '\0')
{
rc= TRUE;
break;
}
}
my_dirend(dirp);
return FALSE;
}
/*
Initialize transaction log
......@@ -2823,6 +3008,7 @@ my_bool translog_init(const char *directory,
int i;
int old_log_was_recovered= 0, logs_found= 0;
uint old_flags= flags;
uint32 start_file_num= 1;
TRANSLOG_ADDRESS sure_page, last_page, last_valid_page;
my_bool version_changed= 0;
DBUG_ENTER("translog_init");
......@@ -2843,6 +3029,10 @@ my_bool translog_init(const char *directory,
MY_MUTEX_INIT_FAST) ||
pthread_mutex_init(&log_descriptor.log_flush_lock,
MY_MUTEX_INIT_FAST) ||
my_rwlock_init(&log_descriptor.open_files_lock,
NULL) ||
my_init_dynamic_array(&log_descriptor.open_files,
sizeof(TRANSLOG_FILE*), 10, 10) ||
my_init_dynamic_array(&log_descriptor.unfinished_files,
sizeof(struct st_file_counter),
10, 10))
......@@ -2883,7 +3073,7 @@ my_bool translog_init(const char *directory,
{
page_overhead[i]= 7;
if (i & TRANSLOG_PAGE_CRC)
page_overhead[i]+= CRC_LENGTH;
page_overhead[i]+= CRC_SIZE;
if (i & TRANSLOG_SECTOR_PROTECTION)
page_overhead[i]+= (TRANSLOG_PAGE_SIZE /
DISK_DRIVE_SECTOR_SIZE) * 2;
......@@ -2904,14 +3094,21 @@ my_bool translog_init(const char *directory,
log_descriptor.buffer_capacity_chunk_2,
log_descriptor.half_buffer_capacity_chunk_2));
/* *** Current state of the log handler *** */
/*
last_logno and last_checkpoint_lsn were set in
ma_control_file_create_or_open()
*/
logs_found= (last_logno != FILENO_IMPOSSIBLE);
/* Init log handler file handlers cache */
for (i= 0; i < OPENED_FILES_NUM; i++)
log_descriptor.log_file_num[i]= -1;
/* just to init it somehow */
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
/* Just to init it somehow (hack for bootstrap)*/
{
TRANSLOG_FILE *file= 0;
log_descriptor.min_file = log_descriptor.max_file= 1;
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
translog_start_buffer(log_descriptor.buffers, &log_descriptor.bc, 0);
pop_dynamic(&log_descriptor.open_files);
}
/* Buffers for log writing */
for (i= 0; i < TRANSLOG_BUFFERS_NO; i++)
......@@ -2925,12 +3122,6 @@ my_bool translog_init(const char *directory,
i, (ulong) log_descriptor.buffers + i));
}
/*
last_logno and last_checkpoint_lsn were set in
ma_control_file_create_or_open()
*/
logs_found= (last_logno != FILENO_IMPOSSIBLE);
if (logs_found)
{
my_bool pageok;
......@@ -2955,8 +3146,17 @@ my_bool translog_init(const char *directory,
}
log_descriptor.horizon= last_page= MAKE_LSN(last_logno, 0);
if (translog_get_last_page_addr(&last_page, &pageok))
DBUG_RETURN(1);
if (LSN_OFFSET(last_page) == 0)
{
if (!translog_is_log_files())
{
/* files was deleted, just start from the next log number */
start_file_num= last_logno + 1;
logs_found= 0;
}
else
DBUG_RETURN(1);
}
else if (LSN_OFFSET(last_page) == 0)
{
if (LSN_FILE_NO(last_page) == 1)
{
......@@ -2969,6 +3169,52 @@ my_bool translog_init(const char *directory,
DBUG_RETURN(1);
}
}
if (logs_found)
{
uint32 i;
log_descriptor.min_file= translog_first_file(log_descriptor.horizon, 1);
log_descriptor.max_file= last_logno;
/* Open all files */
if (allocate_dynamic(&log_descriptor.open_files,
log_descriptor.max_file -
log_descriptor.min_file + 1))
DBUG_RETURN(1);
for (i = log_descriptor.max_file; i >= log_descriptor.min_file; i--)
{
/*
We can't allocate all file together because they will be freed
one by one
*/
TRANSLOG_FILE *file= (TRANSLOG_FILE *)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
if (file == NULL ||
(file->handler.file=
open_logfile_by_number_no_cache(i)) < 0)
{
int j;
for (j= i - log_descriptor.min_file - 1; j > 0; j--)
{
TRANSLOG_FILE *el=
*dynamic_element(&log_descriptor.open_files, j,
TRANSLOG_FILE **);
my_close(el->handler.file, MYF(MY_WME));
my_free(el, MYF(0));
}
if (file)
{
free(file);
DBUG_RETURN(1);
}
else
DBUG_RETURN(1);
}
translog_file_init(file, i, 1);
/* we allocated space so it can't fail */
insert_dynamic(&log_descriptor.open_files, (uchar *)&file);
}
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
}
}
if (logs_found)
{
......@@ -3097,7 +3343,15 @@ my_bool translog_init(const char *directory,
if (!old_log_was_recovered && old_flags == flags)
{
LOGHANDLER_FILE_INFO info;
if (translog_read_file_header(&info, log_descriptor.log_file_num[0]))
/*
Accessing &log_descriptor.open_files without mutex is safe
because it is initialization
*/
if (translog_read_file_header(&info,
(*dynamic_element(&log_descriptor.
open_files,
0, TRANSLOG_FILE **))->
handler.file))
DBUG_RETURN(1);
version_changed= (info.maria_version != TRANSLOG_VERSION_ID);
}
......@@ -3106,14 +3360,25 @@ my_bool translog_init(const char *directory,
logs_found, old_log_was_recovered));
if (!logs_found)
{
TRANSLOG_FILE *file= (TRANSLOG_FILE*)my_malloc(sizeof(TRANSLOG_FILE),
MYF(0));
if (file == NULL)
DBUG_RETURN(1);
/* Start new log system from scratch */
/* Used space */
log_descriptor.horizon= MAKE_LSN(1, TRANSLOG_PAGE_SIZE); /* header page */
/* Current logs file number in page cache */
if ((log_descriptor.log_file_num[0]=
create_logfile_by_number_no_cache(1)) == -1 ||
translog_write_file_header())
log_descriptor.horizon= MAKE_LSN(start_file_num,
TRANSLOG_PAGE_SIZE); /* header page */
if ((file->handler.file=
create_logfile_by_number_no_cache(start_file_num)) == -1)
DBUG_RETURN(1);
translog_file_init(file, start_file_num, 0);
if (insert_dynamic(&log_descriptor.open_files, (uchar*)&file))
DBUG_RETURN(1);
log_descriptor.min_file= log_descriptor.max_file= start_file_num;
if (translog_write_file_header())
DBUG_RETURN(1);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
if (ma_control_file_write_and_force(LSN_IMPOSSIBLE, 1,
CONTROL_FILE_UPDATE_ONLY_LOGNO))
DBUG_RETURN(1);
......@@ -3313,11 +3578,11 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer)
DBUG_PRINT("enter",
("Buffer #%u: 0x%lx file: %d offset: (%lu,0x%lx) size: %lu",
(uint) buffer->buffer_no, (ulong) buffer,
buffer->file,
(buffer->file ? buffer->file->handler.file : -1),
LSN_IN_PARTS(buffer->offset),
(ulong) buffer->size));
DBUG_ASSERT(buffer->waiting_filling_buffer.last_thread == 0);
if (buffer->file != -1)
if (buffer->file != NULL)
{
/*
We ignore errors here, because we can't do something about it
......@@ -3342,13 +3607,14 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer)
void translog_destroy()
{
TRANSLOG_FILE **file;
uint i;
DBUG_ENTER("translog_destroy");
DBUG_ASSERT(translog_inited);
translog_lock();
translog_inited= 0;
if (log_descriptor.bc.buffer->file != -1)
if (log_descriptor.bc.buffer->file != NULL)
translog_finish_page(&log_descriptor.horizon, &log_descriptor.bc);
translog_unlock();
......@@ -3359,16 +3625,15 @@ void translog_destroy()
}
/* close files */
for (i= 0; i < OPENED_FILES_NUM; i++)
{
if (log_descriptor.log_file_num[i] != -1)
translog_close_log_file(log_descriptor.log_file_num[i]);
}
while ((file= (TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files)))
translog_close_log_file(*file);
pthread_mutex_destroy(&log_descriptor.sent_to_disk_lock);
pthread_mutex_destroy(&log_descriptor.file_header_lock);
pthread_mutex_destroy(&log_descriptor.unfinished_files_lock);
pthread_mutex_destroy(&log_descriptor.purger_lock);
pthread_mutex_destroy(&log_descriptor.log_flush_lock);
rwlock_destroy(&log_descriptor.open_files_lock);
delete_dynamic(&log_descriptor.open_files);
delete_dynamic(&log_descriptor.unfinished_files);
my_close(log_descriptor.directory_fd, MYF(MY_WME));
......@@ -4897,15 +5162,20 @@ translog_write_variable_record_mgroup(LSN *lsn,
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
*lsn, FALSE))
goto err;
translog_mark_file_finished(file_of_the_first_group);
translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups);
DBUG_RETURN(rc);
err_unlock:
translog_unlock();
err:
translog_mark_file_finished(file_of_the_first_group);
delete_dynamic(&groups);
DBUG_RETURN(1);
}
......@@ -5534,7 +5804,7 @@ my_bool translog_scanner_init(LSN lsn,
LSN_IN_PARTS(scanner->horizon)));
/* lsn < horizon */
DBUG_ASSERT(lsn < scanner->horizon);
DBUG_ASSERT(lsn <= scanner->horizon);
scanner->page_addr= lsn;
scanner->page_addr-= scanner->page_offset; /*decrease offset */
......@@ -6563,7 +6833,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
TRANSLOG_ADDRESS flush_horizon;
int rc= 0;
/* We can't have more different files then buffers */
File file_handlers[TRANSLOG_BUFFERS_NO];
TRANSLOG_FILE *file_handlers[TRANSLOG_BUFFERS_NO];
int current_file_handler= -1;
uint32 prev_file= 0;
my_bool full_circle= 0;
......@@ -6601,7 +6871,7 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
translog_buffer_lock(buffer);
translog_buffer_unlock(buffer_unlock);
buffer_unlock= buffer;
if (buffer->file != -1)
if (buffer->file != NULL)
{
buffer_unlock= NULL;
if (buffer_start == buffer_no)
......@@ -6619,27 +6889,14 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
if (prev_file != LSN_FILE_NO(buffer->offset))
{
uint cache_index;
TRANSLOG_FILE *file;
uint32 fn= LSN_FILE_NO(buffer->offset);
prev_file= fn;
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - fn) <
OPENED_FILES_NUM)
file= get_logfile_by_number(fn);
if (!file->is_sync)
{
/* file in the cache */
if (log_descriptor.log_file_num[cache_index] == -1)
{
if ((log_descriptor.log_file_num[cache_index]=
open_logfile_by_number_no_cache(fn)) == -1)
{
/* We don't need translog_unlock() here */
translog_buffer_unlock(buffer);
rc= 1;
goto out;
}
}
current_file_handler++;
file_handlers[current_file_handler]=
log_descriptor.log_file_num[cache_index];
file_handlers[current_file_handler]= file;
}
/* We sync file when we are closing it => do nothing if file closed */
}
......@@ -6654,10 +6911,13 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
translog_unlock();
{
File *handler= file_handlers;
File *end= file_handlers + current_file_handler;
for (; handler <= end; handler++)
rc|= my_sync(*handler, MYF(MY_WME));
TRANSLOG_FILE **cur= file_handlers;
TRANSLOG_FILE **end= file_handlers + current_file_handler;
for (; cur <= end; cur++)
{
(*cur)->is_sync= 1;
rc|= my_sync((*cur)->handler.file, MYF(MY_WME));
}
}
log_descriptor.flushed= sent_to_disk;
/*
......@@ -6841,12 +7101,6 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
max_file= LSN_FILE_NO(horizon);
if (MAKE_LSN(1, TRANSLOG_PAGE_SIZE) >= horizon)
{
/* there is no first page yet */
DBUG_RETURN(0);
}
/* binary search for last file */
while (min_file != max_file && min_file != (max_file - 1))
{
......@@ -6863,6 +7117,8 @@ static uint32 translog_first_file(TRANSLOG_ADDRESS horizon, int is_protected)
log_descriptor.min_file_number= max_file;
if (!is_protected)
pthread_mutex_unlock(&log_descriptor.purger_lock);
DBUG_PRINT("info", ("first file :%lu", (ulong) max_file));
DBUG_ASSERT(max_file >= 1);
DBUG_RETURN(max_file);
}
......@@ -7070,7 +7326,30 @@ my_bool translog_purge(TRANSLOG_ADDRESS low)
}
if (cmp_translog_addr(lsn, low) >= 0)
break;
DBUG_PRINT("info", ("purge file %lu", (ulong) i));
/* remove file descriptor from the cache */
/*
log_descriptor.min_file can be changed only here during execution
and the function is serialized, so we can access it without problems
*/
if (i >= log_descriptor.min_file)
{
TRANSLOG_FILE *file;
rw_wrlock(&log_descriptor.open_files_lock);
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
DBUG_ASSERT(log_descriptor.min_file == i);
file= *((TRANSLOG_FILE **)pop_dynamic(&log_descriptor.open_files));
DBUG_PRINT("info", ("Files : %d", log_descriptor.open_files.elements));
DBUG_ASSERT(i == file->number);
log_descriptor.min_file++;
DBUG_ASSERT(log_descriptor.max_file - log_descriptor.min_file + 1 ==
log_descriptor.open_files.elements);
rw_unlock(&log_descriptor.open_files_lock);
translog_close_log_file(file);
}
if (log_purge_type == TRANSLOG_PURGE_IMMIDIATE)
{
char path[FN_REFLEN], *file_name;
......
......@@ -54,7 +54,6 @@ struct st_maria_handler;
/* Length of CRC at end of pages */
#define ROW_EXTENT_PAGE_SIZE 5
#define ROW_EXTENT_COUNT_SIZE 2
#define CRC_LENGTH 4
/* Size of file id in logs */
#define FILEID_STORE_SIZE 2
/* Size of page reference in log */
......
......@@ -151,6 +151,10 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
info.errkey= -1;
info.page_changed=1;
info.keyread_buff= info.buff + share->base.max_key_block_length;
pagecache_file_init(info.dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
bitmap_init(&info.changed_fields, changed_fields_bitmap,
share->base.fields, 0);
if ((*share->init)(&info))
......@@ -714,6 +718,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
errpos= 5;
share->kfile.file= kfile;
pagecache_file_init(share->kfile, &maria_page_crc_check_index,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), share);
share->this_process=(ulong) getpid();
share->last_process= share->state.process;
share->base.key_parts=key_parts;
......@@ -1535,6 +1543,14 @@ int _ma_open_datafile(MARIA_HA *info, MARIA_SHARE *share,
info->dfile.file= share->bitmap.file.file=
my_open(share->data_file_name, share->mode | O_SHARE,
MYF(MY_WME));
pagecache_file_init(share->bitmap.file, &maria_page_crc_check_bitmap,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_bitmap), share);
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
return info->dfile.file >= 0 ? 0 : 1;
}
......@@ -1549,6 +1565,10 @@ int _ma_open_keyfile(MARIA_SHARE *share)
share->kfile.file= my_open(share->unique_file_name,
share->mode | O_SHARE,
MYF(MY_WME));
pagecache_file_init(share->kfile, &maria_page_crc_check_index,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), share);
pthread_mutex_unlock(&share->intern_lock);
return (share->kfile.file < 0);
}
......
......@@ -136,9 +136,6 @@ int _ma_write_keypage(register MARIA_HA *info, register MARIA_KEYDEF *keyinfo,
}
#endif
DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(share->options & HA_OPTION_PAGE_CHECKSUM))
bfill(buff + block_size - KEYPAGE_CHECKSUM_SIZE,
KEYPAGE_CHECKSUM_SIZE, (uchar) 255);
res= pagecache_write(share->pagecache,
&share->kfile, page / block_size,
......@@ -247,7 +244,7 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
lock_method, pin_method,
PAGECACHE_WRITE_DELAY, &page_link.link,
LSN_IMPOSSIBLE,
0, share->keypage_header+8, 0, 0))
0, share->keypage_header + 8))
result= 1;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
......
......@@ -15,7 +15,7 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
These functions handle page cacheing for Maria tables.
These functions handle page caching for Maria tables.
One cache can handle many files.
It must contain buffers of the same blocksize.
......@@ -608,8 +608,18 @@ static uint pagecache_fwrite(PAGECACHE *pagecache,
/* TODO: integrate with page format */
lsn= lsn_korr(buffer + PAGE_LSN_OFFSET);
DBUG_ASSERT(LSN_VALID(lsn));
translog_flush(lsn);
if (translog_flush(lsn))
DBUG_RETURN(1);
}
DBUG_PRINT("info", ("write_callback: 0x%lx data: 0x%lx",
(ulong) filedesc->write_callback,
(ulong) filedesc->callback_data));
if ((filedesc->write_callback)(buffer, pageno, filedesc->callback_data))
{
DBUG_PRINT("error", ("write callback problem"));
DBUG_RETURN(1);
}
DBUG_RETURN(my_pwrite(filedesc->file, buffer, pagecache->block_size,
(pageno)<<(pagecache->shift), flags));
}
......@@ -2398,8 +2408,6 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
pagecache pointer to a page cache data structure
block block to which buffer the data is to be read
primary <-> the current thread will read the data
validator validator of read from the disk data
validator_data pointer to the data need by the validator
RETURN VALUE
None
......@@ -2413,9 +2421,7 @@ static my_bool make_lock_and_pin(PAGECACHE *pagecache,
static void read_block(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK *block,
my_bool primary,
pagecache_disk_read_validator validator,
uchar* validator_data)
my_bool primary)
{
/* On entry cache_lock is locked */
......@@ -2448,9 +2454,18 @@ static void read_block(PAGECACHE *pagecache,
else
block->status= PCBLOCK_READ;
if (validator != NULL &&
(*validator)(block->buffer, validator_data))
DBUG_PRINT("info", ("read_callback: 0x%lx data: 0x%lx",
(ulong) block->hash_link->file.read_callback,
(ulong) block->hash_link->file.callback_data));
if ((*block->hash_link->file.read_callback)(block->buffer,
block->hash_link->pageno,
block->hash_link->
file.callback_data))
{
DBUG_PRINT("error", ("read callback problem"));
block->status|= PCBLOCK_ERROR;
}
DBUG_PRINT("read_block",
("primary request: new page in cache"));
......@@ -2881,25 +2896,20 @@ void pagecache_unpin_by_link(PAGECACHE *pagecache,
/*
Read a block of data from a cached file into a buffer;
@brief Read a block of data from a cached file into a buffer;
SYNOPSIS
pagecache_valid_read()
pagecache pointer to a page cache data structure
file handler for the file for the block of data to be read
pageno number of the block of data in the file
level determines the weight of the data
buff buffer to where the data must be placed
type type of the page
lock lock change
link link to the page if we pin it
validator validator of read from the disk data
validator_data pointer to the data need by the validator
@param pagecache pointer to a page cache data structure
@param file handler for the file for the block of data to be read
@param pageno number of the block of data in the file
@param level determines the weight of the data
@param buff buffer to where the data must be placed
@param type type of the page
@param lock lock change
@param link link to the page if we pin it
RETURN VALUE
Returns address from where the data is placed if successful, 0 - otherwise.
@return address from where the data is placed if successful, 0 - otherwise.
Pin will be chosen according to lock parameter (see lock_to_pin)
@note Pin will be chosen according to lock parameter (see lock_to_pin)
*/
static enum pagecache_page_pin lock_to_pin[2][8]=
{
......@@ -2925,16 +2935,14 @@ static enum pagecache_page_pin lock_to_pin[2][8]=
}
};
uchar *pagecache_valid_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
uint level,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **page_link,
pagecache_disk_read_validator validator,
uchar* validator_data)
uchar *pagecache_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
uint level,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **page_link)
{
int error= 0;
enum pagecache_page_pin pin= lock_to_pin[test(buff==0)][lock];
......@@ -2991,8 +2999,7 @@ uchar *pagecache_valid_read(PAGECACHE *pagecache,
DBUG_PRINT("info", ("read block 0x%lx", (ulong)block));
/* The requested page is to be read into the block buffer */
read_block(pagecache, block,
(my_bool)(page_st == PAGE_TO_BE_READ),
validator, validator_data);
(my_bool)(page_st == PAGE_TO_BE_READ));
DBUG_PRINT("info", ("read is done"));
}
......@@ -3309,9 +3316,7 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
enum pagecache_write_mode write_mode,
PAGECACHE_BLOCK_LINK **page_link,
LSN first_REDO_LSN_for_page,
uint offset, uint size,
pagecache_disk_read_validator validator,
uchar* validator_data)
uint offset, uint size)
{
PAGECACHE_BLOCK_LINK *block= NULL;
PAGECACHE_BLOCK_LINK *fake_link;
......@@ -3412,12 +3417,20 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
memcpy(block->buffer + offset, buff, size);
block->status= PCBLOCK_READ;
/*
The validator can change the page content (removing page
The read_callback can change the page content (removing page
protection) so it have to be called
*/
if (validator != NULL &&
(*validator)(block->buffer, validator_data))
DBUG_PRINT("info", ("read_callback: 0x%lx data: 0x%lx",
(ulong) block->hash_link->file.read_callback,
(ulong) block->hash_link->file.callback_data));
if ((*block->hash_link->file.read_callback)(block->buffer,
block->hash_link->pageno,
block->hash_link->
file.callback_data))
{
DBUG_PRINT("error", ("read callback problem"));
block->status|= PCBLOCK_ERROR;
}
KEYCACHE_DBUG_PRINT("key_cache_insert",
("Page injection"));
#ifdef THREAD
......@@ -3429,7 +3442,6 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
}
else
{
DBUG_ASSERT(validator == 0 && validator_data == 0);
if (! (block->status & PCBLOCK_CHANGED))
link_to_changed_list(pagecache, block);
......
......@@ -74,15 +74,20 @@ enum pagecache_write_mode
PAGECACHE_WRITE_DONE
};
/* page number for maria */
typedef uint32 pgcache_page_no_t;
/* file descriptor for Maria */
typedef struct st_pagecache_file
{
File file;
my_bool (*read_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
my_bool (*write_callback)(uchar *page, pgcache_page_no_t offset,
uchar *data);
uchar *callback_data;
} PAGECACHE_FILE;
/* page number for maria */
typedef uint32 pgcache_page_no_t;
/* declare structures that is used by st_pagecache */
struct st_pagecache_block_link;
......@@ -94,8 +99,6 @@ typedef struct st_pagecache_hash_link PAGECACHE_HASH_LINK;
#include <wqueue.h>
typedef my_bool (*pagecache_disk_read_validator)(uchar *page, uchar *data);
#define PAGECACHE_CHANGED_BLOCKS_HASH 128 /* must be power of 2 */
#define PAGECACHE_PRIORITY_LOW 0
#define PAGECACHE_PRIORITY_DEFAULT 3
......@@ -192,26 +195,21 @@ extern ulong resize_pagecache(PAGECACHE *pagecache,
extern void change_pagecache_param(PAGECACHE *pagecache, uint division_limit,
uint age_threshold);
#define pagecache_read(P,F,N,L,B,T,K,I) \
pagecache_valid_read(P,F,N,L,B,T,K,I,0,0)
extern uchar *pagecache_valid_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
uint level,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **link,
pagecache_disk_read_validator validator,
uchar* validator_data);
extern uchar *pagecache_read(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
uint level,
uchar *buff,
enum pagecache_page_type type,
enum pagecache_page_lock lock,
PAGECACHE_BLOCK_LINK **link);
#define pagecache_write(P,F,N,L,B,T,O,I,M,K,R) \
pagecache_write_part(P,F,N,L,B,T,O,I,M,K,R,0,(P)->block_size,0,0)
pagecache_write_part(P,F,N,L,B,T,O,I,M,K,R,0,(P)->block_size)
#define pagecache_inject(P,F,N,L,B,T,O,I,K,R,V,D) \
#define pagecache_inject(P,F,N,L,B,T,O,I,K,R) \
pagecache_write_part(P,F,N,L,B,T,O,I,PAGECACHE_WRITE_DONE, \
K,R,0,(P)->block_size,V,D)
K,R,0,(P)->block_size)
extern my_bool pagecache_write_part(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
......@@ -225,9 +223,7 @@ extern my_bool pagecache_write_part(PAGECACHE *pagecache,
PAGECACHE_BLOCK_LINK **link,
LSN first_REDO_LSN_for_page,
uint offset,
uint size,
pagecache_disk_read_validator validator,
uchar* validator_data);
uint size);
extern void pagecache_unlock(PAGECACHE *pagecache,
PAGECACHE_FILE *file,
pgcache_page_no_t pageno,
......@@ -261,6 +257,11 @@ extern void pagecache_unpin_by_link(PAGECACHE *pagecache,
/* PCFLUSH_ERROR and PCFLUSH_PINNED. */
#define PCFLUSH_PINNED_AND_ERROR (PCFLUSH_ERROR|PCFLUSH_PINNED)
#define pagecache_file_init(F,RC,WC,D) \
do{ \
(F).read_callback= (RC); (F).write_callback= (WC); \
(F).callback_data= (uchar*)(D); \
} while(0)
#define flush_pagecache_blocks(A,B,C) \
flush_pagecache_blocks_with_filter(A,B,C,NULL,NULL)
......
......@@ -99,12 +99,19 @@ int maria_panic(enum ha_panic_function flag)
{ /* Open closed files */
char name_buff[FN_REFLEN];
if (info->s->kfile.file < 0)
{
if ((info->s->kfile.file= my_open(fn_format(name_buff,
info->filename, "",
N_NAME_IEXT,4),
info->mode,
MYF(MY_WME))) < 0)
error = my_errno;
pagecache_file_init(info->s->kfile, &maria_page_crc_check_index,
(info->s->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_index :
&maria_page_filler_set_normal), info->s);
}
if (info->dfile.file < 0)
{
if ((info->dfile.file= my_open(fn_format(name_buff, info->filename,
......@@ -112,6 +119,10 @@ int maria_panic(enum ha_panic_function flag)
info->mode,
MYF(MY_WME))) < 0)
error = my_errno;
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal:
&maria_page_filler_set_normal), share);
info->rec_cache.file= info->dfile.file;
}
}
......
......@@ -1670,6 +1670,10 @@ static int maria_sort_records(HA_CHECK *param,
VOID(my_close(info->dfile.file, MYF(MY_WME)));
param->out_flag|=O_NEW_DATA; /* Data in new file */
info->dfile.file= new_file; /* Use new datafile */
pagecache_file_init(info->dfile, &maria_page_crc_check_data,
(share->options & HA_OPTION_PAGE_CHECKSUM ?
&maria_page_crc_set_normal :
&maria_page_filler_set_normal), share);
info->state->del=0;
info->state->empty=0;
share->state.dellink= HA_OFFSET_ERROR;
......
......@@ -44,7 +44,9 @@
struct st_transaction;
/* undef map from my_nosys; We need test-if-disk full */
#undef my_write
#undef my_write
#define CRC_SIZE 4
typedef struct st_maria_status_info
{
......@@ -572,9 +574,19 @@ struct st_maria_handler
#define _ma_store_keypage_flag(share,x,flag) x[(share)->keypage_header - KEYPAGE_USED_SIZE - KEYPAGE_FLAG_SIZE]= (flag)
/*
TODO: write int4store_aligned as *((uint32 *) (T))= (uint32) (A) for
architectures where it is possible
*/
#define int4store_aligned(A,B) int4store((A),(B))
#define maria_mark_crashed(x) do{(x)->s->state.changed|= STATE_CRASHED; \
DBUG_PRINT("error", ("Marked table crashed")); \
}while(0)
#define maria_mark_crashed_share(x) \
do{(x)->state.changed|= STATE_CRASHED; \
DBUG_PRINT("error", ("Marked table crashed")); \
}while(0)
#define maria_mark_crashed_on_repair(x) do{(x)->s->state.changed|= \
STATE_CRASHED|STATE_CRASHED_ON_REPAIR; \
(x)->update|= HA_STATE_CHANGED; \
......@@ -1039,4 +1051,27 @@ void _ma_tmp_disable_logging_for_table(MARIA_HA *info,
{ if (((S)->now_transactional= (S)->base.born_transactional)) \
(S)->page_type= PAGECACHE_LSN_PAGE; }
#define MARIA_NO_CRC_NORMAL_PAGE 0xffffffff
#define MARIA_NO_CRC_BITMAP_PAGE 0xfffffffe
extern my_bool maria_page_crc_set_index(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_set_normal(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_bitmap(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_data(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_crc_check_index(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_filler_set_bitmap(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern my_bool maria_page_filler_set_normal(uchar *page,
pgcache_page_no_t page_no,
uchar* data_ptr);
extern PAGECACHE *maria_log_pagecache;
......@@ -46,7 +46,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_noflush-t \
ma_test_loghandler_first_lsn-t \
ma_test_loghandler_max_lsn-t \
ma_test_loghandler_purge-t
ma_test_loghandler_purge-t \
ma_test_loghandler_nologs-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
......@@ -58,6 +59,7 @@ ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_l
ma_test_loghandler_first_lsn_t_SOURCES = ma_test_loghandler_first_lsn-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_max_lsn_t_SOURCES = ma_test_loghandler_max_lsn-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_purge_t_SOURCES = ma_test_loghandler_purge-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_test_loghandler_nologs_t_SOURCES = ma_test_loghandler_nologs-t.c ma_maria_log_cleanup.c ma_loghandler_examples.c
ma_pagecache_single_src = ma_pagecache_single.c test_file.c test_file.h
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c test_file.h
......
......@@ -37,9 +37,13 @@ my_bool maria_log_remove()
if (fn_format(file_name, file,
maria_data_root, "", MYF(MY_WME)) == NullS ||
my_delete(file_name, MYF(MY_WME)) != 0)
{
my_dirend(dirp);
return 1;
}
}
}
my_dirend(dirp);
return 0;
}
......@@ -57,6 +57,18 @@ static uint flush_divider= 1000;
#endif /*TEST_READERS*/
#endif /*TEST_HIGH_CONCURENCY*/
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
/*
Get pseudo-random length of the field in (0;limit)
......@@ -321,6 +333,7 @@ int main(int argc __attribute__((unused)),
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
DBUG_PRINT("info", ("file1: %d", file1.file));
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{
......
......@@ -60,6 +60,19 @@ static struct file_desc simple_delete_flush_test_file[]=
{ 0, 0}
};
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
/*
Recreate and reopen a file for test
......@@ -496,7 +509,6 @@ int main(int argc __attribute__((unused)),
#endif
DBUG_ENTER("main");
DBUG_PRINT("info", ("Main thread: %s\n", my_thread_name()));
if ((tmp_file= my_open(file2_name, O_CREAT | O_TRUNC | O_RDWR,
MYF(MY_WME))) < 0)
exit(1);
......@@ -508,6 +520,7 @@ int main(int argc __attribute__((unused)),
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
my_close(tmp_file, MYF(0));
my_delete(file2_name, MYF(0));
......
......@@ -179,7 +179,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -69,7 +69,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -63,7 +63,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -176,7 +176,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, 0))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......@@ -341,7 +340,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, 0))
{
fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -286,7 +286,6 @@ int main(int argc __attribute__((unused)),
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -71,7 +71,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -20,6 +20,19 @@ static char *first_translog_file= (char*)"maria_log.00000001";
static char *file1_name= (char*)"page_cache_test_file_1";
static PAGECACHE_FILE file1;
/**
@brief Dummy pagecache callback.
*/
static my_bool
dummy_callback(__attribute__((unused)) uchar *page,
__attribute__((unused)) pgcache_page_no_t page_no,
__attribute__((unused)) uchar* data_ptr)
{
return 0;
}
int main(int argc __attribute__((unused)), char *argv[])
{
uint pagen;
......@@ -71,7 +84,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......@@ -112,6 +124,7 @@ int main(int argc __attribute__((unused)), char *argv[])
errno);
exit(1);
}
pagecache_file_init(file1, &dummy_callback, &dummy_callback, NULL);
if (chmod(file1_name, S_IRWXU | S_IRWXG | S_IRWXO) != 0)
{
fprintf(stderr, "Got error during file1 chmod() (errno: %d)\n",
......
......@@ -66,7 +66,6 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
......
......@@ -30,11 +30,6 @@ int test_file(PAGECACHE_FILE file, char *file_name,
int step= 0;
int res= 1; /* ok */
if (my_sync(file.file, MYF(MY_WME | MY_IGNORE_BADFD)))
{
diag("Got error during syncing file\n");
exit(1);
}
if ((stat= my_stat(file_name, &stat_buff, MYF(0))) == NULL)
{
diag("Can't stat() %s (errno: %d)\n", file_name, errno);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment