Commit 9dd2d08c authored by unknown's avatar unknown

Fixed syncronization of buffer flush.

Debug asserts/prints from Monty.
No check of mutex operations (by Monty).


storage/maria/ma_control_file.c:
  New external function to check mutex ownership.
storage/maria/ma_loghandler.h:
  No check of mutex operations (by Monty).
  New external function to check mutex ownership.
parent 9b9175ff
...@@ -422,7 +422,7 @@ int ma_control_file_write_and_force(const LSN checkpoint_lsn, uint32 logno, ...@@ -422,7 +422,7 @@ int ma_control_file_write_and_force(const LSN checkpoint_lsn, uint32 logno,
DBUG_ASSERT(control_file_fd >= 0); /* must be open */ DBUG_ASSERT(control_file_fd >= 0); /* must be open */
#ifndef DBUG_OFF #ifndef DBUG_OFF
if (maria_multi_threaded) if (maria_multi_threaded)
translog_lock_assert_owner(); translog_lock_handler_assert_owner();
#endif #endif
if (objs_to_write == CONTROL_FILE_UPDATE_ONLY_LSN) if (objs_to_write == CONTROL_FILE_UPDATE_ONLY_LSN)
......
...@@ -127,6 +127,16 @@ struct st_translog_buffer ...@@ -127,6 +127,16 @@ struct st_translog_buffer
impossible to get all 5 buffers locked simultaneously). impossible to get all 5 buffers locked simultaneously).
*/ */
pthread_mutex_t mutex; pthread_mutex_t mutex;
/*
Some thread is going to close the buffer and it should be
done only by that thread
*/
my_bool is_closing_buffer;
/*
Version of the buffer increases every time buffer the buffer flushed.
With file and offset it allow detect buffer changes
*/
uint8 ver;
/* Cache for current log. */ /* Cache for current log. */
uchar buffer[TRANSLOG_WRITE_BUFFER]; uchar buffer[TRANSLOG_WRITE_BUFFER];
}; };
...@@ -314,10 +324,12 @@ LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES]; ...@@ -314,10 +324,12 @@ LOG_DESC log_record_type_descriptor[LOGREC_NUMBER_OF_TYPES];
#ifndef DBUG_OFF #ifndef DBUG_OFF
#define translog_buffer_lock_assert_owner(B) \ #define translog_buffer_lock_assert_owner(B) \
safe_mutex_assert_owner(&(B)->mutex); safe_mutex_assert_owner(&(B)->mutex)
void translog_lock_assert_owner() #define translog_lock_assert_owner() \
safe_mutex_assert_owner(&log_descriptor.bc.buffer->mutex)
void translog_lock_handler_assert_owner()
{ {
translog_buffer_lock_assert_owner(log_descriptor.bc.buffer); translog_lock_assert_owner();
} }
/** /**
...@@ -383,8 +395,8 @@ static void check_translog_description_table(int num) ...@@ -383,8 +395,8 @@ static void check_translog_description_table(int num)
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
#else #else
#define translog_buffer_lock_assert_owner(B) #define translog_buffer_lock_assert_owner(B) {}
#define translog_lock_assert_owner() #define translog_lock_assert_owner() {}
#endif #endif
static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23= static LOG_DESC INIT_LOGREC_RESERVED_FOR_CHUNKS23=
...@@ -1310,6 +1322,8 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer) ...@@ -1310,6 +1322,8 @@ static my_bool translog_buffer_init(struct st_translog_buffer *buffer)
/* lock for the buffer. Current buffer also lock the handler */ /* lock for the buffer. Current buffer also lock the handler */
if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST)) if (pthread_mutex_init(&buffer->mutex, MY_MUTEX_INIT_FAST))
DBUG_RETURN(1); DBUG_RETURN(1);
buffer->is_closing_buffer= 0;
buffer->ver= 0;
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -1467,15 +1481,14 @@ error: ...@@ -1467,15 +1481,14 @@ error:
@retval 1 Error @retval 1 Error
*/ */
static my_bool translog_buffer_lock(struct st_translog_buffer *buffer) static void translog_buffer_lock(struct st_translog_buffer *buffer)
{ {
my_bool res;
DBUG_ENTER("translog_buffer_lock"); DBUG_ENTER("translog_buffer_lock");
DBUG_PRINT("enter", DBUG_PRINT("enter",
("Lock buffer #%u: (0x%lx)", (uint) buffer->buffer_no, ("Lock buffer #%u: (0x%lx)", (uint) buffer->buffer_no,
(ulong) buffer)); (ulong) buffer));
res= (pthread_mutex_lock(&buffer->mutex) != 0); pthread_mutex_lock(&buffer->mutex);
DBUG_RETURN(res); DBUG_VOID_RETURN;
} }
...@@ -1491,15 +1504,14 @@ static my_bool translog_buffer_lock(struct st_translog_buffer *buffer) ...@@ -1491,15 +1504,14 @@ static my_bool translog_buffer_lock(struct st_translog_buffer *buffer)
1 Error 1 Error
*/ */
static my_bool translog_buffer_unlock(struct st_translog_buffer *buffer) static void translog_buffer_unlock(struct st_translog_buffer *buffer)
{ {
my_bool res;
DBUG_ENTER("translog_buffer_unlock"); DBUG_ENTER("translog_buffer_unlock");
DBUG_PRINT("enter", ("Unlock buffer... #%u (0x%lx)", DBUG_PRINT("enter", ("Unlock buffer... #%u (0x%lx)",
(uint) buffer->buffer_no, (ulong) buffer)); (uint) buffer->buffer_no, (ulong) buffer));
res= (pthread_mutex_unlock(&buffer->mutex) != 0); pthread_mutex_unlock(&buffer->mutex);
DBUG_RETURN(res); DBUG_VOID_RETURN;
} }
...@@ -1733,6 +1745,38 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon, ...@@ -1733,6 +1745,38 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
} }
/*
@brief Wait until all threads have finished closing this buffer.
@param buffer This buffer should be check
*/
static void translog_wait_for_closing(struct st_translog_buffer *buffer)
{
DBUG_ENTER("translog_wait_for_closing");
DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
"is closing %u File: %d size: %lu",
(uint) buffer->buffer_no, (ulong) buffer,
(uint) buffer->copy_to_buffer_in_progress,
(uint) buffer->is_closing_buffer,
(buffer->file ? buffer->file->handler.file : -1),
(ulong) buffer->size));
translog_buffer_lock_assert_owner(buffer);
while (buffer->is_closing_buffer)
{
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer));
DBUG_ASSERT(buffer->file != NULL);
pthread_cond_wait(&buffer->waiting_filling_buffer, &buffer->mutex);
DBUG_PRINT("info", ("wait for writers done buffer: #%u 0x%lx",
(uint) buffer->buffer_no, (ulong) buffer));
}
DBUG_VOID_RETURN;
}
/* /*
@brief Wait until all threads have finished filling this buffer. @brief Wait until all threads have finished filling this buffer.
...@@ -1742,9 +1786,13 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon, ...@@ -1742,9 +1786,13 @@ static void translog_finish_page(TRANSLOG_ADDRESS *horizon,
static void translog_wait_for_writers(struct st_translog_buffer *buffer) static void translog_wait_for_writers(struct st_translog_buffer *buffer)
{ {
DBUG_ENTER("translog_wait_for_writers"); DBUG_ENTER("translog_wait_for_writers");
DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u", DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
"is closing %u File: %d size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
(int) buffer->copy_to_buffer_in_progress)); (uint) buffer->copy_to_buffer_in_progress,
(uint) buffer->is_closing_buffer,
(buffer->file ? buffer->file->handler.file : -1),
(ulong) buffer->size));
translog_buffer_lock_assert_owner(buffer); translog_buffer_lock_assert_owner(buffer);
while (buffer->copy_to_buffer_in_progress) while (buffer->copy_to_buffer_in_progress)
...@@ -1775,16 +1823,23 @@ static void translog_wait_for_writers(struct st_translog_buffer *buffer) ...@@ -1775,16 +1823,23 @@ static void translog_wait_for_writers(struct st_translog_buffer *buffer)
static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer) static void translog_wait_for_buffer_free(struct st_translog_buffer *buffer)
{ {
TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
DBUG_ENTER("translog_wait_for_buffer_free"); DBUG_ENTER("translog_wait_for_buffer_free");
DBUG_PRINT("enter", ("Buffer: #%u 0x%lx copies in progress: %u " DBUG_PRINT("enter", ("Buffer #%u 0x%lx copies in progress: %u "
"File: %d size: %lu", "is closing %u File: %d size: %lu",
(uint) buffer->buffer_no, (ulong) buffer, (uint) buffer->buffer_no, (ulong) buffer,
(int) buffer->copy_to_buffer_in_progress, (uint) buffer->copy_to_buffer_in_progress,
(uint) buffer->is_closing_buffer,
(buffer->file ? buffer->file->handler.file : -1), (buffer->file ? buffer->file->handler.file : -1),
(ulong) buffer->size)); (ulong) buffer->size));
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
if (offset != buffer->offset || file != buffer->file || ver != buffer->ver)
DBUG_VOID_RETURN; /* the buffer if already freed */
while (buffer->file != NULL) while (buffer->file != NULL)
{ {
DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx", DBUG_PRINT("info", ("wait for writers... buffer: #%u 0x%lx",
...@@ -1898,7 +1953,20 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon, ...@@ -1898,7 +1953,20 @@ static my_bool translog_buffer_next(TRANSLOG_ADDRESS *horizon,
if (!chasing) if (!chasing)
{ {
translog_buffer_lock(new_buffer); translog_buffer_lock(new_buffer);
#ifndef DBUG_OFF
{
TRANSLOG_ADDRESS offset= new_buffer->offset;
TRANSLOG_FILE *file= new_buffer->file;
uint8 ver= new_buffer->ver;
translog_lock_assert_owner();
#endif
translog_wait_for_buffer_free(new_buffer); translog_wait_for_buffer_free(new_buffer);
#ifndef DBUG_OFF
/* We keep the handler locked so nobody can start this new buffer */
DBUG_ASSERT(offset == new_buffer->offset && file == new_buffer->file &&
ver == new_buffer->ver);
}
#endif
} }
else else
DBUG_ASSERT(new_buffer->file != NULL); DBUG_ASSERT(new_buffer->file != NULL);
...@@ -2238,7 +2306,9 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset) ...@@ -2238,7 +2306,9 @@ static uint16 translog_get_total_chunk_length(uchar *page, uint16 offset)
static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
{ {
uint32 i, pg; uint32 i, pg;
TRANSLOG_FILE *file; TRANSLOG_ADDRESS offset= buffer->offset;
TRANSLOG_FILE *file= buffer->file;
uint8 ver= buffer->ver;
DBUG_ENTER("translog_buffer_flush"); DBUG_ENTER("translog_buffer_flush");
DBUG_ASSERT(buffer->file != NULL); DBUG_ASSERT(buffer->file != NULL);
DBUG_PRINT("enter", DBUG_PRINT("enter",
...@@ -2251,6 +2321,18 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2251,6 +2321,18 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
translog_wait_for_writers(buffer); translog_wait_for_writers(buffer);
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
DBUG_RETURN(0); /* some the thread flushed the buffer already */
if (buffer->is_closing_buffer)
{
/* some other flush in progress */
translog_wait_for_closing(buffer);
}
if (buffer->file != file || buffer->offset != offset || buffer->ver != ver)
DBUG_RETURN(0); /* some the thread flushed the buffer already */
if (buffer->overlay && buffer->overlay->file == buffer->file && if (buffer->overlay && buffer->overlay->file == buffer->file &&
cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size, cmp_translog_addr(buffer->overlay->offset + buffer->overlay->size,
buffer->offset) > 0) buffer->offset) > 0)
...@@ -2262,6 +2344,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2262,6 +2344,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
struct st_translog_buffer *overlay= buffer->overlay; struct st_translog_buffer *overlay= buffer->overlay;
TRANSLOG_ADDRESS buffer_offset= buffer->offset; TRANSLOG_ADDRESS buffer_offset= buffer->offset;
TRANSLOG_FILE *fl= buffer->file; TRANSLOG_FILE *fl= buffer->file;
uint8 ver= buffer->ver;
translog_buffer_unlock(buffer); translog_buffer_unlock(buffer);
translog_buffer_lock(overlay); translog_buffer_lock(overlay);
/* rechecks under mutex protection that overlay is still our overlay */ /* rechecks under mutex protection that overlay is still our overlay */
...@@ -2273,13 +2356,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2273,13 +2356,12 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
} }
translog_buffer_unlock(overlay); translog_buffer_unlock(overlay);
translog_buffer_lock(buffer); translog_buffer_lock(buffer);
if (buffer->file != NULL && buffer_offset == buffer->offset) if (buffer->file != fl || buffer_offset != buffer->offset ||
ver != buffer->ver)
{ {
/* /*
This means that somebody else flushed the buffer while we was This means that somebody else flushed the buffer while we was
waiting for overlay then for locking buffer again. waiting for overlay then for locking buffer again.
It is possible for single request for flush and destroying the
loghandler.
*/ */
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2346,6 +2428,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer) ...@@ -2346,6 +2428,7 @@ static my_bool translog_buffer_flush(struct st_translog_buffer *buffer)
/* Free buffer */ /* Free buffer */
buffer->file= NULL; buffer->file= NULL;
buffer->overlay= 0; buffer->overlay= 0;
buffer->ver++;
pthread_cond_broadcast(&buffer->waiting_filling_buffer); pthread_cond_broadcast(&buffer->waiting_filling_buffer);
DBUG_RETURN(0); DBUG_RETURN(0);
} }
...@@ -2573,14 +2656,9 @@ static my_bool translog_page_validator(uchar *page, ...@@ -2573,14 +2656,9 @@ static my_bool translog_page_validator(uchar *page,
/** /**
@brief Locks the loghandler. @brief Locks the loghandler.
@note See comment before buffer 'mutex' variable.
@retval 0 OK
@retval 1 Error
*/ */
my_bool translog_lock() void translog_lock()
{ {
uint8 current_buffer; uint8 current_buffer;
DBUG_ENTER("translog_lock"); DBUG_ENTER("translog_lock");
...@@ -2596,13 +2674,12 @@ my_bool translog_lock() ...@@ -2596,13 +2674,12 @@ my_bool translog_lock()
an atomic operation an atomic operation
*/ */
current_buffer= log_descriptor.bc.buffer_no; current_buffer= log_descriptor.bc.buffer_no;
if (translog_buffer_lock(log_descriptor.buffers + current_buffer)) translog_buffer_lock(log_descriptor.buffers + current_buffer);
DBUG_RETURN(1);
if (log_descriptor.bc.buffer_no == current_buffer) if (log_descriptor.bc.buffer_no == current_buffer)
break; break;
translog_buffer_unlock(log_descriptor.buffers + current_buffer); translog_buffer_unlock(log_descriptor.buffers + current_buffer);
} }
DBUG_RETURN(0); DBUG_VOID_RETURN;
} }
...@@ -2617,12 +2694,9 @@ my_bool translog_lock() ...@@ -2617,12 +2694,9 @@ my_bool translog_lock()
1 Error 1 Error
*/ */
my_bool translog_unlock() void translog_unlock()
{ {
DBUG_ENTER("translog_unlock");
translog_buffer_unlock(log_descriptor.bc.buffer); translog_buffer_unlock(log_descriptor.bc.buffer);
DBUG_RETURN(0);
} }
...@@ -2653,10 +2727,11 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2653,10 +2727,11 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
/* it is really page address */ /* it is really page address */
DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0); DBUG_ASSERT(LSN_OFFSET(addr) % TRANSLOG_PAGE_SIZE == 0);
if (direct_link) if (direct_link)
*direct_link= NULL; *direct_link= NULL;
restart:
in_buffers= translog_only_in_buffers(); in_buffers= translog_only_in_buffers();
DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)", DBUG_PRINT("info", ("in_buffers: (%lu,0x%lx)",
LSN_IN_PARTS(in_buffers))); LSN_IN_PARTS(in_buffers)));
...@@ -2687,11 +2762,21 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer, ...@@ -2687,11 +2762,21 @@ static uchar *translog_get_page(TRANSLOG_VALIDATOR_DATA *data, uchar *buffer,
curr_buffer->next_buffer_offset: curr_buffer->next_buffer_offset:
curr_buffer->offset + curr_buffer->size)) < 0) curr_buffer->offset + curr_buffer->size)) < 0)
{ {
TRANSLOG_ADDRESS offset= curr_buffer->offset;
TRANSLOG_FILE *fl= curr_buffer->file;
uchar *from, *table= NULL;
int is_last_unfinished_page; int is_last_unfinished_page;
uint last_protected_sector= 0; uint last_protected_sector= 0;
uchar *from, *table= NULL;
TRANSLOG_FILE file_copy; TRANSLOG_FILE file_copy;
uint8 ver= curr_buffer->ver;
translog_wait_for_writers(curr_buffer); translog_wait_for_writers(curr_buffer);
if (offset != curr_buffer->offset || fl != curr_buffer->file ||
ver != curr_buffer->ver)
{
DBUG_ASSERT(buffer_unlock == curr_buffer);
translog_buffer_unlock(buffer_unlock);
goto restart;
}
DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset)); DBUG_ASSERT(LSN_FILE_NO(addr) == LSN_FILE_NO(curr_buffer->offset));
from= curr_buffer->buffer + (addr - curr_buffer->offset); from= curr_buffer->buffer + (addr - curr_buffer->offset);
memcpy(buffer, from, TRANSLOG_PAGE_SIZE); memcpy(buffer, from, TRANSLOG_PAGE_SIZE);
...@@ -3696,6 +3781,10 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer) ...@@ -3696,6 +3781,10 @@ static void translog_buffer_destroy(struct st_translog_buffer *buffer)
/* /*
We ignore errors here, because we can't do something about it We ignore errors here, because we can't do something about it
(it is shutting down) (it is shutting down)
We also have to take the locks even if there can't be any other
threads running, because translog_buffer_flush()
requires that we have the buffer locked.
*/ */
translog_buffer_lock(buffer); translog_buffer_lock(buffer);
translog_buffer_flush(buffer); translog_buffer_flush(buffer);
...@@ -4072,15 +4161,14 @@ translog_write_variable_record_chunk2_page(struct st_translog_parts *parts, ...@@ -4072,15 +4161,14 @@ translog_write_variable_record_chunk2_page(struct st_translog_parts *parts,
DBUG_ENTER("translog_write_variable_record_chunk2_page"); DBUG_ENTER("translog_write_variable_record_chunk2_page");
chunk2_header[0]= TRANSLOG_CHUNK_NOHDR; chunk2_header[0]= TRANSLOG_CHUNK_NOHDR;
LINT_INIT(buffer_to_flush);
rc= translog_page_next(horizon, cursor, &buffer_to_flush); rc= translog_page_next(horizon, cursor, &buffer_to_flush);
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
rc|= translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
if (rc) if (rc)
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -4121,15 +4209,14 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts, ...@@ -4121,15 +4209,14 @@ translog_write_variable_record_chunk3_page(struct st_translog_parts *parts,
uchar chunk3_header[1 + 2]; uchar chunk3_header[1 + 2];
DBUG_ENTER("translog_write_variable_record_chunk3_page"); DBUG_ENTER("translog_write_variable_record_chunk3_page");
LINT_INIT(buffer_to_flush);
rc= translog_page_next(horizon, cursor, &buffer_to_flush); rc= translog_page_next(horizon, cursor, &buffer_to_flush);
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
rc|= translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
if (rc) if (rc)
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -4240,7 +4327,20 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4240,7 +4327,20 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
new_buffer= log_descriptor.buffers + new_buffer_no; new_buffer= log_descriptor.buffers + new_buffer_no;
translog_buffer_lock(new_buffer); translog_buffer_lock(new_buffer);
#ifndef DBUG_OFF
{
TRANSLOG_ADDRESS offset= new_buffer->offset;
TRANSLOG_FILE *file= new_buffer->file;
uint8 ver= new_buffer->ver;
translog_lock_assert_owner();
#endif
translog_wait_for_buffer_free(new_buffer); translog_wait_for_buffer_free(new_buffer);
#ifndef DBUG_OFF
/* We keep the handler locked so nobody can start this new buffer */
DBUG_ASSERT(offset == new_buffer->offset && file == new_buffer->file &&
ver == new_buffer->ver);
}
#endif
min_offset= min(buffer_end_offset, file_end_offset); min_offset= min(buffer_end_offset, file_end_offset);
/* TODO: check is it ptr or size enough */ /* TODO: check is it ptr or size enough */
...@@ -4279,8 +4379,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data) ...@@ -4279,8 +4379,7 @@ static my_bool translog_advance_pointer(int pages, uint16 last_page_data)
} }
translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no); translog_start_buffer(new_buffer, &log_descriptor.bc, new_buffer_no);
old_buffer->next_buffer_offset= new_buffer->offset; old_buffer->next_buffer_offset= new_buffer->offset;
if (translog_buffer_unlock(old_buffer)) translog_buffer_unlock(old_buffer);
DBUG_RETURN(1);
offset-= min_offset; offset-= min_offset;
} }
DBUG_PRINT("info", ("drop write_counter")); DBUG_PRINT("info", ("drop write_counter"));
...@@ -4397,6 +4496,10 @@ static translog_size_t translog_get_current_group_size() ...@@ -4397,6 +4496,10 @@ static translog_size_t translog_get_current_group_size()
@param hook_arg Argument which will be passed to pre-write and @param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record. in-write hooks of this record.
@note
We must have a translog_lock() when entering this function
We must have buffer_to_flush locked (if not null)
@return Operation status @return Operation status
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
...@@ -4421,6 +4524,8 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4421,6 +4524,8 @@ translog_write_variable_record_1group(LSN *lsn,
uchar chunk0_header[1 + 2 + 5 + 2]; uchar chunk0_header[1 + 2 + 5 + 2];
DBUG_ENTER("translog_write_variable_record_1group"); DBUG_ENTER("translog_write_variable_record_1group");
translog_lock_assert_owner(); translog_lock_assert_owner();
if (buffer_to_flush)
translog_buffer_lock_assert_owner(buffer_to_flush);
*lsn= horizon= log_descriptor.horizon; *lsn= horizon= log_descriptor.horizon;
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn), if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
...@@ -4464,7 +4569,7 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4464,7 +4569,7 @@ translog_write_variable_record_1group(LSN *lsn,
(record_rest ? record_rest + 3 : 0)); (record_rest ? record_rest + 3 : 0));
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
rc|= translog_unlock(); translog_unlock();
/* /*
Check if we switched buffer and need process it (current buffer is Check if we switched buffer and need process it (current buffer is
...@@ -4474,7 +4579,7 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4474,7 +4579,7 @@ translog_write_variable_record_1group(LSN *lsn,
{ {
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
if (rc) if (rc)
DBUG_RETURN(1); DBUG_RETURN(1);
...@@ -4522,11 +4627,9 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4522,11 +4627,9 @@ translog_write_variable_record_1group(LSN *lsn,
(ulong) LSN_FILE_NO(horizon), (ulong) LSN_FILE_NO(horizon),
(ulong) LSN_OFFSET(horizon))); (ulong) LSN_OFFSET(horizon)));
if (!(rc= translog_buffer_lock(cursor.buffer))) translog_buffer_lock(cursor.buffer);
{
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
} translog_buffer_unlock(cursor.buffer);
rc|= translog_buffer_unlock(cursor.buffer);
DBUG_RETURN(rc); DBUG_RETURN(rc);
} }
...@@ -4545,6 +4648,10 @@ translog_write_variable_record_1group(LSN *lsn, ...@@ -4545,6 +4648,10 @@ translog_write_variable_record_1group(LSN *lsn,
@param hook_arg Argument which will be passed to pre-write and @param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record. in-write hooks of this record.
@note
We must have a translog_lock() when entering this function
We must have buffer_to_flush locked (if not null)
@return Operation status @return Operation status
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
...@@ -4564,6 +4671,8 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -4564,6 +4671,8 @@ translog_write_variable_record_1chunk(LSN *lsn,
uchar chunk0_header[1 + 2 + 5 + 2]; uchar chunk0_header[1 + 2 + 5 + 2];
DBUG_ENTER("translog_write_variable_record_1chunk"); DBUG_ENTER("translog_write_variable_record_1chunk");
translog_lock_assert_owner(); translog_lock_assert_owner();
if (buffer_to_flush)
translog_buffer_lock_assert_owner(buffer_to_flush);
translog_write_variable_record_1group_header(parts, type, short_trid, translog_write_variable_record_1group_header(parts, type, short_trid,
header_length, chunk0_header); header_length, chunk0_header);
...@@ -4583,7 +4692,7 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -4583,7 +4692,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
&log_descriptor.bc, &log_descriptor.bc,
parts->total_record_length, parts); parts->total_record_length, parts);
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
rc|= translog_unlock(); translog_unlock();
/* /*
check if we switched buffer and need process it (current buffer is check if we switched buffer and need process it (current buffer is
...@@ -4593,7 +4702,7 @@ translog_write_variable_record_1chunk(LSN *lsn, ...@@ -4593,7 +4702,7 @@ translog_write_variable_record_1chunk(LSN *lsn,
{ {
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -4880,6 +4989,14 @@ static void translog_relative_LSN_encode(struct st_translog_parts *parts, ...@@ -4880,6 +4989,14 @@ static void translog_relative_LSN_encode(struct st_translog_parts *parts,
@param hook_arg Argument which will be passed to pre-write and @param hook_arg Argument which will be passed to pre-write and
in-write hooks of this record. in-write hooks of this record.
@note
We must have a translog_lock() when entering this function
We must have buffer_to_flush locked (if not null)
buffer_to_flush should *NOT* be locked when calling this function.
(This is note is here as this is different from most other
translog_write...() functions which require the buffer to be locked)
@return Operation status @return Operation status
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
...@@ -4985,15 +5102,15 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -4985,15 +5102,15 @@ translog_write_variable_record_mgroup(LSN *lsn,
done))); done)));
rc|= translog_advance_pointer((int)full_pages, 0); rc|= translog_advance_pointer((int)full_pages, 0);
rc|= translog_unlock(); translog_unlock();
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
rc|= translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL; buffer_to_flush= NULL;
} }
if (rc) if (rc)
...@@ -5032,11 +5149,11 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5032,11 +5149,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
rc= translog_page_next(&horizon, &cursor, &buffer_to_flush); rc= translog_page_next(&horizon, &cursor, &buffer_to_flush);
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
rc|= translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL; buffer_to_flush= NULL;
} }
if (rc) if (rc)
...@@ -5044,12 +5161,9 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5044,12 +5161,9 @@ translog_write_variable_record_mgroup(LSN *lsn,
DBUG_PRINT("error", ("flush of unlock buffer failed")); DBUG_PRINT("error", ("flush of unlock buffer failed"));
goto err; goto err;
} }
rc= translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
if (!rc)
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
rc|= translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
if (rc)
goto err;
translog_lock(); translog_lock();
...@@ -5137,7 +5251,7 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5137,7 +5251,7 @@ translog_write_variable_record_mgroup(LSN *lsn,
((page_capacity - ((page_capacity -
header_fixed_part) / (7 + 1)) * header_fixed_part) / (7 + 1)) *
(chunk0_pages - 1)) * (7 + 1)); (chunk0_pages - 1)) * (7 + 1));
rc|= translog_unlock(); translog_unlock();
if (rc) if (rc)
goto err; goto err;
...@@ -5212,11 +5326,11 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5212,11 +5326,11 @@ translog_write_variable_record_mgroup(LSN *lsn,
rc= translog_page_next(&horizon, &cursor, &buffer_to_flush); rc= translog_page_next(&horizon, &cursor, &buffer_to_flush);
if (buffer_to_flush != NULL) if (buffer_to_flush != NULL)
{ {
rc|= translog_buffer_lock(buffer_to_flush); translog_buffer_lock(buffer_to_flush);
translog_buffer_decrease_writers(buffer_to_flush); translog_buffer_decrease_writers(buffer_to_flush);
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
buffer_to_flush= NULL; buffer_to_flush= NULL;
} }
if (rc) if (rc)
...@@ -5283,11 +5397,12 @@ translog_write_variable_record_mgroup(LSN *lsn, ...@@ -5283,11 +5397,12 @@ translog_write_variable_record_mgroup(LSN *lsn,
curr_group+= limit; curr_group+= limit;
} while (chunk0_pages != 0); } while (chunk0_pages != 0);
rc= translog_buffer_lock(cursor.buffer); translog_buffer_lock(cursor.buffer);
if (cmp_translog_addr(cursor.buffer->last_lsn, *lsn) < 0) if (cmp_translog_addr(cursor.buffer->last_lsn, *lsn) < 0)
cursor.buffer->last_lsn= *lsn; cursor.buffer->last_lsn= *lsn;
translog_buffer_decrease_writers(cursor.buffer); translog_buffer_decrease_writers(cursor.buffer);
rc|= translog_buffer_unlock(cursor.buffer); translog_buffer_unlock(cursor.buffer);
rc= 0;
if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn), if (translog_set_lsn_for_files(file_of_the_first_group, LSN_FILE_NO(*lsn),
*lsn, FALSE)) *lsn, FALSE))
...@@ -5451,7 +5566,7 @@ static my_bool translog_write_fixed_record(LSN *lsn, ...@@ -5451,7 +5566,7 @@ static my_bool translog_write_fixed_record(LSN *lsn,
uchar compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD * uchar compressed_LSNs[MAX_NUMBER_OF_LSNS_PER_RECORD *
COMPRESSED_LSN_MAX_STORE_SIZE]; COMPRESSED_LSN_MAX_STORE_SIZE];
LEX_STRING *part; LEX_STRING *part;
int rc; int rc= 1;
DBUG_ENTER("translog_write_fixed_record"); DBUG_ENTER("translog_write_fixed_record");
DBUG_ASSERT((log_record_type_descriptor[type].rclass == DBUG_ASSERT((log_record_type_descriptor[type].rclass ==
LOGRECTYPE_FIXEDLENGTH && LOGRECTYPE_FIXEDLENGTH &&
...@@ -5486,20 +5601,20 @@ static my_bool translog_write_fixed_record(LSN *lsn, ...@@ -5486,20 +5601,20 @@ static my_bool translog_write_fixed_record(LSN *lsn,
TRANSLOG_PAGE_SIZE) TRANSLOG_PAGE_SIZE)
{ {
DBUG_PRINT("info", ("Next page")); DBUG_PRINT("info", ("Next page"));
translog_page_next(&log_descriptor.horizon, &log_descriptor.bc, if (translog_page_next(&log_descriptor.horizon, &log_descriptor.bc,
&buffer_to_flush); &buffer_to_flush))
goto err; /* rc == 1 */
if (buffer_to_flush)
translog_buffer_lock_assert_owner(buffer_to_flush);
} }
*lsn= log_descriptor.horizon; *lsn= log_descriptor.horizon;
if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn), if (translog_set_lsn_for_files(LSN_FILE_NO(*lsn), LSN_FILE_NO(*lsn),
*lsn, TRUE) || *lsn, TRUE) ||
(log_record_type_descriptor[type].inwrite_hook && (log_record_type_descriptor[type].inwrite_hook &&
(*log_record_type_descriptor[type].inwrite_hook) (type, trn, tbl_info, (*log_record_type_descriptor[type].inwrite_hook)(type, trn, tbl_info,
lsn, hook_arg))) lsn, hook_arg)))
{
rc= 1;
goto err; goto err;
}
/* compress LSNs */ /* compress LSNs */
if (log_record_type_descriptor[type].rclass == if (log_record_type_descriptor[type].rclass ==
...@@ -5529,7 +5644,7 @@ static my_bool translog_write_fixed_record(LSN *lsn, ...@@ -5529,7 +5644,7 @@ static my_bool translog_write_fixed_record(LSN *lsn,
log_descriptor.bc.buffer->last_lsn= *lsn; log_descriptor.bc.buffer->last_lsn= *lsn;
err: err:
rc|= translog_unlock(); translog_unlock();
/* /*
check if we switched buffer and need process it (current buffer is check if we switched buffer and need process it (current buffer is
...@@ -5539,7 +5654,7 @@ err: ...@@ -5539,7 +5654,7 @@ err:
{ {
if (!rc) if (!rc)
rc= translog_buffer_flush(buffer_to_flush); rc= translog_buffer_flush(buffer_to_flush);
rc|= translog_buffer_unlock(buffer_to_flush); translog_buffer_unlock(buffer_to_flush);
} }
DBUG_RETURN(rc); DBUG_RETURN(rc);
...@@ -6819,7 +6934,7 @@ static void translog_force_current_buffer_to_finish() ...@@ -6819,7 +6934,7 @@ static void translog_force_current_buffer_to_finish()
DBUG_PRINT("enter", ("Buffer #%u 0x%lx " DBUG_PRINT("enter", ("Buffer #%u 0x%lx "
"Buffer addr: (%lu,0x%lx) " "Buffer addr: (%lu,0x%lx) "
"Page addr: (%lu,0x%lx) " "Page addr: (%lu,0x%lx) "
"size: %lu (%lu) Pg: %u left: %u", "size: %lu (%lu) Pg: %u left: %u in progress %u",
(uint) log_descriptor.bc.buffer_no, (uint) log_descriptor.bc.buffer_no,
(ulong) log_descriptor.bc.buffer, (ulong) log_descriptor.bc.buffer,
LSN_IN_PARTS(log_descriptor.bc.buffer->offset), LSN_IN_PARTS(log_descriptor.bc.buffer->offset),
...@@ -6830,8 +6945,10 @@ static void translog_force_current_buffer_to_finish() ...@@ -6830,8 +6945,10 @@ static void translog_force_current_buffer_to_finish()
(ulong) (log_descriptor.bc.ptr -log_descriptor.bc. (ulong) (log_descriptor.bc.ptr -log_descriptor.bc.
buffer->buffer), buffer->buffer),
(uint) log_descriptor.bc.current_page_fill, (uint) log_descriptor.bc.current_page_fill,
(uint) left)); (uint) left,
(uint) log_descriptor.bc.buffer->
copy_to_buffer_in_progress));
translog_lock_assert_owner();
LINT_INIT(current_page_fill); LINT_INIT(current_page_fill);
new_buff_beginning= log_descriptor.bc.buffer->offset; new_buff_beginning= log_descriptor.bc.buffer->offset;
new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */ new_buff_beginning+= log_descriptor.bc.buffer->size; /* increase offset */
...@@ -6869,7 +6986,20 @@ static void translog_force_current_buffer_to_finish() ...@@ -6869,7 +6986,20 @@ static void translog_force_current_buffer_to_finish()
} }
translog_buffer_lock(new_buffer); translog_buffer_lock(new_buffer);
#ifndef DBUG_OFF
{
TRANSLOG_ADDRESS offset= new_buffer->offset;
TRANSLOG_FILE *file= new_buffer->file;
uint8 ver= new_buffer->ver;
translog_lock_assert_owner();
#endif
translog_wait_for_buffer_free(new_buffer); translog_wait_for_buffer_free(new_buffer);
#ifndef DBUG_OFF
/* We keep the handler locked so nobody can start this new buffer */
DBUG_ASSERT(offset == new_buffer->offset && file == new_buffer->file &&
ver == new_buffer->ver);
}
#endif
write_counter= log_descriptor.bc.write_counter; write_counter= log_descriptor.bc.write_counter;
previous_offset= log_descriptor.bc.previous_offset; previous_offset= log_descriptor.bc.previous_offset;
...@@ -6900,7 +7030,27 @@ static void translog_force_current_buffer_to_finish() ...@@ -6900,7 +7030,27 @@ static void translog_force_current_buffer_to_finish()
pages by applying protection and copying the page content in the pages by applying protection and copying the page content in the
new buffer. new buffer.
*/ */
#ifndef DBUG_OFF
{
TRANSLOG_ADDRESS offset= old_buffer->offset;
TRANSLOG_FILE *file= old_buffer->file;
uint8 ver= old_buffer->ver;
#endif
/*
Now only one thread can flush log (buffer can flush many threads but
log flush is serialized) so no other thread can set is_closing_buffer
*/
DBUG_ASSERT(!old_buffer->is_closing_buffer);
old_buffer->is_closing_buffer= 1; /* Other flushes will wait */
DBUG_PRINT("enter", ("Buffer #%u 0x%lx is_closing_buffer set",
(uint) old_buffer->buffer_no, (ulong) old_buffer));
translog_wait_for_writers(old_buffer); translog_wait_for_writers(old_buffer);
#ifndef DBUG_OFF
/* We blocked flushing this buffer so the buffer should not changed */
DBUG_ASSERT(offset == old_buffer->offset && file == old_buffer->file &&
ver == old_buffer->ver);
}
#endif
if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION) if (log_descriptor.flags & TRANSLOG_SECTOR_PROTECTION)
{ {
...@@ -6926,6 +7076,10 @@ static void translog_force_current_buffer_to_finish() ...@@ -6926,6 +7076,10 @@ static void translog_force_current_buffer_to_finish()
DBUG_PRINT("info", ("CRC: 0x%lx", (ulong) crc)); DBUG_PRINT("info", ("CRC: 0x%lx", (ulong) crc));
int4store(data + 3 + 3 + 1, crc); int4store(data + 3 + 3 + 1, crc);
} }
old_buffer->is_closing_buffer= 0;
DBUG_PRINT("enter", ("Buffer #%u 0x%lx is_closing_buffer cleared",
(uint) old_buffer->buffer_no, (ulong) old_buffer));
pthread_cond_broadcast(&old_buffer->waiting_filling_buffer);
if (left) if (left)
{ {
...@@ -7031,6 +7185,12 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn) ...@@ -7031,6 +7185,12 @@ my_bool translog_flush(TRANSLOG_ADDRESS lsn)
{ {
/* we made a circle */ /* we made a circle */
full_circle= 1; full_circle= 1;
/*
If buffer from which we started still current we have to
finish it (we will not flush intentionally more records
then was at the moment of start flushing);
*/
if (buffer_start == log_descriptor.bc.buffer_no)
translog_force_current_buffer_to_finish(); translog_force_current_buffer_to_finish();
} }
break; break;
......
...@@ -302,9 +302,9 @@ extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, ...@@ -302,9 +302,9 @@ extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
extern LSN translog_get_file_max_lsn_stored(uint32 file); extern LSN translog_get_file_max_lsn_stored(uint32 file);
extern my_bool translog_purge(TRANSLOG_ADDRESS low); extern my_bool translog_purge(TRANSLOG_ADDRESS low);
extern my_bool translog_is_file(uint file_no); extern my_bool translog_is_file(uint file_no);
extern my_bool translog_lock(); extern void translog_lock();
extern my_bool translog_unlock(); extern void translog_unlock();
extern void translog_lock_assert_owner(); extern void translog_lock_handler_assert_owner();
extern TRANSLOG_ADDRESS translog_get_horizon(); extern TRANSLOG_ADDRESS translog_get_horizon();
extern TRANSLOG_ADDRESS translog_get_horizon_no_lock(); extern TRANSLOG_ADDRESS translog_get_horizon_no_lock();
extern int translog_assign_id_to_share(struct st_maria_handler *tbl_info, extern int translog_assign_id_to_share(struct st_maria_handler *tbl_info,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment