Commit d5e9a747 authored by Michael Widenius's avatar Michael Widenius

Automatic merge

parents 92e14601 7521594b
...@@ -121,6 +121,8 @@ struct st_translog_buffer ...@@ -121,6 +121,8 @@ struct st_translog_buffer
in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE) in case of flush by LSN it can be offset + size - TRANSLOG_PAGE_SIZE)
*/ */
TRANSLOG_ADDRESS next_buffer_offset; TRANSLOG_ADDRESS next_buffer_offset;
/* Previous buffer offset to detect it flush finish */
TRANSLOG_ADDRESS prev_buffer_offset;
/* /*
How much is written (or will be written when copy_to_buffer_in_progress How much is written (or will be written when copy_to_buffer_in_progress
become 0) to this buffer become 0) to this buffer
...@@ -135,12 +137,12 @@ struct st_translog_buffer ...@@ -135,12 +137,12 @@ struct st_translog_buffer
/* list of waiting buffer ready threads */ /* list of waiting buffer ready threads */
struct st_my_thread_var *waiting_flush; struct st_my_thread_var *waiting_flush;
/* /*
Pointer on the buffer which overlap with this one (due to flush of If true then previous buffer overlap with this one (due to flush of
loghandler, the last page of that buffer is the same as the first page loghandler, the last page of that buffer is the same as the first page
of this buffer) and have to be written first (because contain old of this buffer) and have to be written first (because contain old
content of page which present in both buffers) content of page which present in both buffers)
*/ */
struct st_translog_buffer *overlay; my_bool overlay;
uint buffer_no; uint buffer_no;
/* /*
Lock for the buffer. Lock for the buffer.
...@@ -175,6 +177,14 @@ struct st_translog_buffer ...@@ -175,6 +177,14 @@ struct st_translog_buffer
With file and offset it allow detect buffer changes With file and offset it allow detect buffer changes
*/ */
uint8 ver; uint8 ver;
/*
When previous buffer sent to disk it set its address here to allow
to detect when it is done
(we have to keep it in this buffer to lock buffers only in one direction).
*/
TRANSLOG_ADDRESS prev_sent_to_disk;
pthread_cond_t prev_sent_to_disk_cond;
}; };
...@@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer) ...@@ -1421,9 +1431,12 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/* list of waiting buffer ready threads */ /* list of waiting buffer ready threads */
buffer->waiting_flush= 0; buffer->waiting_flush= 0;
/* lock for the buffer. Current buffer also lock the handler */ /* lock for the buffer. Current buffer also lock the handler */
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST)) if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST) ||
pthread_cond_init(&buffer->prev_sent_to_disk_cond, 0))
DBUG_RETURN(1); DBUG_RETURN(1);
buffer->is_closing_buffer= 0; buffer->is_closing_buffer= 0;
buffer->prev_sent_to_disk= LSN_IMPOSSIBLE;
buffer->prev_buffer_offset= LSN_IMPOSSIBLE;
buffer->ver= 0; buffer->ver= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2100,10 +2113,12 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
{ {
translog_lock_assert_owner(); translog_lock_assert_owner();
translog_start_buffer(new_buffer, cursor, new_buffer_no); translog_start_buffer(new_buffer, cursor, new_buffer_no);
new_buffer->prev_buffer_offset=
log_descriptor.buffers[old_buffer_no].offset;
new_buffer->prev_last_lsn=
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
} }
log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset; log_descriptor.buffers[old_buffer_no].next_buffer_offset= new_buffer->offset;
new_buffer->prev_last_lsn=
BUFFER_MAX_LSN(log_descriptor.buffers + old_buffer_no);
DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx", DBUG_PRINT("info", ("prev_last_lsn set to (%lu,0x%lx) buffer: 0x%lx",
LSN_IN_PARTS(new_buffer->prev_last_lsn), LSN_IN_PARTS(new_buffer->prev_last_lsn),
(ulong) new_buffer)); (ulong) new_buffer));
...@@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -2117,14 +2132,16 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
SYNOPSIS SYNOPSIS
translog_set_sent_to_disk() translog_set_sent_to_disk()
lsn LSN to assign buffer buffer which we have sent to disk
in_buffers to assign to in_buffers_only
TODO: use atomic operations if possible (64bit architectures?) TODO: use atomic operations if possible (64bit architectures?)
*/ */
static void translog_set_sent_to_disk(LSN lsn, TRANSLOG_ADDRESS in_buffers) static void translog_set_sent_to_disk(struct st_translog_buffer *buffer)
{ {
LSN lsn= buffer->last_lsn;
TRANSLOG_ADDRESS in_buffers= buffer->next_buffer_offset;
DBUG_ENTER("translog_set_sent_to_disk"); DBUG_ENTER("translog_set_sent_to_disk");
pthread_mutex_lock(&log_descriptor.sent_to_disk_lock); pthread_mutex_lock(&log_descriptor.sent_to_disk_lock);
DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) " DBUG_PRINT("enter", ("lsn: (%lu,0x%lx) in_buffers: (%lu,0x%lx) "
...@@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset) ...@@ -2415,6 +2432,51 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
} }
} }
/*
@brief Waits previous buffer flush finish
@param buffer buffer for check
@retval 0 previous buffer flushed and this thread have to flush this one
@retval 1 previous buffer flushed and this buffer flushed by other thread too
*/
my_bool translog_prev_buffer_flush_wait(struct st_translog_buffer *buffer)
{
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
DBUG_ENTER("translog_prev_buffer_flush_wait");
DBUG_PRINT("enter", ("buffer: 0x%lx #%u offset: (%lu,0x%lx) "
"prev sent: (%lu,0x%lx) prev offset: (%lu,0x%lx)",
(ulong) buffer, (uint) buffer->buffer_no,
LSN_IN_PARTS(buffer->offset),
LSN_IN_PARTS(buffer->prev_sent_to_disk),
LSN_IN_PARTS(buffer->prev_buffer_offset)));
translog_buffer_lock_assert_owner(buffer);
/*
if prev_sent_to_disk == LSN_IMPOSSIBLE then
prev_buffer_offset should be LSN_IMPOSSIBLE
because it means that this buffer was never used
*/
DBUG_ASSERT((buffer->prev_sent_to_disk == LSN_IMPOSSIBLE &&
buffer->prev_buffer_offset == LSN_IMPOSSIBLE) ||
buffer->prev_sent_to_disk != LSN_IMPOSSIBLE);
if (buffer->prev_buffer_offset != buffer->prev_sent_to_disk)
{
do {
pthread_cond_wait(&buffer->prev_sent_to_disk_cond, &buffer->mutex);
if (buffer->file != file || buffer->offset != offset ||
buffer->ver != ver)
{
translog_buffer_unlock(buffer);
DBUG_RETURN(1); /* some the thread flushed the buffer already */
}
} while(buffer->prev_buffer_offset != buffer->prev_sent_to_disk);
}
DBUG_RETURN(0);
}
/* /*
Flush given buffer Flush given buffer
...@@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2460,39 +2522,8 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver) if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
DBUG_RETURN(0); /* some the thread flushed the buffer already */ DBUG_RETURN(0); /* some the thread flushed the buffer already */
if (buffer->overlay && buffer->overlay->file == buffer->file && if (buffer->overlay && translog_prev_buffer_flush_wait(buffer))
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size, DBUG_RETURN(0); /* some the thread flushed the buffer already */
buffer->offset) > 0)
{
/*
This can't happen for normal translog_flush,
only during destroying the loghandler
*/
struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset;
TRANSLOG_FILE *fl= buffer->file;
uint8 ver= buffer->ver;
translog_buffer_unlock(buffer);
translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */
if (buffer->overlay->file == fl &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer_offset) > 0)
{
translog_wait_for_buffer_free(overlay);
}
translog_buffer_unlock(overlay);
translog_buffer_lock(buffer);
if (buffer->file != fl || buffer_offset != buffer->offset ||
ver != buffer->ver)
{
/*
This means that somebody else flushed the buffer while we was
waiting for overlay then for locking buffer again.
*/
DBUG_RETURN(0);
}
}
/* /*
Send page by page in the pagecache what we are going to write on the Send page by page in the pagecache what we are going to write on the
...@@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2553,10 +2584,34 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
file->is_sync= 0; file->is_sync= 0;
if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */ if (LSN_OFFSET(buffer->last_lsn) != 0) /* if buffer->last_lsn is set */
translog_set_sent_to_disk(buffer->last_lsn, {
buffer->next_buffer_offset); if (translog_prev_buffer_flush_wait(buffer))
DBUG_RETURN(0); /* some the thread flushed the buffer already */
translog_set_sent_to_disk(buffer);
}
else else
translog_set_only_in_buffers(buffer->next_buffer_offset); translog_set_only_in_buffers(buffer->next_buffer_offset);
/* say to next buffer that we are finished */
{
struct st_translog_buffer *next_buffer=
log_descriptor.buffers + ((buffer->buffer_no + 1) % TRANSLOG_BUFFERS_NO);
if (likely(translog_status == TRANSLOG_OK)){
translog_buffer_lock(next_buffer);
next_buffer->prev_sent_to_disk= buffer->offset;
translog_buffer_unlock(next_buffer);
pthread_cond_broadcast(&next_buffer->prev_sent_to_disk_cond);
}
else
{
/*
It is shutdown =>
1) there is only one thread
2) mutexes of other buffers can be destroyed => we can't use them
*/
next_buffer->prev_sent_to_disk= buffer->offset;
}
}
/* Free buffer */ /* Free buffer */
buffer->file= NULL; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
...@@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4640,6 +4695,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
} }
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no); translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
old_buffer->next_buffer_offset= new_buffer->offset; old_buffer->next_buffer_offset= new_buffer->offset;
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_unlock(old_buffer); translog_buffer_unlock(old_buffer);
offset-= min_offset; offset-= min_offset;
} }
...@@ -7355,7 +7411,7 @@ static void translog_force_current_buffer_to_finish() ...@@ -7355,7 +7411,7 @@ static void translog_force_current_buffer_to_finish()
log_descriptor.bc.ptr+= current_page_fill; log_descriptor.bc.ptr+= current_page_fill;
log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill= log_descriptor.bc.buffer->size= log_descriptor.bc.current_page_fill=
current_page_fill; current_page_fill;
new_buffer->overlay= old_buffer; new_buffer->overlay= 1;
} }
else else
translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc); translog_new_page_header(&log_descriptor.horizon, &log_descriptor.bc);
...@@ -7428,8 +7484,8 @@ static void translog_force_current_buffer_to_finish() ...@@ -7428,8 +7484,8 @@ static void translog_force_current_buffer_to_finish()
memcpy(new_buffer->buffer, data, current_page_fill); memcpy(new_buffer->buffer, data, current_page_fill);
} }
old_buffer->next_buffer_offset= new_buffer->offset; old_buffer->next_buffer_offset= new_buffer->offset;
translog_buffer_lock(new_buffer); translog_buffer_lock(new_buffer);
new_buffer->prev_buffer_offset= old_buffer->offset;
translog_buffer_decrease_writers(new_buffer); translog_buffer_decrease_writers(new_buffer);
translog_buffer_unlock(new_buffer); translog_buffer_unlock(new_buffer);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment