Commit fddf5050 authored by unknown's avatar unknown

Merge desktop.sanja.is.com.ua:/home/bell/mysql/bk/work-maria-noflush

into  desktop.sanja.is.com.ua:/home/bell/mysql/bk/work-maria


storage/maria/ma_pagecache.c:
  Auto merged
storage/maria/unittest/ma_test_loghandler-t.c:
  Auto merged
storage/maria/ma_loghandler.c:
  merge
storage/maria/ma_pagecache.h:
  merge
parents dd20a9a7 7c32eac4
...@@ -67,6 +67,11 @@ struct st_translog_buffer ...@@ -67,6 +67,11 @@ struct st_translog_buffer
LSN last_lsn; LSN last_lsn;
/* This buffer offset in the file */ /* This buffer offset in the file */
TRANSLOG_ADDRESS offset; TRANSLOG_ADDRESS offset;
/*
Next buffer offset in the file (it is not always offset + size,
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
*/
TRANSLOG_ADDRESS next_buffer_offset;
/* /*
How much written (or will be written when copy_to_buffer_in_progress How much written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer become 0) to this buffer
...@@ -150,7 +155,10 @@ struct st_translog_descriptor ...@@ -150,7 +155,10 @@ struct st_translog_descriptor
/* Last flushed LSN */ /* Last flushed LSN */
LSN flushed; LSN flushed;
/* Last LSN sent to the disk (but maybe not written yet) */
LSN sent_to_file; LSN sent_to_file;
/* All what is after this addess is not sent to disk yet */
TRANSLOG_ADDRESS in_buffers_only;
pthread_mutex_t sent_to_file_lock; pthread_mutex_t sent_to_file_lock;
}; };
...@@ -187,6 +195,8 @@ static my_bool write_hook_for_undo(enum translog_record_type type, ...@@ -187,6 +195,8 @@ static my_bool write_hook_for_undo(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn, TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
struct st_translog_parts *parts); struct st_translog_parts *parts);
static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr);
/* /*
Initialize log_record_type_descriptors Initialize log_record_type_descriptors
...@@ -676,7 +686,6 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc) ...@@ -676,7 +686,6 @@ my_bool translog_read_file_header(LOGHANDLER_FILE_INFO *desc)
static my_bool translog_buffer_init(struct st_translog_buffer *buffer) static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
{ {
DBUG_ENTER("translog_buffer_init"); DBUG_ENTER("translog_buffer_init");
/* This buffer offset */
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->last_lsn= LSN_IMPOSSIBLE;
/* This Buffer File */ /* This Buffer File */
buffer->file= -1; buffer->file= -1;
...@@ -970,7 +979,8 @@ static void translog_put_sector_protection(uchar *page, ...@@ -970,7 +979,8 @@ static void translog_put_sector_protection(uchar *page,
static uint32 translog_crc(uchar *area, uint length) static uint32 translog_crc(uchar *area, uint length)
{ {
return crc32(0L, (unsigned char*) area, length); DBUG_ENTER("translog_crc");
DBUG_RETURN(crc32(0L, (unsigned char*) area, length));
} }
...@@ -1184,6 +1194,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer, ...@@ -1184,6 +1194,7 @@ static void translog_start_buffer(struct st_translog_buffer *buffer,
DBUG_ASSERT(buffer_no == buffer->buffer_no); DBUG_ASSERT(buffer_no == buffer->buffer_no);
buffer->last_lsn= LSN_IMPOSSIBLE; buffer->last_lsn= LSN_IMPOSSIBLE;
buffer->offset= log_descriptor.horizon; buffer->offset= log_descriptor.horizon;
buffer->next_buffer_offset= LSN_IMPOSSIBLE;
buffer->file= log_descriptor.log_file_num[0]; buffer->file= log_descriptor.log_file_num[0];
buffer->overlay= 0; buffer->overlay= 0;
buffer->size= 0; buffer->size= 0;
...@@ -1258,45 +1269,117 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -1258,45 +1269,117 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
translog_cursor_init(cursor, new_buffer, new_buffer_no); translog_cursor_init(cursor, new_buffer, new_buffer_no);
else else
translog_start_buffer(new_buffer, cursor, new_buffer_no); translog_start_buffer(new_buffer, cursor, new_buffer_no);
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
translog_new_page_header(horizon, cursor); translog_new_page_header(horizon, cursor);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* /*
Set max LSN sent to file Sets max LSN sent to file, and address from which data is only in the buffer
SYNOPSIS SYNOPSIS
translog_set_sent_to_file() translog_set_sent_to_file()
lsn LSN to assign lsn LSN to assign
in_buffers to assign to in_buffers_only
TODO: use atomic operations if possible (64bit architectures?)
*/ */
static void translog_set_sent_to_file(LSN *lsn) static void translog_set_sent_to_file(LSN lsn, TRANSLOG_ADDRESS in_buffers)
{ {
DBUG_ENTER("translog_set_sent_to_file"); DBUG_ENTER("translog_set_sent_to_file");
pthread_mutex_lock(&log_descriptor.sent_to_file_lock); pthread_mutex_lock(&log_descriptor.sent_to_file_lock);
DBUG_ASSERT(cmp_translog_addr(*lsn, log_descriptor.sent_to_file) >= 0); DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
log_descriptor.sent_to_file= *lsn; "in_buffers_only: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(lsn),
(ulong) LSN_OFFSET(lsn),
(ulong) LSN_FILE_NO(in_buffers),
(ulong) LSN_OFFSET(in_buffers),
(ulong) LSN_FILE_NO(log_descriptor.in_buffers_only),
(ulong) LSN_OFFSET(log_descriptor.in_buffers_only)));
DBUG_ASSERT(cmp_translog_addr(lsn, log_descriptor.sent_to_file) >= 0);
log_descriptor.sent_to_file= lsn;
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0)
{
log_descriptor.in_buffers_only= in_buffers;
DBUG_PRINT("info", ("set new in_buffers_only"));
}
pthread_mutex_unlock(&log_descriptor.sent_to_file_lock); pthread_mutex_unlock(&log_descriptor.sent_to_file_lock);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* /*
Get max LSN send to file Sets address from which data is only in the buffer
SYNOPSIS
translog_set_only_in_buffers()
lsn LSN to assign
in_buffers to assign to in_buffers_only
*/
static void translog_set_only_in_buffers(TRANSLOG_ADDRESS in_buffers)
{
DBUG_ENTER("translog_set_only_in_buffers");
pthread_mutex_lock(&log_descriptor.sent_to_file_lock);
DBUG_PRINT("enter", ("in_buffers: (%lu,0x%lx) "
"in_buffers_only: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(in_buffers),
(ulong) LSN_OFFSET(in_buffers),
(ulong) LSN_FILE_NO(log_descriptor.in_buffers_only),
(ulong) LSN_OFFSET(log_descriptor.in_buffers_only)));
/* LSN_IMPOSSIBLE == 0 => it will work for very first time */
if (cmp_translog_addr(in_buffers, log_descriptor.in_buffers_only) > 0)
{
log_descriptor.in_buffers_only= in_buffers;
DBUG_PRINT("info", ("set new in_buffers_only"));
}
pthread_mutex_unlock(&log_descriptor.sent_to_file_lock);
DBUG_VOID_RETURN;
}
/*
Gets address from which data is only in the buffer
SYNOPSIS
translog_only_in_buffers()
RETURN
address from which data is only in the buffer
*/
static TRANSLOG_ADDRESS translog_only_in_buffers()
{
register TRANSLOG_ADDRESS addr;
DBUG_ENTER("translog_only_in_buffers");
pthread_mutex_lock(&log_descriptor.sent_to_file_lock);
addr= log_descriptor.in_buffers_only;
pthread_mutex_unlock(&log_descriptor.sent_to_file_lock);
DBUG_RETURN(addr);
}
/*
Get max LSN sent to file
SYNOPSIS SYNOPSIS
translog_get_sent_to_file() translog_get_sent_to_file()
lsn LSN to value
RETURN
max LSN send to file
*/ */
static void translog_get_sent_to_file(LSN *lsn) static LSN translog_get_sent_to_file()
{ {
register LSN lsn;
DBUG_ENTER("translog_get_sent_to_file"); DBUG_ENTER("translog_get_sent_to_file");
pthread_mutex_lock(&log_descriptor.sent_to_file_lock); pthread_mutex_lock(&log_descriptor.sent_to_file_lock);
*lsn= log_descriptor.sent_to_file; lsn= log_descriptor.sent_to_file;
pthread_mutex_unlock(&log_descriptor.sent_to_file_lock); pthread_mutex_unlock(&log_descriptor.sent_to_file_lock);
DBUG_VOID_RETURN; DBUG_RETURN(lsn);
} }
...@@ -1536,16 +1619,20 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -1536,16 +1619,20 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
file.file= buffer->file; file.file= buffer->file;
for (i= 0; i < buffer->size; i+= TRANSLOG_PAGE_SIZE) for (i= 0; i < buffer->size; i+= TRANSLOG_PAGE_SIZE)
{ {
TRANSLOG_ADDRESS addr= (buffer->offset + i);
TRANSLOG_VALIDATOR_DATA data;
data.addr= &addr;
DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE); DBUG_ASSERT(log_descriptor.pagecache->block_size == TRANSLOG_PAGE_SIZE);
DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size); DBUG_ASSERT(i + TRANSLOG_PAGE_SIZE <= buffer->size);
if (pagecache_write(log_descriptor.pagecache, if (pagecache_inject(log_descriptor.pagecache,
&file, &file,
(LSN_OFFSET(buffer->offset) + i) / TRANSLOG_PAGE_SIZE, (LSN_OFFSET(buffer->offset) + i) / TRANSLOG_PAGE_SIZE,
3, 3,
buffer->buffer + i, buffer->buffer + i,
PAGECACHE_PLAIN_PAGE, PAGECACHE_PLAIN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, PAGECACHE_WRITE_DONE, 0)) PAGECACHE_PIN_LEFT_UNPINNED, 0,
&translog_page_validator, (uchar*) &data))
{ {
UNRECOVERABLE_ERROR(("Can't write page (%lu,0x%lx) to pagecache", UNRECOVERABLE_ERROR(("Can't write page (%lu,0x%lx) to pagecache",
(ulong) buffer->file, (ulong) buffer->file,
...@@ -1563,9 +1650,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -1563,9 +1650,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
(ulong) buffer->size, errno)); (ulong) buffer->size, errno));
DBUG_RETURN(1); DBUG_RETURN(1);
} }
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_file(&buffer->last_lsn);
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_file(buffer->last_lsn,
buffer->next_buffer_offset);
else
translog_set_only_in_buffers(buffer->next_buffer_offset);
/* Free buffer */ /* Free buffer */
buffer->file= -1; buffer->file= -1;
buffer->overlay= 0; buffer->overlay= 0;
...@@ -1751,6 +1841,59 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -1751,6 +1841,59 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
} }
/*
Lock the loghandler
SYNOPSIS
translog_lock()
RETURN
0 OK
1 Error
*/
my_bool translog_lock()
{
struct st_translog_buffer *current_buffer;
DBUG_ENTER("translog_lock");
/*
Locking the loghandler mean locking current buffer, but it can change
during locking, so we should check it
*/
for (;;)
{
current_buffer= log_descriptor.bc.buffer;
if (translog_buffer_lock(current_buffer))
DBUG_RETURN(1);
if (log_descriptor.bc.buffer == current_buffer)
break;
translog_buffer_unlock(current_buffer);
}
DBUG_RETURN(0);
}
/*
Unlock the loghandler
SYNOPSIS
translog_unlock()
RETURN
0 OK
1 Error
*/
my_bool translog_unlock()
{
DBUG_ENTER("translog_unlock");
translog_buffer_unlock(log_descriptor.bc.buffer);
DBUG_RETURN(0);
}
/* /*
Get log page by file number and offset of the beginning of the page Get log page by file number and offset of the beginning of the page
...@@ -1767,7 +1910,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr) ...@@ -1767,7 +1910,7 @@ static my_bool translog_page_validator(uchar *page_addr, uchar* data_ptr)
static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
{ {
TRANSLOG_ADDRESS addr= *(data->addr); TRANSLOG_ADDRESS addr= *(data->addr), in_buffers;
uint cache_index; uint cache_index;
uint32 file_no= LSN_FILE_NO(addr); uint32 file_no= LSN_FILE_NO(addr);
DBUG_ENTER("translog_get_page"); DBUG_ENTER("translog_get_page");
...@@ -1779,6 +1922,107 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer) ...@@ -1779,6 +1922,107 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer)
/* it is really page address */ /* it is really page address */
DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0); DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0);
in_buffers= translog_only_in_buffers();
DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)",
(ulong) LSN_FILE_NO(in_buffers),
(ulong) LSN_OFFSET(in_buffers)));
if (in_buffers != LSN_IMPOSSIBLE &&
cmp_translog_addr(addr, in_buffers) >= 0)
{
translog_lock();
/* recheck with locked loghandler */
in_buffers= translog_only_in_buffers();
if (cmp_translog_addr(addr, in_buffers) >= 0)
{
uint16 buffer_no= log_descriptor.bc.buffer_no;
uint16 buffer_start= buffer_no;
struct st_translog_buffer *buffer_unlock= log_descriptor.bc.buffer;
struct st_translog_buffer *curr_buffer= log_descriptor.bc.buffer;
for (;;)
{
/*
if the page is in the buffer and it is the last version of the
page (in case of devision the page bu buffer flush
*/
if (curr_buffer->file != -1 &&
cmp_translog_addr(addr, curr_buffer->offset) >= 0 &&
cmp_translog_addr(addr,
(curr_buffer->next_buffer_offset ?
curr_buffer->next_buffer_offset:
curr_buffer->offset + curr_buffer->size)) < 0)
{
int is_last_unfinished_page;
uint last_protected_sector= 0;
uchar *from, *table;
translog_wait_for_writers(curr_buffer);
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset);
memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
is_last_unfinished_page= ((log_descriptor.bc.buffer ==
curr_buffer) &&
(log_descriptor.bc.ptr >= from) &&
(log_descriptor.bc.ptr <
from + TRANSLOG_PAGE_SIZE));
if (is_last_unfinished_page &&
(buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION))
{
last_protected_sector= ((log_descriptor.bc.previous_offset - 1) /
DISK_DRIVE_SECTOR_SIZE);
table= buffer + log_descriptor.page_overhead -
(TRANSLOG_PAGE_SIZE / DISK_DRIVE_SECTOR_SIZE) * 2;
}
DBUG_ASSERT(buffer_unlock == curr_buffer);
translog_buffer_unlock(buffer_unlock);
if (is_last_unfinished_page)
{
uint i;
/*
This is last unfinished page => we should not check CRC and
remove only that protection which already installed (no need
to check it)
We do not check the flag of sector protection, because if
(buffer[TRANSLOG_PAGE_FLAGS] & TRANSLOG_SECTOR_PROTECTION) is
not set then last_protected_sector will be 0 so following loop
will be never executed
*/
DBUG_PRINT("info", ("This is last unfinished page, "
"last protected sector %u",
last_protected_sector));
for (i= 1; i <= last_protected_sector; i++)
{
uint index= i * 2;
uint offset= i * DISK_DRIVE_SECTOR_SIZE;
DBUG_PRINT("info", ("Sector %u: 0x%02x%02x <- 0x%02x%02x",
i, buffer[offset], buffer[offset + 1],
table[index], table[index + 1]));
buffer[offset]= table[index];
buffer[offset + 1]= table[index + 1];
}
}
else
{
/*
This IF should be true because we use in-memory data which
supposed to be correct.
*/
if (translog_page_validator((uchar*) buffer, (uchar*) data))
buffer= NULL;
}
DBUG_RETURN(buffer);
}
buffer_no= (buffer_no + 1) % TRANSLOG_BUFFERS_NO;
curr_buffer= log_descriptor.buffers + buffer_no;
translog_buffer_lock(curr_buffer);
translog_buffer_unlock(buffer_unlock);
buffer_unlock= curr_buffer;
/* we can't make full circle */
DBUG_ASSERT(buffer_start != buffer_no);
}
}
translog_unlock();
}
if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - file_no) < if ((cache_index= LSN_FILE_NO(log_descriptor.horizon) - file_no) <
OPENED_FILES_NUM) OPENED_FILES_NUM)
{ {
...@@ -2004,6 +2248,7 @@ my_bool translog_init(const char *directory, ...@@ -2004,6 +2248,7 @@ my_bool translog_init(const char *directory,
DBUG_RETURN(1); DBUG_RETURN(1);
} }
log_descriptor.in_buffers_only= LSN_IMPOSSIBLE;
/* max size of one log size (for new logs creation) */ /* max size of one log size (for new logs creation) */
log_descriptor.log_file_max_size= log_descriptor.log_file_max_size=
log_file_max_size - (log_file_max_size % TRANSLOG_PAGE_SIZE); log_file_max_size - (log_file_max_size % TRANSLOG_PAGE_SIZE);
...@@ -2273,7 +2518,9 @@ my_bool translog_init(const char *directory, ...@@ -2273,7 +2518,9 @@ my_bool translog_init(const char *directory,
} }
/* all LSNs that are on disk are flushed */ /* all LSNs that are on disk are flushed */
log_descriptor.sent_to_file= log_descriptor.flushed= log_descriptor.horizon; log_descriptor.sent_to_file=
log_descriptor.flushed= log_descriptor.horizon;
log_descriptor.in_buffers_only= log_descriptor.bc.buffer->offset;
/* /*
horizon is (potentially) address of the next LSN we need decrease horizon is (potentially) address of the next LSN we need decrease
it to signal that all LSNs before it are flushed it to signal that all LSNs before it are flushed
...@@ -2370,57 +2617,6 @@ void translog_destroy() ...@@ -2370,57 +2617,6 @@ void translog_destroy()
} }
/*
Lock the loghandler
SYNOPSIS
translog_lock()
RETURN
0 OK
1 Error
*/
my_bool translog_lock()
{
struct st_translog_buffer *current_buffer;
DBUG_ENTER("translog_lock");
/*
Locking the loghandler mean locking current buffer, but it can change
during locking, so we should check it
*/
for (;;)
{
current_buffer= log_descriptor.bc.buffer;
if (translog_buffer_lock(current_buffer))
DBUG_RETURN(1);
if (log_descriptor.bc.buffer == current_buffer)
break;
translog_buffer_unlock(current_buffer);
}
DBUG_RETURN(0);
}
/*
Unlock the loghandler
SYNOPSIS
translog_unlock()
RETURN
0 OK
1 Error
*/
my_bool translog_unlock()
{
DBUG_ENTER("translog_unlock");
translog_buffer_unlock(log_descriptor.bc.buffer);
DBUG_RETURN(0);
}
#define translog_buffer_lock_assert_owner(B) \ #define translog_buffer_lock_assert_owner(B) \
...@@ -2927,6 +3123,7 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data) ...@@ -2927,6 +3123,7 @@ static my_bool translog_advance_pointer(uint pages, uint16 last_page_data)
log_descriptor.horizon+= min_offset; /* offset increasing */ log_descriptor.horizon+= min_offset; /* offset increasing */
} }
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no); translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
old_buffer->next_buffer_offset= new_buffer->offset;
if (translog_buffer_unlock(old_buffer)) if (translog_buffer_unlock(old_buffer))
DBUG_RETURN(1); DBUG_RETURN(1);
offset-= min_offset; offset-= min_offset;
...@@ -3530,7 +3727,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts, ...@@ -3530,7 +3727,7 @@ static my_bool translog_relative_LSN_encode(struct st_translog_parts *parts,
uchar *dst_ptr= compressed_LSNs + (MAX_NUMBER_OF_LSNS_PER_RECORD * uchar *dst_ptr= compressed_LSNs + (MAX_NUMBER_OF_LSNS_PER_RECORD *
COMPRESSED_LSN_MAX_STORE_SIZE); COMPRESSED_LSN_MAX_STORE_SIZE);
for (src_ptr= buffer + lsns_len - LSN_STORE_SIZE; for (src_ptr= buffer + lsns_len - LSN_STORE_SIZE;
src_ptr >= (uchar *)buffer; src_ptr >= (uchar*) buffer;
src_ptr-= LSN_STORE_SIZE) src_ptr-= LSN_STORE_SIZE)
{ {
ref= lsn_korr(src_ptr); ref= lsn_korr(src_ptr);
...@@ -5374,7 +5571,7 @@ static void translog_force_current_buffer_to_finish() ...@@ -5374,7 +5571,7 @@ static void translog_force_current_buffer_to_finish()
struct st_translog_buffer *new_buffer= (log_descriptor.buffers + struct st_translog_buffer *new_buffer= (log_descriptor.buffers +
new_buffer_no); new_buffer_no);
struct st_translog_buffer *old_buffer= log_descriptor.bc.buffer; struct st_translog_buffer *old_buffer= log_descriptor.bc.buffer;
uchar *data= log_descriptor.bc.ptr -log_descriptor.bc.current_page_fill; uchar *data= log_descriptor.bc.ptr - log_descriptor.bc.current_page_fill;
uint16 left= TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill; uint16 left= TRANSLOG_PAGE_SIZE - log_descriptor.bc.current_page_fill;
uint16 current_page_fill, write_counter, previous_offset; uint16 current_page_fill, write_counter, previous_offset;
DBUG_ENTER("translog_force_current_buffer_to_finish"); DBUG_ENTER("translog_force_current_buffer_to_finish");
...@@ -5469,13 +5666,14 @@ static void translog_force_current_buffer_to_finish() ...@@ -5469,13 +5666,14 @@ static void translog_force_current_buffer_to_finish()
if (left) if (left)
{ {
memcpy(new_buffer->buffer, data, current_page_fill); memcpy(new_buffer->buffer, data, current_page_fill);
log_descriptor.bc.ptr +=current_page_fill; log_descriptor.bc.ptr+= current_page_fill;
log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill= log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
current_page_fill; current_page_fill;
new_buffer->overlay= old_buffer; new_buffer->overlay= old_buffer;
} }
else else
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
old_buffer->next_buffer_offset= new_buffer->offset;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -5538,7 +5736,7 @@ my_bool translog_flush(LSN lsn) ...@@ -5538,7 +5736,7 @@ my_bool translog_flush(LSN lsn)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
/* send to the file if it is not sent */ /* send to the file if it is not sent */
translog_get_sent_to_file(&sent_to_file); sent_to_file= translog_get_sent_to_file();
if (cmp_translog_addr(sent_to_file, lsn) >= 0) if (cmp_translog_addr(sent_to_file, lsn) >= 0)
break; break;
......
...@@ -141,7 +141,7 @@ int _ma_dispose(register MARIA_HA *info, MARIA_KEYDEF *keyinfo, my_off_t pos, ...@@ -141,7 +141,7 @@ int _ma_dispose(register MARIA_HA *info, MARIA_KEYDEF *keyinfo, my_off_t pos,
PAGECACHE_LOCK_LEFT_UNLOCKED, PAGECACHE_LOCK_LEFT_UNLOCKED,
PAGECACHE_PIN_LEFT_UNPINNED, PAGECACHE_PIN_LEFT_UNPINNED,
PAGECACHE_WRITE_DELAY, 0, PAGECACHE_WRITE_DELAY, 0,
offset, sizeof(buff))); offset, sizeof(buff), 0, 0));
} /* _ma_dispose */ } /* _ma_dispose */
......
...@@ -3179,7 +3179,9 @@ my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -3179,7 +3179,9 @@ my_bool pagecache_write_part(PAGECACHE *pagecache,
enum pagecache_page_pin pin, enum pagecache_page_pin pin,
enum pagecache_write_mode write_mode, enum pagecache_write_mode write_mode,
PAGECACHE_PAGE_LINK *link, PAGECACHE_PAGE_LINK *link,
uint offset, uint size) uint offset, uint size,
pagecache_disk_read_validator validator,
uchar* validator_data)
{ {
PAGECACHE_BLOCK_LINK *block= NULL; PAGECACHE_BLOCK_LINK *block= NULL;
PAGECACHE_PAGE_LINK fake_link; PAGECACHE_PAGE_LINK fake_link;
...@@ -3265,7 +3267,7 @@ restart: ...@@ -3265,7 +3267,7 @@ restart:
if (write_mode == PAGECACHE_WRITE_DONE) if (write_mode == PAGECACHE_WRITE_DONE)
{ {
if ((block->status & PCBLOCK_ERROR) && page_st != PAGE_READ) if (!(block->status & PCBLOCK_ERROR))
{ {
/* Copy data from buff */ /* Copy data from buff */
if (!(size & 511)) if (!(size & 511))
...@@ -3273,8 +3275,15 @@ restart: ...@@ -3273,8 +3275,15 @@ restart:
else else
memcpy(block->buffer + offset, buff, size); memcpy(block->buffer + offset, buff, size);
block->status= (PCBLOCK_READ | (block->status & PCBLOCK_WRLOCK)); block->status= (PCBLOCK_READ | (block->status & PCBLOCK_WRLOCK));
/*
The validator can change the page content (removing page
protection) so it have to be called
*/
if (validator != NULL &&
(*validator)(block->buffer, validator_data))
block->status|= PCBLOCK_ERROR;
KEYCACHE_DBUG_PRINT("key_cache_insert", KEYCACHE_DBUG_PRINT("key_cache_insert",
("primary request: new page in cache")); ("Page injection"));
#ifdef THREAD #ifdef THREAD
/* Signal that all pending requests for this now can be processed. */ /* Signal that all pending requests for this now can be processed. */
if (block->wqueue[COND_FOR_REQUESTED].last_thread) if (block->wqueue[COND_FOR_REQUESTED].last_thread)
...@@ -3284,6 +3293,7 @@ restart: ...@@ -3284,6 +3293,7 @@ restart:
} }
else else
{ {
DBUG_ASSERT(validator == 0 && validator_data == 0);
if (! (block->status & PCBLOCK_CHANGED)) if (! (block->status & PCBLOCK_CHANGED))
link_to_changed_list(pagecache, block); link_to_changed_list(pagecache, block);
......
...@@ -190,7 +190,11 @@ extern uchar *pagecache_valid_read(PAGECACHE *pagecache, ...@@ -190,7 +190,11 @@ extern uchar *pagecache_valid_read(PAGECACHE *pagecache,
uchar* validator_data); uchar* validator_data);
#define pagecache_write(P,F,N,L,B,T,O,I,M,K) \ #define pagecache_write(P,F,N,L,B,T,O,I,M,K) \
pagecache_write_part(P,F,N,L,B,T,O,I,M,K,0,(P)->block_size) pagecache_write_part(P,F,N,L,B,T,O,I,M,K,0,(P)->block_size,0,0)
#define pagecache_inject(P,F,N,L,B,T,O,I,K,V,D) \
pagecache_write_part(P,F,N,L,B,T,O,I,PAGECACHE_WRITE_DONE, \
K,0,(P)->block_size,V,D)
extern my_bool pagecache_write_part(PAGECACHE *pagecache, extern my_bool pagecache_write_part(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
...@@ -203,7 +207,9 @@ extern my_bool pagecache_write_part(PAGECACHE *pagecache, ...@@ -203,7 +207,9 @@ extern my_bool pagecache_write_part(PAGECACHE *pagecache,
enum pagecache_write_mode write_mode, enum pagecache_write_mode write_mode,
PAGECACHE_PAGE_LINK *link, PAGECACHE_PAGE_LINK *link,
uint offset, uint offset,
uint size); uint size,
pagecache_disk_read_validator validator,
uchar* validator_data);
extern void pagecache_unlock(PAGECACHE *pagecache, extern void pagecache_unlock(PAGECACHE *pagecache,
PAGECACHE_FILE *file, PAGECACHE_FILE *file,
pgcache_page_no_t pageno, pgcache_page_no_t pageno,
......
...@@ -42,7 +42,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \ ...@@ -42,7 +42,8 @@ noinst_PROGRAMS = ma_control_file-t trnman-t lockman2-t \
ma_test_loghandler_multigroup-t \ ma_test_loghandler_multigroup-t \
ma_test_loghandler_multithread-t \ ma_test_loghandler_multithread-t \
ma_test_loghandler_pagecache-t \ ma_test_loghandler_pagecache-t \
ma_test_loghandler_long-t-big ma_test_loghandler_long-t-big \
ma_test_loghandler_noflush-t
ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_test_loghandler_t_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c
ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c ma_test_loghandler_multigroup_t_SOURCES = ma_test_loghandler_multigroup-t.c ma_maria_log_cleanup.c
...@@ -50,6 +51,7 @@ ma_test_loghandler_multithread_t_SOURCES = ma_test_loghandler_multithread-t.c ma ...@@ -50,6 +51,7 @@ ma_test_loghandler_multithread_t_SOURCES = ma_test_loghandler_multithread-t.c ma
ma_test_loghandler_pagecache_t_SOURCES = ma_test_loghandler_pagecache-t.c ma_maria_log_cleanup.c ma_test_loghandler_pagecache_t_SOURCES = ma_test_loghandler_pagecache-t.c ma_maria_log_cleanup.c
ma_test_loghandler_long_t_big_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c ma_test_loghandler_long_t_big_SOURCES = ma_test_loghandler-t.c ma_maria_log_cleanup.c
ma_test_loghandler_long_t_big_CPPFLAGS = -DLONG_LOG_TEST ma_test_loghandler_long_t_big_CPPFLAGS = -DLONG_LOG_TEST
ma_test_loghandler_noflush_t_SOURCES = ma_test_loghandler_noflush-t.c ma_maria_log_cleanup.c
ma_pagecache_single_src = ma_pagecache_single.c test_file.c ma_pagecache_single_src = ma_pagecache_single.c test_file.c
ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c ma_pagecache_consist_src = ma_pagecache_consist.c test_file.c
......
...@@ -19,8 +19,9 @@ static TRN *trn= &dummy_transaction_object; ...@@ -19,8 +19,9 @@ static TRN *trn= &dummy_transaction_object;
#define LOG_FLAGS 0 #define LOG_FLAGS 0
#define LOG_FILE_SIZE (1024L*1024L) #define LOG_FILE_SIZE (1024L*1024L)
#define ITERATIONS (1600*4) #define ITERATIONS (1600*4)
#else #else
#define LOG_FLAGS TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC #define LOG_FLAGS (TRANSLOG_SECTOR_PROTECTION | TRANSLOG_PAGE_CRC)
#define LOG_FILE_SIZE (1024L*1024L*3L) #define LOG_FILE_SIZE (1024L*1024L*3L)
#define ITERATIONS 1600 #define ITERATIONS 1600
#endif #endif
...@@ -332,32 +333,8 @@ int main(int argc __attribute__((unused)), char *argv[]) ...@@ -332,32 +333,8 @@ int main(int argc __attribute__((unused)), char *argv[])
ok(1, "flush"); ok(1, "flush");
} }
translog_destroy();
end_pagecache(&pagecache, 1);
ma_control_file_end();
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "pass2: Can't init control file (%d)\n", errno);
exit(1);
}
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
TRANSLOG_PAGE_SIZE)) == 0)
{
fprintf(stderr, "pass2: Got error: init_pagecache() (errno: %d)\n", errno);
exit(1);
}
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "pass2: Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
srandom(122334817L); srandom(122334817L);
rc= 1; rc= 1;
{ {
...@@ -640,5 +617,6 @@ err: ...@@ -640,5 +617,6 @@ err:
if (maria_log_remove()) if (maria_log_remove())
exit(1); exit(1);
return(test(exit_status())); return(test(exit_status()));
} }
#include "../maria_def.h"
#include <stdio.h>
#include <errno.h>
#include <tap.h>
#include "../trnman.h"
extern my_bool maria_log_remove();
#ifndef DBUG_OFF
static const char *default_dbug_option;
#endif
#define PCACHE_SIZE (1024*1024*10)
#define PCACHE_PAGE TRANSLOG_PAGE_SIZE
#define LOG_FILE_SIZE (1024L*1024L*1024L + 1024L*1024L*512)
#define LOG_FLAGS 0
static char *first_translog_file= (char*)"maria_log.00000001";
int main(int argc __attribute__((unused)), char *argv[])
{
uint pagen;
int rc= 1;
uchar long_tr_id[6];
PAGECACHE pagecache;
LSN first_lsn;
MY_STAT st;
TRANSLOG_HEADER_BUFFER rec;
LEX_STRING parts[TRANSLOG_INTERNAL_PARTS + 1];
MY_INIT(argv[0]);
plan(1);
bzero(&pagecache, sizeof(pagecache));
maria_data_root= ".";
if (maria_log_remove())
exit(1);
/* be sure that we have no logs in the directory*/
if (my_stat(CONTROL_FILE_BASE_NAME, &st, MYF(0)))
my_delete(CONTROL_FILE_BASE_NAME, MYF(0));
if (my_stat(first_translog_file, &st, MYF(0)))
my_delete(first_translog_file, MYF(0));
bzero(long_tr_id, 6);
#ifndef DBUG_OFF
#if defined(__WIN__)
default_dbug_option= "d:t:i:O,\\ma_test_loghandler.trace";
#else
default_dbug_option= "d:t:i:o,/tmp/ma_test_loghandler.trace";
#endif
if (argc > 1)
{
DBUG_SET(default_dbug_option);
DBUG_SET_INITIAL(default_dbug_option);
}
#endif
if (ma_control_file_create_or_open(TRUE))
{
fprintf(stderr, "Can't init control file (%d)\n", errno);
exit(1);
}
if ((pagen= init_pagecache(&pagecache, PCACHE_SIZE, 0, 0,
PCACHE_PAGE)) == 0)
{
fprintf(stderr, "Got error: init_pagecache() (errno: %d)\n", errno);
exit(1);
}
if (translog_init(".", LOG_FILE_SIZE, 50112, 0, &pagecache, LOG_FLAGS))
{
fprintf(stderr, "Can't init loghandler (%d)\n", errno);
translog_destroy();
exit(1);
}
example_loghandler_init();
int4store(long_tr_id, 0);
long_tr_id[5]= 0xff;
parts[TRANSLOG_INTERNAL_PARTS + 0].str= (char*)long_tr_id;
parts[TRANSLOG_INTERNAL_PARTS + 0].length= 6;
if (translog_write_record(&first_lsn,
LOGREC_FIXED_RECORD_0LSN_EXAMPLE,
&dummy_transaction_object, NULL, 6,
TRANSLOG_INTERNAL_PARTS + 1,
parts, NULL))
{
fprintf(stderr, "Can't write record #%lu\n", (ulong) 0);
translog_destroy();
exit(1);
}
translog_size_t len= translog_read_record_header(first_lsn, &rec);
if (len == 0)
{
fprintf(stderr, "translog_read_record_header failed (%d)\n", errno);
goto err;
}
if (rec.type !=LOGREC_FIXED_RECORD_0LSN_EXAMPLE || rec.short_trid != 0 ||
rec.record_length != 6 || uint4korr(rec.header) != 0 ||
((uchar)rec.header[4]) != 0 || ((uchar)rec.header[5]) != 0xFF ||
first_lsn != rec.lsn)
{
fprintf(stderr, "Incorrect LOGREC_FIXED_RECORD_0LSN_EXAMPLE "
"data read(0)\n"
"type: %u (%d) strid: %u (%d) len: %u (%d) i: %u (%d), "
"4: %u (%d) 5: %u (%d) "
"lsn(%lu,0x%lx) (%d)\n",
(uint) rec.type, (rec.type !=LOGREC_FIXED_RECORD_0LSN_EXAMPLE),
(uint) rec.short_trid, (rec.short_trid != 0),
(uint) rec.record_length, (rec.record_length != 6),
(uint) uint4korr(rec.header), (uint4korr(rec.header) != 0),
(uint) rec.header[4], (((uchar)rec.header[4]) != 0),
(uint) rec.header[5], (((uchar)rec.header[5]) != 0xFF),
(ulong) LSN_FILE_NO(rec.lsn), (ulong) LSN_OFFSET(rec.lsn),
(first_lsn != rec.lsn));
goto err;
}
ok(1, "read OK");
rc= 0;
err:
translog_destroy();
end_pagecache(&pagecache, 1);
ma_control_file_end();
my_delete(CONTROL_FILE_BASE_NAME, MYF(0));
my_delete(first_translog_file, MYF(0));
exit(rc);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment