Commit 24c923d4 authored by Kristian Nielsen's avatar Kristian Nielsen

MDEV-31273: Refactor MYSQL_BIN_LOG::write_cache()

Preparatory patch for pre-computing binlog checksums outside of holding
LOCK_log.

The existing code for MYSQL_BIN_LOG::write_cache() was needlessly complex
and very hard to understand and modify for handling the new case where
pre-computed checksums are already present in the IO_CACHE.

Greatly simplify the logic by replacing the (implicit) state machine with
direct code that pulls the events one by one from the IO_CACHE. This removes
a lot of state flags and avoids duplicate code for handling full vs. split
headers.

This also removes the need for the CacheWriter class. As a bonus, this fixes
the bug that CacheWriter::write() was completely ignoring write errors. No
other functional changes are done with this patch, only code cleanup.
Reviewed-by: default avatarMonty <monty@mariadb.org>
Signed-off-by: default avatarKristian Nielsen <knielsen@knielsen-hq.org>
parent 8eee9806
......@@ -7713,40 +7713,6 @@ uint MYSQL_BIN_LOG::next_file_id()
return res;
}
class CacheWriter: public Log_event_writer
{
public:
size_t remains;
CacheWriter(THD *thd_arg, IO_CACHE *file_arg,
enum_binlog_checksum_alg checksum_alg,
Binlog_crypt_data *cr)
: Log_event_writer(file_arg, 0, checksum_alg, cr), remains(0), thd(thd_arg),
first(true)
{
}
~CacheWriter()
{ status_var_add(thd->status_var.binlog_bytes_written, bytes_written); }
int write(uchar* pos, size_t len)
{
DBUG_ENTER("CacheWriter::write");
if (first)
write_header(pos, len);
else
write_data(pos, len);
remains -= len;
if ((first= !remains))
write_footer();
DBUG_RETURN(0);
}
private:
THD *thd;
bool first;
};
static int binlog_online_alter_end_trans(THD *thd, bool all, bool commit)
{
DBUG_ENTER("binlog_online_alter_end_trans");
......@@ -7916,13 +7882,15 @@ int Event_log::write_cache(THD *thd, IO_CACHE *cache)
mysql_mutex_assert_owner(&LOCK_log);
if (reinit_io_cache(cache, READ_CACHE, 0, 0, 0))
DBUG_RETURN(ER_ERROR_ON_WRITE);
size_t length= my_b_bytes_in_cache(cache), group, carry, hdr_offs;
size_t val;
/* Amount of remaining bytes in the IO_CACHE read buffer. */
size_t group;
size_t end_log_pos_inc= 0; // each event processed adds BINLOG_CHECKSUM_LEN 2 t
uchar header[LOG_EVENT_HEADER_LEN];
CacheWriter writer(thd, get_log_file(),
(enum_binlog_checksum_alg)binlog_checksum_options,
&crypto);
uchar header_buf[LOG_EVENT_HEADER_LEN];
Log_event_writer writer(get_log_file(), 0,
(enum_binlog_checksum_alg)binlog_checksum_options,
&crypto);
uint checksum_len= writer.checksum_len;
int err= 0;
if (crypto.scheme)
{
......@@ -7948,128 +7916,73 @@ int Event_log::write_cache(THD *thd, IO_CACHE *cache)
*/
group= (size_t)my_b_tell(get_log_file());
hdr_offs= carry= 0;
do
for (;;)
{
/*
if we only got a partial header in the last iteration,
get the other half now and process a full header.
Empty cache at an event boundary means we are done (but empty cache
elsewhere is an error).
*/
if (unlikely(carry > 0))
{
DBUG_ASSERT(carry < LOG_EVENT_HEADER_LEN);
size_t tail= LOG_EVENT_HEADER_LEN - carry;
/* assemble both halves */
memcpy(&header[carry], (char *)cache->read_pos, tail);
uint32 len= uint4korr(header + EVENT_LEN_OFFSET);
writer.remains= len;
/* fix end_log_pos */
end_log_pos_inc += writer.checksum_len;
val= uint4korr(header + LOG_POS_OFFSET) + group + end_log_pos_inc;
int4store(header + LOG_POS_OFFSET, val);
/* fix len */
len+= writer.checksum_len;
int4store(header + EVENT_LEN_OFFSET, len);
if (writer.write(header, LOG_EVENT_HEADER_LEN))
DBUG_RETURN(ER_ERROR_ON_WRITE);
cache->read_pos+= tail;
length-= tail;
carry= 0;
/* next event header at ... */
hdr_offs= len - LOG_EVENT_HEADER_LEN - writer.checksum_len;
}
/* if there is anything to write, process it. */
if (my_b_tell(cache) == cache->end_of_file)
break;
if (likely(length > 0))
{
DBUG_EXECUTE_IF("fail_binlog_write_1",
errno= 28; DBUG_RETURN(ER_ERROR_ON_WRITE););
/*
process all event-headers in this (partial) cache.
if next header is beyond current read-buffer,
we'll get it later (though not necessarily in the
very next iteration, just "eventually").
*/
DBUG_EXECUTE_IF("fail_binlog_write_1",
{
errno= 28;
goto error_in_write;
});
if (hdr_offs >= length)
if (my_b_read(cache, header_buf, LOG_EVENT_HEADER_LEN))
goto error_in_read;
/* Adjust the length and end_log_pos appropriately. */
uint ev_len= uint4korr(&header_buf[EVENT_LEN_OFFSET]); // netto len
DBUG_ASSERT(ev_len >= LOG_EVENT_HEADER_LEN);
if (unlikely(ev_len < LOG_EVENT_HEADER_LEN))
goto error_in_read;
int4store(&header_buf[EVENT_LEN_OFFSET], ev_len + checksum_len);
end_log_pos_inc += checksum_len;
size_t val= uint4korr(&header_buf[LOG_POS_OFFSET]) + group + end_log_pos_inc;
int4store(&header_buf[LOG_POS_OFFSET], val);
/* Write the header to the binlog. */
if (writer.write_header(header_buf, LOG_EVENT_HEADER_LEN))
goto error_in_write;
ev_len-= LOG_EVENT_HEADER_LEN;
/* Write the rest of the event. */
size_t length= my_b_bytes_in_cache(cache);
while (ev_len > 0)
{
if (length == 0)
{
if (writer.write(cache->read_pos, length))
DBUG_RETURN(ER_ERROR_ON_WRITE);
if (!(length= my_b_fill(cache)))
goto error_in_read;
}
uint chunk= MY_MIN(ev_len, (uint)length);
if (writer.write_data(cache->read_pos, chunk))
goto error_in_write;
cache->read_pos+= chunk;
length-= chunk;
ev_len-= chunk;
}
while (hdr_offs < length)
{
/*
finish off with remains of the last event that crawls
from previous into the current buffer
*/
if (writer.remains != 0)
{
if (writer.write(cache->read_pos, hdr_offs))
DBUG_RETURN(ER_ERROR_ON_WRITE);
}
/*
partial header only? save what we can get, process once
we get the rest.
*/
if (hdr_offs + LOG_EVENT_HEADER_LEN > length)
{
carry= length - hdr_offs;
memcpy(header, (char *)cache->read_pos + hdr_offs, carry);
length= hdr_offs;
}
else
{
/* we've got a full event-header, and it came in one piece */
uchar *ev= (uchar *)cache->read_pos + hdr_offs;
uint ev_len= uint4korr(ev + EVENT_LEN_OFFSET); // netto len
uchar *log_pos= ev + LOG_POS_OFFSET;
end_log_pos_inc += writer.checksum_len;
/* fix end_log_pos */
val= uint4korr(log_pos) + group + end_log_pos_inc;
int4store(log_pos, val);
/* fix length */
int4store(ev + EVENT_LEN_OFFSET, ev_len + writer.checksum_len);
writer.remains= ev_len;
if (writer.write(ev, MY_MIN(ev_len, length - hdr_offs)))
DBUG_RETURN(ER_ERROR_ON_WRITE);
/* next event header at ... */
hdr_offs += ev_len; // incr by the netto len
if (writer.write_footer())
goto error_in_write;
DBUG_ASSERT(!writer.checksum_len || writer.remains == 0 || hdr_offs >= length);
}
}
}
goto end; // All OK
/*
Adjust hdr_offs. Note that it may still point beyond the segment
read in the next iteration; if the current event is very long,
it may take a couple of read-iterations (and subsequent adjustments
of hdr_offs) for it to point into the then-current segment.
If we have a split header (!carry), hdr_offs will be set at the
beginning of the next iteration, overwriting the value we set here:
*/
hdr_offs -= length;
}
} while ((length= my_b_fill(cache)));
error_in_write:
err= ER_ERROR_ON_WRITE;
goto end;
DBUG_ASSERT(carry == 0);
DBUG_ASSERT(!writer.checksum_len || writer.remains == 0);
error_in_read:
err= ER_ERROR_ON_READ;
goto end;
DBUG_RETURN(0); // All OK
end:
status_var_add(thd->status_var.binlog_bytes_written, writer.bytes_written);
DBUG_RETURN(err);
}
/*
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment