Commit e69becf1 authored by monty@narttu.mysql.fi's avatar monty@narttu.mysql.fi

changed to use IO_CACHE instead of FILE

parent 7a013339
monty@tik.mysql.com monty@narttu.mysql.fi
...@@ -3610,10 +3610,13 @@ similar system. In the worst case, we may require access to your system ...@@ -3610,10 +3610,13 @@ similar system. In the worst case, we may require access to your system
to be able to create a binary distribution. to be able to create a binary distribution.
@item @item
If you can provide accommodations and pay for traveler fares, you can even If you can provide accommodations and pay for traveler fares, you can
get a @strong{MySQL} developer to visit you and offer you help with your even get a @strong{MySQL} developer to visit you and offer you help with
troubles. Extended login support entitles you to one personal your troubles. Extended login support entitles you to one personal
encounter per year, but we are always very flexible towards our customers! encounter per year, but we are always very flexible towards our
customers! If the visit takes 16 hours or more, the first 8 hours is
without charge. For the hours above 8 hours, you will be charged with a
rate that is at least 20 % less than our standard rates.
@end itemize @end itemize
@node Installing, Compatibility, Licensing and Support, Top @node Installing, Compatibility, Licensing and Support, Top
...@@ -38367,6 +38370,9 @@ though, so 3.23 is not released as a stable version yet. ...@@ -38367,6 +38370,9 @@ though, so 3.23 is not released as a stable version yet.
@appendixsubsec Changes in release 3.23.28 @appendixsubsec Changes in release 3.23.28
@itemize @bullet @itemize @bullet
@item @item
Changed all log files to use our own IO_CACHE mechanism instead of
FILE:s to avoid OS problems when there is many files open.
@item
Added options @code{--open-files} and @code{--timezone} to @code{safe_mysqld}. Added options @code{--open-files} and @code{--timezone} to @code{safe_mysqld}.
@item @item
Fixed fatal bug in @code{CREATE TEMPORARY TABLE ...SELECT ...}. Fixed fatal bug in @code{CREATE TEMPORARY TABLE ...SELECT ...}.
...@@ -25,7 +25,7 @@ ...@@ -25,7 +25,7 @@
** * * ** * *
** ************************* ** *************************
*/ */
#define IMPORT_VERSION "2.5" #define IMPORT_VERSION "2.6"
#include <global.h> #include <global.h>
#include <my_sys.h> #include <my_sys.h>
...@@ -125,7 +125,7 @@ file. The SQL command 'LOAD DATA INFILE' is used to import the rows.\n"); ...@@ -125,7 +125,7 @@ file. The SQL command 'LOAD DATA INFILE' is used to import the rows.\n");
Give the column names in a comma separated list.\n\ Give the column names in a comma separated list.\n\
This is same as giving columns to LOAD DATA INFILE.\n\ This is same as giving columns to LOAD DATA INFILE.\n\
-C, --compress Use compression in server/client protocol\n\ -C, --compress Use compression in server/client protocol\n\
-d, --delete Deletes first all rows from table.\n\ -d, --delete First delete all rows from table.\n\
-f, --force Continue even if we get an sql-error.\n\ -f, --force Continue even if we get an sql-error.\n\
-h, --host=... Connect to host.\n\ -h, --host=... Connect to host.\n\
-i, --ignore If duplicate unique key was found, keep old row.\n\ -i, --ignore If duplicate unique key was found, keep old row.\n\
......
...@@ -95,17 +95,6 @@ bool String::realloc(uint32 alloc_length) ...@@ -95,17 +95,6 @@ bool String::realloc(uint32 alloc_length)
return FALSE; return FALSE;
} }
#ifdef NOT_NEEDED
bool String::set(long num)
{
if (alloc(14))
return TRUE;
str_length=(uint32) (int10_to_str(num,Ptr,-10)-Ptr);
return FALSE;
}
#endif
bool String::set(longlong num) bool String::set(longlong num)
{ {
if (alloc(21)) if (alloc(21))
...@@ -274,6 +263,7 @@ bool String::append(const char *s,uint32 arg_length) ...@@ -274,6 +263,7 @@ bool String::append(const char *s,uint32 arg_length)
return FALSE; return FALSE;
} }
#ifdef TO_BE_REMOVED
bool String::append(FILE* file, uint32 arg_length, myf my_flags) bool String::append(FILE* file, uint32 arg_length, myf my_flags)
{ {
if (realloc(str_length+arg_length)) if (realloc(str_length+arg_length))
...@@ -286,6 +276,20 @@ bool String::append(FILE* file, uint32 arg_length, myf my_flags) ...@@ -286,6 +276,20 @@ bool String::append(FILE* file, uint32 arg_length, myf my_flags)
str_length+=arg_length; str_length+=arg_length;
return FALSE; return FALSE;
} }
#endif
bool String::append(IO_CACHE* file, uint32 arg_length)
{
if (realloc(str_length+arg_length))
return TRUE;
if (my_b_read(file, (byte*) Ptr + str_length, arg_length))
{
shrink(str_length);
return TRUE;
}
str_length+=arg_length;
return FALSE;
}
uint32 String::numchars() uint32 String::numchars()
{ {
......
...@@ -100,16 +100,16 @@ public: ...@@ -100,16 +100,16 @@ public:
bool set(ulonglong num); bool set(ulonglong num);
bool set(double num,uint decimals=2); bool set(double num,uint decimals=2);
inline void free() inline void free()
{
if (alloced)
{ {
if (alloced) alloced=0;
{ Alloced_length=0;
alloced=0; my_free(Ptr,MYF(0));
Alloced_length=0; Ptr=0;
my_free(Ptr,MYF(0)); str_length=0; /* Safety */
Ptr=0;
}
} }
}
inline bool alloc(uint32 arg_length) inline bool alloc(uint32 arg_length)
{ {
if (arg_length < Alloced_length) if (arg_length < Alloced_length)
...@@ -152,7 +152,7 @@ public: ...@@ -152,7 +152,7 @@ public:
bool copy(const char *s,uint32 arg_length); // Allocate new string bool copy(const char *s,uint32 arg_length); // Allocate new string
bool append(const String &s); bool append(const String &s);
bool append(const char *s,uint32 arg_length=0); bool append(const char *s,uint32 arg_length=0);
bool append(FILE* file, uint32 arg_length, myf my_flags); bool append(IO_CACHE* file, uint32 arg_length);
int strstr(const String &search,uint32 offset=0); // Returns offset to substring or -1 int strstr(const String &search,uint32 offset=0); // Returns offset to substring or -1
int strrstr(const String &search,uint32 offset=0); // Returns offset to substring or -1 int strrstr(const String &search,uint32 offset=0); // Returns offset to substring or -1
bool replace(uint32 offset,uint32 arg_length,const String &to); bool replace(uint32 offset,uint32 arg_length,const String &to);
......
...@@ -59,6 +59,7 @@ extern int NEAR my_errno; /* Last error in mysys */ ...@@ -59,6 +59,7 @@ extern int NEAR my_errno; /* Last error in mysys */
#define MY_WME 16 /* Write message on error */ #define MY_WME 16 /* Write message on error */
#define MY_WAIT_IF_FULL 32 /* Wait and try again if disk full error */ #define MY_WAIT_IF_FULL 32 /* Wait and try again if disk full error */
#define MY_RAID 64 /* Support for RAID (not the "Johnson&Johnson"-s one ;) */ #define MY_RAID 64 /* Support for RAID (not the "Johnson&Johnson"-s one ;) */
#define MY_DONT_CHECK_FILESIZE 128 /* Option to init_io_cache() */
#define MY_LINK_WARNING 32 /* my_redel() gives warning if links */ #define MY_LINK_WARNING 32 /* my_redel() gives warning if links */
#define MY_COPYTIME 64 /* my_redel() copys time */ #define MY_COPYTIME 64 /* my_redel() copys time */
#define MY_HOLD_ORIGINAL_MODES 128 /* my_copy() holds to file modes */ #define MY_HOLD_ORIGINAL_MODES 128 /* my_copy() holds to file modes */
...@@ -505,6 +506,10 @@ extern int my_block_write(IO_CACHE *info, const byte *Buffer, ...@@ -505,6 +506,10 @@ extern int my_block_write(IO_CACHE *info, const byte *Buffer,
uint Count, my_off_t pos); uint Count, my_off_t pos);
extern int flush_io_cache(IO_CACHE *info); extern int flush_io_cache(IO_CACHE *info);
extern int end_io_cache(IO_CACHE *info); extern int end_io_cache(IO_CACHE *info);
extern uint my_b_fill(IO_CACHE *info);
extern void my_b_seek(IO_CACHE *info,my_off_t pos);
extern uint my_b_gets(IO_CACHE *info, char *to, uint max_length);
extern uint my_b_printf(IO_CACHE *info, const char* fmt, ...);
extern my_bool open_cached_file(IO_CACHE *cache,const char *dir, extern my_bool open_cached_file(IO_CACHE *cache,const char *dir,
const char *prefix, uint cache_size, const char *prefix, uint cache_size,
myf cache_myflags); myf cache_myflags);
......
No preview for this file type
...@@ -26,7 +26,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c\ ...@@ -26,7 +26,7 @@ libmysys_a_SOURCES = my_init.c my_getwd.c mf_getdate.c\
mf_path.c mf_loadpath.c\ mf_path.c mf_loadpath.c\
my_open.c my_create.c my_seek.c my_read.c \ my_open.c my_create.c my_seek.c my_read.c \
my_pread.c my_write.c \ my_pread.c my_write.c \
mf_reccache.c mf_keycache.c \ mf_keycache.c \
mf_iocache.c mf_cache.c mf_tempfile.c \ mf_iocache.c mf_cache.c mf_tempfile.c \
my_lock.c mf_brkhant.c my_alarm.c \ my_lock.c mf_brkhant.c my_alarm.c \
my_malloc.c my_realloc.c my_once.c mulalloc.c \ my_malloc.c my_realloc.c my_once.c mulalloc.c \
......
...@@ -74,7 +74,7 @@ my_bool open_cached_file(IO_CACHE *cache, const char* dir, const char *prefix, ...@@ -74,7 +74,7 @@ my_bool open_cached_file(IO_CACHE *cache, const char* dir, const char *prefix,
} }
my_free(cache->dir, MYF(MY_ALLOW_ZERO_PTR)); my_free(cache->dir, MYF(MY_ALLOW_ZERO_PTR));
my_free(cache->prefix,MYF(MY_ALLOW_ZERO_PTR)); my_free(cache->prefix,MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(0); DBUG_RETURN(1);
} }
/* Create the temporary file */ /* Create the temporary file */
...@@ -101,10 +101,12 @@ void close_cached_file(IO_CACHE *cache) ...@@ -101,10 +101,12 @@ void close_cached_file(IO_CACHE *cache)
DBUG_ENTER("close_cached_file"); DBUG_ENTER("close_cached_file");
if (my_b_inited(cache)) if (my_b_inited(cache))
{ {
File file=cache->file;
cache->file= -1; /* Don't flush data */
(void) end_io_cache(cache); (void) end_io_cache(cache);
if (cache->file >= 0) if (file >= 0)
{ {
(void) my_close(cache->file,MYF(0)); (void) my_close(file,MYF(0));
#ifdef CANT_DELETE_OPEN_FILES #ifdef CANT_DELETE_OPEN_FILES
if (cache->file_name) if (cache->file_name)
{ {
......
...@@ -22,10 +22,13 @@ ...@@ -22,10 +22,13 @@
(and get a EOF-error). (and get a EOF-error).
Possibly use of asyncronic io. Possibly use of asyncronic io.
macros for read and writes for faster io. macros for read and writes for faster io.
Used instead of FILE when reading or writing hole files. Used instead of FILE when reading or writing whole files.
This shall make mf_rec_cache obsolite. This will make mf_rec_cache obsolete.
One can change info->pos_in_file to a higer value to skipp bytes in file if One can change info->pos_in_file to a higher value to skip bytes in file if
also info->rc_pos is set to info->rc_end. also info->rc_pos is set to info->rc_end.
If called through open_cached_file(), then the temporary file will
only be created if a write exeeds the file buffer or if one calls
flush_io_cache().
*/ */
#define MAP_TO_USE_RAID #define MAP_TO_USE_RAID
...@@ -40,7 +43,7 @@ static void my_aiowait(my_aio_result *result); ...@@ -40,7 +43,7 @@ static void my_aiowait(my_aio_result *result);
/* /*
** if cachesize == 0 then use default cachesize (from s-file) ** if cachesize == 0 then use default cachesize (from s-file)
** if file == -1 then real_open_cached_file() will be called to ** if file == -1 then real_open_cached_file() will be called.
** returns 0 if ok ** returns 0 if ok
*/ */
...@@ -59,17 +62,24 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -59,17 +62,24 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
if (type == READ_CACHE) if (type == READ_CACHE)
{ /* Assume file isn't growing */ { /* Assume file isn't growing */
my_off_t file_pos,end_of_file; if (cache_myflags & MY_DONT_CHECK_FILESIZE)
if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
DBUG_RETURN(1);
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{ {
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
use_async_io=0; /* No nead to use async */ }
else
{
my_off_t file_pos,end_of_file;
if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
DBUG_RETURN(1);
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No nead to use async */
}
} }
} }
...@@ -545,7 +555,6 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count, ...@@ -545,7 +555,6 @@ int my_block_write(register IO_CACHE *info, const byte *Buffer, uint Count,
return error; return error;
} }
/* Flush write cache */ /* Flush write cache */
int flush_io_cache(IO_CACHE *info) int flush_io_cache(IO_CACHE *info)
...@@ -565,7 +574,9 @@ int flush_io_cache(IO_CACHE *info) ...@@ -565,7 +574,9 @@ int flush_io_cache(IO_CACHE *info)
length=(uint) (info->rc_pos - info->buffer); length=(uint) (info->rc_pos - info->buffer);
if (info->seek_not_done) if (info->seek_not_done)
{ /* File touched, do seek */ { /* File touched, do seek */
VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
DBUG_RETURN((info->error= -1));
info->seek_not_done=0; info->seek_not_done=0;
} }
info->rc_pos=info->buffer; info->rc_pos=info->buffer;
......
...@@ -21,74 +21,49 @@ ...@@ -21,74 +21,49 @@
#include <stdarg.h> #include <stdarg.h>
#include <m_ctype.h> #include <m_ctype.h>
int my_vsnprintf(char *to, size_t n, const char* fmt, va_list ap)
int my_vsnprintf(char* str, size_t n, const char* fmt, va_list ap)
{ {
uint olen = 0, plen; char *start=to, *end=to+n-1;
const char *tpos;
reg1 char *endpos;
reg2 char * par;
char* ebuff = str;
endpos=ebuff;
tpos = fmt;
while (*tpos) for (; *fmt ; fmt++)
{ {
if (tpos[0] != '%') if (fmt[0] != '%')
{ {
if(olen + 1 >= n) if (to == end) /* End of buffer */
break; break;
*to++= *fmt; /* Copy ordinary char */
*endpos++= *tpos++; /* Copy ordinary char */
olen++;
continue; continue;
} }
if (*++tpos == '%') /* test if %% */ /* Skipp if max size is used (to be compatible with printf) */
{ while (isdigit(*fmt) || *fmt == '.' || *fmt == '-')
olen--; fmt++;
} if (*fmt == 's') /* String parameter */
else
{ {
/* Skipp if max size is used (to be compatible with printf) */ reg2 char *par = va_arg(ap, char *);
while (isdigit(*tpos) || *tpos == '.' || *tpos == '-') uint plen = (uint) strlen(par);
tpos++; if ((uint) (end-to) > plen) /* Replace if possible */
if (*tpos == 's') /* String parameter */
{
par = va_arg(ap, char *);
plen = (uint) strlen(par);
if (olen + plen < n) /* Replace if possible */
{
endpos=strmov(endpos,par);
tpos++;
olen+=plen;
continue;
}
}
else if (*tpos == 'd' || *tpos == 'u') /* Integer parameter */
{ {
register int iarg; to=strmov(to,par);
iarg = va_arg(ap, int); continue;
if(olen + 16 >= n) break;
if (*tpos == 'd')
plen= (uint) (int2str((long) iarg,endpos, -10) - endpos);
else
plen= (uint) (int2str((long) (uint) iarg,endpos,10)- endpos);
if (olen + plen < n) /* Replace parameter if possible */
{
endpos+=plen;
tpos++;
olen+=plen;
continue;
}
} }
} }
*endpos++='%'; /* % used as % or unknown code */ else if (*fmt == 'd' || *fmt == 'u') /* Integer parameter */
{
register int iarg;
if ((uint) (end-to) < 16)
break;
iarg = va_arg(ap, int);
if (*fmt == 'd')
to=int10_to_str((long) iarg,to, -10);
else
to=int10_to_str((long) (uint) iarg,to,10);
continue;
}
/* We come here on '%%', unknown code or too long parameter */
if (to == end)
break;
*to++='%'; /* % used as % or unknown code */
} }
*endpos='\0'; *to='\0'; /* End of errmessage */
/* End of errmessage */ return (uint) (to - start);
return olen;
} }
...@@ -72,17 +72,16 @@ static int find_uniq_filename(char *name) ...@@ -72,17 +72,16 @@ static int find_uniq_filename(char *name)
DBUG_RETURN(0); DBUG_RETURN(0);
} }
MYSQL_LOG::MYSQL_LOG(): last_time(0), query_start(0),
name(0), log_type(LOG_CLOSED),write_error(0),
MYSQL_LOG::MYSQL_LOG(): file(-1),index_file(-1),last_time(0),query_start(0), inited(0), opened(0), no_rotate(0)
name(0), log_type(LOG_CLOSED),write_error(0),inited(0),
no_rotate(0)
{ {
/* /*
We don't want to intialize LOCK_Log here as the thread system may We don't want to intialize LOCK_Log here as the thread system may
not have been initailized yet. We do it instead at 'open'. not have been initailized yet. We do it instead at 'open'.
*/ */
index_file_name[0] = 0; index_file_name[0] = 0;
bzero((char*) &log_file,sizeof(log_file));
} }
MYSQL_LOG::~MYSQL_LOG() MYSQL_LOG::~MYSQL_LOG()
...@@ -103,6 +102,7 @@ void MYSQL_LOG::set_index_file_name(const char* index_file_name) ...@@ -103,6 +102,7 @@ void MYSQL_LOG::set_index_file_name(const char* index_file_name)
this->index_file_name[0] = 0; this->index_file_name[0] = 0;
} }
int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name) int MYSQL_LOG::generate_new_name(char *new_name, const char *log_name)
{ {
if (log_type == LOG_NORMAL) if (log_type == LOG_NORMAL)
...@@ -128,6 +128,9 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -128,6 +128,9 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
{ {
MY_STAT tmp_stat; MY_STAT tmp_stat;
char buff[512]; char buff[512];
File file= -1;
bool do_magic;
if (!inited) if (!inited)
{ {
inited=1; inited=1;
...@@ -136,29 +139,27 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -136,29 +139,27 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
if (log_type_arg == LOG_BIN && *fn_ext(log_name)) if (log_type_arg == LOG_BIN && *fn_ext(log_name))
no_rotate = 1; no_rotate = 1;
} }
log_type=log_type_arg; log_type=log_type_arg;
name=my_strdup(log_name,MYF(0)); if (!(name=my_strdup(log_name,MYF(MY_WME))))
goto err;
if (new_name) if (new_name)
strmov(log_file_name,new_name); strmov(log_file_name,new_name);
else if (generate_new_name(log_file_name, name)) else if (generate_new_name(log_file_name, name))
return; goto err;
if (log_type == LOG_BIN && !index_file_name[0]) if (log_type == LOG_BIN && !index_file_name[0])
fn_format(index_file_name, name, mysql_data_home, ".index", 6); fn_format(index_file_name, name, mysql_data_home, ".index", 6);
db[0]=0; db[0]=0;
bool do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name, do_magic = ((log_type == LOG_BIN) && !my_stat(log_file_name,
&tmp_stat, MYF(0))); &tmp_stat, MYF(0)));
if ((file=my_open(log_file_name,O_APPEND | O_WRONLY | O_BINARY, if ((file=my_open(log_file_name,O_APPEND | O_WRONLY | O_BINARY,
MYF(MY_WME | ME_WAITTANG)) < 0) MYF(MY_WME | ME_WAITTANG))) < 0 ||
{ init_io_cache(&log_file, file, IO_SIZE, WRITE_CACHE,
my_free(name,MYF(0)); my_tell(file,MYF(MY_WME)), 0, MYF(MY_WME | MY_NABP)))
name=0; goto err;
log_type=LOG_CLOSED;
return;
}
if (log_type == LOG_NORMAL) if (log_type == LOG_NORMAL)
{ {
...@@ -169,7 +170,9 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -169,7 +170,9 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
sprintf(buff, "%s, Version: %s, started with:\nTcp port: %d Unix socket: %s\n", my_progname,server_version,mysql_port,mysql_unix_port); sprintf(buff, "%s, Version: %s, started with:\nTcp port: %d Unix socket: %s\n", my_progname,server_version,mysql_port,mysql_unix_port);
#endif #endif
end=strmov(strend(buff),"Time Id Command Argument\n"); end=strmov(strend(buff),"Time Id Command Argument\n");
my_write(file,buff,(uint) (end-buff),MYF(0)); if (my_b_write(&log_file,buff,(uint) (end-buff)) ||
flush_io_cache(&log_file))
goto err;
} }
else if (log_type == LOG_NEW) else if (log_type == LOG_NEW)
{ {
...@@ -184,11 +187,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -184,11 +187,12 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
tm_tmp.tm_hour, tm_tmp.tm_hour,
tm_tmp.tm_min, tm_tmp.tm_min,
tm_tmp.tm_sec); tm_tmp.tm_sec);
my_write(file,buff,(uint) strlen(buff),MYF(0)); if (my_b_write(&log_file,buff,(uint) strlen(buff)) ||
flush_io_cache(&log_file))
goto err;
} }
else if (log_type == LOG_BIN) else if (log_type == LOG_BIN)
{ {
// Explanation of the boolean black magic: // Explanation of the boolean black magic:
// //
// if we are supposed to write magic number try write // if we are supposed to write magic number try write
...@@ -196,34 +200,36 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg, ...@@ -196,34 +200,36 @@ void MYSQL_LOG::open(const char *log_name, enum_log_type log_type_arg,
// then if index_file has not been previously opened, try to open it // then if index_file has not been previously opened, try to open it
// clean up if failed // clean up if failed
if ((do_magic && my_write(file, (byte*) BINLOG_MAGIC, 4, if ((do_magic && my_b_write(&log_file, (byte*) BINLOG_MAGIC, 4)) ||
MYF(MY_NABP|MY_WME)) ||
(index_file < 0 && (index_file < 0 &&
(index_file = my_fopen(index_file_name,O_APPEND | O_BINARY | O_RDWR, (index_file = my_open(index_file_name,O_APPEND | O_BINARY | O_RDWR,
MYF(MY_WME))) < 0))) MYF(MY_WME))) < 0))
{ goto err;
my_close(file,MYF(MY_WME));
my_free(name,MYF(0));
name=0;
file= -1;
log_type=LOG_CLOSED;
return;
}
Start_log_event s; Start_log_event s;
s.write(file); s.write(&log_file);
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
my_seek(index_file, 0L, MY_SEEK_END, MYF(MY_WME));
my_write(index_file, log_file_name,strlen(log_file_name), MYF(0)); my_write(index_file, log_file_name,strlen(log_file_name), MYF(0));
my_write(index_file, "\n",1, MYF(0)); my_write(index_file, "\n",1, MYF(0));
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
} }
return;
err:
if (file >= 0)
my_close(file,MYF(0));
end_io_cache(&log_file);
x_free(name); name=0;
log_type=LOG_CLOSED;
return;
} }
int MYSQL_LOG::get_current_log(LOG_INFO* linfo) int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
{ {
pthread_mutex_lock(&LOCK_log); pthread_mutex_lock(&LOCK_log);
strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)); strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
linfo->pos = my_tell(file, MYF(MY_WME)); linfo->pos = my_b_tell(&log_file);
pthread_mutex_unlock(&LOCK_log); pthread_mutex_unlock(&LOCK_log);
return 0; return 0;
} }
...@@ -231,46 +237,46 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo) ...@@ -231,46 +237,46 @@ int MYSQL_LOG::get_current_log(LOG_INFO* linfo)
// if log_name is "" we stop at the first entry // if log_name is "" we stop at the first entry
int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name) int MYSQL_LOG::find_first_log(LOG_INFO* linfo, const char* log_name)
{ {
// mutex needed because we need to make sure the file pointer does not move if (index_file < 0)
// from under our feet return LOG_INFO_INVALID;
if (index_file < 0) return LOG_INFO_INVALID;
int error = 0; int error = 0;
char* fname = linfo->log_file_name; char* fname = linfo->log_file_name;
int log_name_len = (uint) strlen(log_name); int log_name_len = (uint) strlen(log_name);
IO_CACHE io_cache;
// mutex needed because we need to make sure the file pointer does not move
// from under our feet
pthread_mutex_lock(&LOCK_index); pthread_mutex_lock(&LOCK_index);
if (my_seek(index_file, 0L, MY_SEEK_SET, MYF(MY_WME) ) == MY_FILEPOS_ERROR) if (init_io_cache(&io_cache, index_file, IO_SIZE, READ_CACHE, (my_off_t) 0,
0, MYF(MY_WME)))
{ {
error = LOG_INFO_SEEK; error = LOG_INFO_SEEK;
goto err; goto err;
} }
for(;;) for(;;)
{ {
if (!fgets(fname, FN_REFLEN, index_file)) uint length;
if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN)))
{ {
error = feof(index_file) ? LOG_INFO_EOF : LOG_INFO_IO; error = !io_cache.error ? LOG_INFO_EOF : LOG_INFO_IO;
goto err; goto err;
} }
// if the log entry matches, empty string matching anything // if the log entry matches, empty string matching anything
if (!log_name_len || if (!log_name_len ||
(fname[log_name_len] == '\n' && (log_name_len == length+1 && fname[log_name_len] == '\n' &&
!memcmp(fname, log_name, log_name_len))) !memcmp(fname, log_name, log_name_len)))
{ {
if (log_name_len) fname[length-1]=0; // remove last \n
fname[log_name_len] = 0; // to kill \n linfo->index_file_offset = my_b_tell(&io_cache);
else
{
*(strend(fname) - 1) = 0;
}
linfo->index_file_offset = my_tell(index_file, MYF(MY_WME));
break; break;
} }
} }
error = 0; error = 0;
err: err:
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
end_io_cache(&io_cache);
return error; return error;
} }
...@@ -283,27 +289,29 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo) ...@@ -283,27 +289,29 @@ int MYSQL_LOG::find_next_log(LOG_INFO* linfo)
if (!index_file) return LOG_INFO_INVALID; if (!index_file) return LOG_INFO_INVALID;
int error = 0; int error = 0;
char* fname = linfo->log_file_name; char* fname = linfo->log_file_name;
char* end ; IO_CACHE io_cache;
uint length;
pthread_mutex_lock(&LOCK_index);
if (my_fseek(index_file, linfo->index_file_offset, MY_SEEK_SET, MYF(MY_WME) ) == MY_FILEPOS_ERROR)
{
error = LOG_INFO_SEEK;
goto err;
}
if (!fgets(fname, FN_REFLEN, index_file)) pthread_mutex_lock(&LOCK_index);
{ if (init_io_cache(&io_cache, index_file, IO_SIZE,
error = feof(index_file) ? LOG_INFO_EOF : LOG_INFO_IO; READ_CACHE, (my_off_t) linfo->index_file_offset, 0,
goto err; MYF(MY_WME)))
} {
error = LOG_INFO_SEEK;
end = strend(fname) - 1; goto err;
*end = 0; // kill /n }
linfo->index_file_offset = ftell(index_file); if (!(length=my_b_gets(&io_cache, fname, FN_REFLEN)))
{
error = !io_cache.error ? LOG_INFO_EOF : LOG_INFO_IO;
goto err;
}
fname[length-1]=0; // kill /n
linfo->index_file_offset = my_b_tell(&io_cache);
error = 0; error = 0;
err: err:
pthread_mutex_unlock(&LOCK_index); pthread_mutex_unlock(&LOCK_index);
end_io_cache(&io_cache);
return error; return error;
} }
...@@ -311,22 +319,18 @@ err: ...@@ -311,22 +319,18 @@ err:
// we assume that buf has at least FN_REFLEN bytes alloced // we assume that buf has at least FN_REFLEN bytes alloced
void MYSQL_LOG::make_log_name(char* buf, const char* log_ident) void MYSQL_LOG::make_log_name(char* buf, const char* log_ident)
{ {
buf[0] = 0; // In case of error
if (inited) if (inited)
{ {
int dir_len = dirname_length(log_file_name); int dir_len = dirname_length(log_file_name);
int ident_len = (uint) strlen(log_ident); int ident_len = (uint) strlen(log_ident);
if (dir_len + ident_len + 1 > FN_REFLEN) if (dir_len + ident_len + 1 > FN_REFLEN)
{ return; // protection agains malicious buffer overflow
buf[0] = 0;
return; // protection agains malicious buffer overflow
}
memcpy(buf, log_file_name, dir_len); memcpy(buf, log_file_name, dir_len);
memcpy(buf + dir_len, log_ident, ident_len + 1); // this takes care of \0 // copy filename + end null
// at the end memcpy(buf + dir_len, log_ident, ident_len + 1);
} }
else
buf[0] = 0;
} }
bool MYSQL_LOG::is_active(const char* log_file_name) bool MYSQL_LOG::is_active(const char* log_file_name)
...@@ -336,15 +340,17 @@ bool MYSQL_LOG::is_active(const char* log_file_name) ...@@ -336,15 +340,17 @@ bool MYSQL_LOG::is_active(const char* log_file_name)
void MYSQL_LOG::new_file() void MYSQL_LOG::new_file()
{ {
if (file) // only rotate open logs that are marked non-rotatable
// (binlog with constant name are non-rotatable)
if (is_open() && ! no_rotate)
{ {
if (no_rotate) // do not rotate logs that are marked non-rotatable
return; // ( for binlog with constant name)
char new_name[FN_REFLEN], *old_name=name; char new_name[FN_REFLEN], *old_name=name;
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
if (generate_new_name(new_name, name)) if (generate_new_name(new_name, name))
{
VOID(pthread_mutex_unlock(&LOCK_log));
return; // Something went wrong return; // Something went wrong
}
if (log_type == LOG_BIN) if (log_type == LOG_BIN)
{ {
/* /*
...@@ -352,15 +358,13 @@ void MYSQL_LOG::new_file() ...@@ -352,15 +358,13 @@ void MYSQL_LOG::new_file()
to change base names at some point. to change base names at some point.
*/ */
Rotate_log_event r(new_name+dirname_length(new_name)); Rotate_log_event r(new_name+dirname_length(new_name));
r.write(file); r.write(&log_file);
VOID(pthread_cond_broadcast(&COND_binlog_update)); VOID(pthread_cond_broadcast(&COND_binlog_update));
} }
name=0; name=0;
close(); close();
open(old_name, log_type, new_name); open(old_name, log_type, new_name);
my_free(old_name,MYF(0)); my_free(old_name,MYF(0));
if (!file) // Something went wrong
log_type=LOG_CLOSED;
last_time=query_start=0; last_time=query_start=0;
write_error=0; write_error=0;
VOID(pthread_mutex_unlock(&LOCK_log)); VOID(pthread_mutex_unlock(&LOCK_log));
...@@ -375,7 +379,10 @@ void MYSQL_LOG::write(THD *thd,enum enum_server_command command, ...@@ -375,7 +379,10 @@ void MYSQL_LOG::write(THD *thd,enum enum_server_command command,
{ {
va_list args; va_list args;
va_start(args,format); va_start(args,format);
char buff[32];
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
/* Test if someone closed after the is_open test */
if (log_type != LOG_CLOSED) if (log_type != LOG_CLOSED)
{ {
time_t skr; time_t skr;
...@@ -405,28 +412,30 @@ void MYSQL_LOG::write(THD *thd,enum enum_server_command command, ...@@ -405,28 +412,30 @@ void MYSQL_LOG::write(THD *thd,enum enum_server_command command,
struct tm *start; struct tm *start;
localtime_r(&skr,&tm_tmp); localtime_r(&skr,&tm_tmp);
start=&tm_tmp; start=&tm_tmp;
if (fprintf(file,"%02d%02d%02d %2d:%02d:%02d\t", /* Note that my_b_write() assumes it knows the length for this */
start->tm_year % 100, sprintf(buff,"%02d%02d%02d %2d:%02d:%02d\t",
start->tm_mon+1, start->tm_year % 100,
start->tm_mday, start->tm_mon+1,
start->tm_hour, start->tm_mday,
start->tm_min, start->tm_hour,
start->tm_sec) < 0) start->tm_min,
start->tm_sec);
if (my_b_write(&log_file,buff,16))
error=errno; error=errno;
} }
else if (fputs("\t\t",file) < 0) else if (my_b_write(&log_file,"\t\t",2) < 0)
error=errno; error=errno;
if (fprintf(file,"%7ld %-10.10s", sprintf(buff,"%7ld %-10.10s", id,command_name[(uint) command]);
id,command_name[(uint) command]) < 0) if (my_b_write(&log_file,buff,strlen(buff)))
error=errno; error=errno;
if (format) if (format)
{ {
if (fputc(' ',file) < 0 || vfprintf(file,format,args) < 0) if (my_b_write(&log_file," ",1) ||
my_b_printf(&log_file,format,args) == (uint) -1)
error=errno; error=errno;
} }
if (fputc('\n',file) < 0) if (my_b_write(&log_file,"\n",1) ||
error=errno; flush_io_cache(&log_file))
if (fflush(file) < 0)
error=errno; error=errno;
if (error && ! write_error) if (error && ! write_error)
{ {
...@@ -446,7 +455,7 @@ void MYSQL_LOG::write(Query_log_event* event_info) ...@@ -446,7 +455,7 @@ void MYSQL_LOG::write(Query_log_event* event_info)
if (is_open()) if (is_open())
{ {
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
if (file) if (is_open())
{ {
THD *thd=event_info->thd; THD *thd=event_info->thd;
if ((!(thd->options & OPTION_BIN_LOG) && if ((!(thd->options & OPTION_BIN_LOG) &&
...@@ -460,43 +469,38 @@ void MYSQL_LOG::write(Query_log_event* event_info) ...@@ -460,43 +469,38 @@ void MYSQL_LOG::write(Query_log_event* event_info)
if (thd->last_insert_id_used) if (thd->last_insert_id_used)
{ {
Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e((uchar)LAST_INSERT_ID_EVENT, thd->last_insert_id);
if (e.write(file)) if (e.write(&log_file))
{ {
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
goto err; goto err;
} }
} }
if (thd->insert_id_used) if (thd->insert_id_used)
{ {
Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id); Intvar_log_event e((uchar)INSERT_ID_EVENT, thd->last_insert_id);
if (e.write(file)) if (e.write(&log_file))
{ {
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
goto err; goto err;
} }
} }
if (thd->convert_set) if (thd->convert_set)
{
char buf[1024] = "SET CHARACTER SET ";
char* p = strend(buf);
p = strmov(p, thd->convert_set->name);
int save_query_length = thd->query_length;
// just in case somebody wants it later
thd->query_length = (uint)(p - buf);
Query_log_event e(thd, buf);
if (e.write(&log_file))
{ {
char buf[1024] = "SET CHARACTER SET "; sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
char* p = strend(buf); goto err;
p = strmov(p, thd->convert_set->name);
int save_query_length = thd->query_length;
// just in case somebody wants it later
thd->query_length = (uint)(p - buf);
Query_log_event e(thd, buf);
if (e.write(file))
{
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
goto err;
}
thd->query_length = save_query_length; // clean up
} }
thd->query_length = save_query_length; // clean up
if (event_info->write(file)) }
if (event_info->write(&log_file) || flush_io_cache(&log_file))
{ {
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
} }
...@@ -512,13 +516,13 @@ void MYSQL_LOG::write(Load_log_event* event_info) ...@@ -512,13 +516,13 @@ void MYSQL_LOG::write(Load_log_event* event_info)
if (is_open()) if (is_open())
{ {
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
if (file) if (is_open())
{ {
THD *thd=event_info->thd; THD *thd=event_info->thd;
if ((thd->options & OPTION_BIN_LOG) || if ((thd->options & OPTION_BIN_LOG) ||
!(thd->master_access & PROCESS_ACL)) !(thd->master_access & PROCESS_ACL))
{ {
if (event_info->write(file)) if (event_info->write(&log_file) || flush_io_cache(&log_file))
sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno); sql_print_error(ER(ER_ERROR_ON_WRITE), name, errno);
VOID(pthread_cond_broadcast(&COND_binlog_update)); VOID(pthread_cond_broadcast(&COND_binlog_update));
} }
...@@ -537,7 +541,7 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -537,7 +541,7 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
{ {
time_t current_time; time_t current_time;
VOID(pthread_mutex_lock(&LOCK_log)); VOID(pthread_mutex_lock(&LOCK_log));
if (file) if (is_open())
{ // Safety agains reopen { // Safety agains reopen
int error=0; int error=0;
char buff[80],*end; char buff[80],*end;
...@@ -556,37 +560,42 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -556,37 +560,42 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
last_time=current_time; last_time=current_time;
struct tm tm_tmp; struct tm tm_tmp;
struct tm *start; struct tm *start;
char buff[32];
localtime_r(&current_time,&tm_tmp); localtime_r(&current_time,&tm_tmp);
start=&tm_tmp; start=&tm_tmp;
if (fprintf(file,"# Time: %02d%02d%02d %2d:%02d:%02d\n", /* Note that my_b_write() assumes it knows the length for this */
start->tm_year % 100, sprintf(buff,"# Time: %02d%02d%02d %2d:%02d:%02d\n",
start->tm_mon+1, start->tm_year % 100,
start->tm_mday, start->tm_mon+1,
start->tm_hour, start->tm_mday,
start->tm_min, start->tm_hour,
start->tm_sec) < 0) start->tm_min,
start->tm_sec);
if (my_b_write(&log_file,buff,24))
error=errno; error=errno;
} }
if (fprintf(file, "# User@Host: %s [%s] @ %s [%s]\n", if (my_b_printf(&log_file, "# User@Host: %s [%s] @ %s [%s]\n",
thd->priv_user, thd->priv_user,
thd->user, thd->user,
thd->host ? thd->host : "", thd->host ? thd->host : "",
thd->ip ? thd->ip : "") < 0) thd->ip ? thd->ip : ""))
error=errno;; error=errno;
} }
if (query_start) if (query_start)
{ {
/* For slow query log */ /* For slow query log */
if (!(specialflag & SPECIAL_LONG_LOG_FORMAT)) if (!(specialflag & SPECIAL_LONG_LOG_FORMAT))
current_time=time(NULL); current_time=time(NULL);
fprintf(file,"# Time: %lu Lock_time: %lu Rows_sent: %lu\n", if (my_b_printf(&log_file,
(ulong) (current_time - query_start), "# Time: %lu Lock_time: %lu Rows_sent: %lu\n",
(ulong) (thd->time_after_lock - query_start), (ulong) (current_time - query_start),
(ulong) thd->sent_row_count); (ulong) (thd->time_after_lock - query_start),
(ulong) thd->sent_row_count))
error=errno;
} }
if (thd->db && strcmp(thd->db,db)) if (thd->db && strcmp(thd->db,db))
{ // Database changed { // Database changed
if (fprintf(file,"use %s;\n",thd->db) < 0) if (my_b_printf(&log_file,"use %s;\n",thd->db))
error=errno; error=errno;
strmov(db,thd->db); strmov(db,thd->db);
} }
...@@ -618,7 +627,8 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -618,7 +627,8 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
*end++=';'; *end++=';';
*end++='\n'; *end++='\n';
*end=0; *end=0;
if (fputs("SET ",file) < 0 || fputs(buff+1,file) < 0) if (my_b_write(&log_file,"SET ",4) ||
my_b_write(&log_file,buff+1,(uint) (end-buff)-1))
error=errno; error=errno;
} }
if (!query) if (!query)
...@@ -626,10 +636,9 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -626,10 +636,9 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
query="#adminstrator command"; query="#adminstrator command";
query_length=21; query_length=21;
} }
if (my_fwrite(file,(byte*) query,query_length,MYF(MY_NABP)) || if (my_b_write(&log_file,(byte*) query,query_length) ||
fputc(';',file) < 0 || fputc('\n',file) < 0) my_b_write(&log_file,";\n",2) ||
error=errno; flush_io_cache(&log_file))
if (fflush(file) < 0)
error=errno; error=errno;
if (error && ! write_error) if (error && ! write_error)
{ {
...@@ -641,51 +650,48 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length, ...@@ -641,51 +650,48 @@ void MYSQL_LOG::write(THD *thd,const char *query, uint query_length,
} }
} }
#ifdef TO_BE_REMOVED
void MYSQL_LOG::flush() void MYSQL_LOG::flush()
{ {
if (file) if (is_open())
if (fflush(file) < 0 && ! write_error) if (flush_io_cache(log_file) && ! write_error)
{ {
write_error=1; write_error=1;
sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno); sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno);
} }
} }
#endif
void MYSQL_LOG::close(bool exiting) void MYSQL_LOG::close(bool exiting)
{ // One can't set log_type here! { // One can't set log_type here!
if (file) if (is_open())
{ {
File file=log_file.file;
if (log_type == LOG_BIN) if (log_type == LOG_BIN)
{ {
Stop_log_event s; Stop_log_event s;
s.write(file); s.write(&log_file);
VOID(pthread_cond_broadcast(&COND_binlog_update)); VOID(pthread_cond_broadcast(&COND_binlog_update));
} }
if (my_fclose(file,MYF(0)) < 0 && ! write_error) end_io_cache(&log_file);
if (my_close(file,MYF(0)) < 0 && ! write_error)
{ {
write_error=1; write_error=1;
sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno); sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno);
} }
file=0; log_type=LOG_CLOSED;
}
if (name)
{
my_free(name,MYF(0));
name=0;
} }
if (exiting && index_file >= 0)
if (exiting && index_file)
{ {
if (my_fclose(index_file,MYF(0)) < 0 && ! write_error) if (my_close(index_file,MYF(0)) < 0 && ! write_error)
{ {
write_error=1; write_error=1;
sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno); sql_print_error(ER(ER_ERROR_ON_WRITE),name,errno);
} }
index_file=0; index_file=0;
} }
safeFree(name);
} }
......
...@@ -26,84 +26,78 @@ ...@@ -26,84 +26,78 @@
static void pretty_print_char(FILE* file, int c) static void pretty_print_char(FILE* file, int c)
{ {
fputc('\'', file); fputc('\'', file);
switch(c) switch(c) {
{ case '\n': fprintf(file, "\\n"); break;
case '\n': fprintf(file, "\\n"); break; case '\r': fprintf(file, "\\r"); break;
case '\r': fprintf(file, "\\r"); break; case '\\': fprintf(file, "\\\\"); break;
case '\\': fprintf(file, "\\\\"); break; case '\b': fprintf(file, "\\b"); break;
case '\b': fprintf(file, "\\b"); break; case '\'': fprintf(file, "\\'"); break;
case '\'': fprintf(file, "\\'"); break; case 0 : fprintf(file, "\\0"); break;
case 0 : fprintf(file, "\\0"); break; default:
default: fputc(c, file);
fputc(c, file); break;
break; }
} fputc('\'', file);
fputc( '\'', file);
} }
int Query_log_event::write(FILE* file) int Query_log_event::write(IO_CACHE* file)
{ {
return query ? Log_event::write(file) : -1; return query ? Log_event::write(file) : -1;
} }
int Log_event::write(FILE* file) int Log_event::write(IO_CACHE* file)
{ {
if (write_header(file) return (write_header(file) || write_data(file)) ? -1 : 0;
|| write_data(file) || fflush(file)) return -1;
return 0;
} }
int Log_event::write_header(FILE* file) int Log_event::write_header(IO_CACHE* file)
{ {
char buf[LOG_EVENT_HEADER_LEN];
// make sure to change this when the header gets bigger // make sure to change this when the header gets bigger
char buf[LOG_EVENT_HEADER_LEN];
char* pos = buf; char* pos = buf;
int4store(pos, when); // timestamp int4store(pos, when); // timestamp
pos += 4; pos += 4;
*pos++ = get_type_code(); // event type code *pos++ = get_type_code(); // event type code
int4store(pos, server_id); int4store(pos, server_id);
pos += 4; pos += 4;
int4store(pos, get_data_size() + LOG_EVENT_HEADER_LEN); long tmp=get_data_size() + LOG_EVENT_HEADER_LEN;
int4store(pos, tmp);
pos += 4; pos += 4;
return (my_fwrite(file, (byte*) buf, (uint) (pos - buf), return (my_b_write(file, (byte*) buf, (uint) (pos - buf)));
MYF(MY_NABP | MY_WME)));
} }
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
int Log_event::read_log_event(FILE* file, String* packet, int Log_event::read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock) pthread_mutex_t* log_lock)
{ {
ulong data_len; ulong data_len;
char buf[LOG_EVENT_HEADER_LEN]; char buf[LOG_EVENT_HEADER_LEN];
if(log_lock) if (log_lock)
pthread_mutex_lock(log_lock); pthread_mutex_lock(log_lock);
if (my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP))) if (my_b_read(file, (byte*) buf, sizeof(buf)))
{ {
if(log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_EOF: LOG_READ_IO; return file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
} }
data_len = uint4korr(buf + EVENT_LEN_OFFSET); data_len = uint4korr(buf + EVENT_LEN_OFFSET);
if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN) if (data_len < LOG_EVENT_HEADER_LEN || data_len > MAX_EVENT_LEN)
{ {
if(log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
return LOG_READ_BOGUS; return LOG_READ_BOGUS;
} }
packet->append(buf, sizeof(buf)); packet->append(buf, sizeof(buf));
data_len -= LOG_EVENT_HEADER_LEN; data_len -= LOG_EVENT_HEADER_LEN;
if (!data_len) if (data_len)
{ {
if(log_lock) pthread_mutex_unlock(log_lock); if (packet->append(file, data_len))
return 0; // the event does not have a data section
}
if (packet->append(file, data_len, MYF(MY_WME|MY_NABP)))
{ {
if(log_lock) if(log_lock)
pthread_mutex_unlock(log_lock); pthread_mutex_unlock(log_lock);
return feof(file) ? LOG_READ_TRUNC: LOG_READ_IO; return file->error >= 0 ? LOG_READ_TRUNC: LOG_READ_IO;
} }
}
if(log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
return 0; return 0;
} }
...@@ -111,18 +105,18 @@ int Log_event::read_log_event(FILE* file, String* packet, ...@@ -111,18 +105,18 @@ int Log_event::read_log_event(FILE* file, String* packet,
// allocates memory - the caller is responsible for clean-up // allocates memory - the caller is responsible for clean-up
Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) Log_event* Log_event::read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock)
{ {
time_t timestamp; time_t timestamp;
uint32 server_id; uint32 server_id;
char buf[LOG_EVENT_HEADER_LEN-4]; char buf[LOG_EVENT_HEADER_LEN-4];
if(log_lock) pthread_mutex_lock(log_lock); if(log_lock) pthread_mutex_lock(log_lock);
if (my_fread(file, (byte *) buf, sizeof(buf), MY_NABP)) if (my_b_read(file, (byte *) buf, sizeof(buf)))
{ {
if(log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
return NULL; return NULL;
} }
timestamp = uint4korr(buf); timestamp = uint4korr(buf);
server_id = uint4korr(buf + 5); server_id = uint4korr(buf + 5);
...@@ -132,13 +126,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) ...@@ -132,13 +126,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
{ {
Query_log_event* q = new Query_log_event(file, timestamp, server_id); Query_log_event* q = new Query_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock); if(log_lock) pthread_mutex_unlock(log_lock);
if (!q->query) if (!q->query)
{ {
delete q; delete q;
return NULL; q=NULL;
} }
return q; return q;
} }
...@@ -146,13 +138,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) ...@@ -146,13 +138,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
{ {
Load_log_event* l = new Load_log_event(file, timestamp, server_id); Load_log_event* l = new Load_log_event(file, timestamp, server_id);
if(log_lock) pthread_mutex_unlock(log_lock); if(log_lock) pthread_mutex_unlock(log_lock);
if (!l->table_name) if (!l->table_name)
{ {
delete l; delete l;
return NULL; l=NULL;
} }
return l; return l;
} }
...@@ -165,9 +155,8 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) ...@@ -165,9 +155,8 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
if (!r->new_log_ident) if (!r->new_log_ident)
{ {
delete r; delete r;
return NULL; r=NULL;
} }
return r; return r;
} }
...@@ -179,9 +168,8 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) ...@@ -179,9 +168,8 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
if (e->type == INVALID_INT_EVENT) if (e->type == INVALID_INT_EVENT)
{ {
delete e; delete e;
return NULL; e=NULL;
} }
return e; return e;
} }
...@@ -198,12 +186,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock) ...@@ -198,12 +186,11 @@ Log_event* Log_event::read_log_event(FILE* file, pthread_mutex_t* log_lock)
return e; return e;
} }
default: default:
if(log_lock) pthread_mutex_unlock(log_lock); break;
return NULL;
} }
//impossible // default
if(log_lock) pthread_mutex_unlock(log_lock); if (log_lock) pthread_mutex_unlock(log_lock);
return NULL; return NULL;
} }
...@@ -320,26 +307,24 @@ void Rotate_log_event::print(FILE* file, bool short_form) ...@@ -320,26 +307,24 @@ void Rotate_log_event::print(FILE* file, bool short_form)
fflush(file); fflush(file);
} }
Rotate_log_event::Rotate_log_event(FILE* file, time_t when_arg, Rotate_log_event::Rotate_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id): uint32 server_id):
Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0) Log_event(when_arg, 0, 0, server_id),new_log_ident(NULL),alloced(0)
{ {
char *tmp_ident; char *tmp_ident;
char buf[4]; char buf[4];
if (my_fread(file, (byte*) buf, sizeof(buf), MYF(MY_NABP | MY_WME))) if (my_b_read(file, (byte*) buf, sizeof(buf)))
return; return;
ulong event_len; ulong event_len;
event_len = uint4korr(buf); event_len = uint4korr(buf);
if(event_len < ROTATE_EVENT_OVERHEAD) if (event_len < ROTATE_EVENT_OVERHEAD)
return; return;
ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD); ident_len = (uchar)(event_len - ROTATE_EVENT_OVERHEAD);
if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME)))) if (!(tmp_ident = (char*) my_malloc((uint)ident_len, MYF(MY_WME))))
return; return;
if (my_fread( file, (byte*) tmp_ident, (uint)ident_len, MYF(MY_NABP | MY_WME))) if (my_b_read( file, (byte*) tmp_ident, (uint) ident_len))
{ {
my_free((gptr) tmp_ident, MYF(0)); my_free((gptr) tmp_ident, MYF(0));
return; return;
...@@ -373,21 +358,18 @@ Rotate_log_event::Rotate_log_event(const char* buf, int max_buf): ...@@ -373,21 +358,18 @@ Rotate_log_event::Rotate_log_event(const char* buf, int max_buf):
alloced = 1; alloced = 1;
} }
int Rotate_log_event::write_data(FILE* file) int Rotate_log_event::write_data(IO_CACHE* file)
{ {
if (my_fwrite(file, (byte*) new_log_ident, (uint) ident_len, return my_b_write(file, (byte*) new_log_ident, (uint) ident_len) ? -1 :0;
MYF(MY_NABP | MY_WME)))
return -1;
return 0;
} }
Query_log_event::Query_log_event(FILE* file, time_t when_arg, Query_log_event::Query_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id): uint32 server_id):
Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL) Log_event(when_arg,0,0,server_id),data_buf(0),query(NULL),db(NULL)
{ {
char buf[QUERY_HEADER_LEN + 4]; char buf[QUERY_HEADER_LEN + 4];
ulong data_len; ulong data_len;
if (my_fread(file, (byte*) buf, sizeof(buf), MYF(MY_NABP | MY_WME))) if (my_b_read(file, (byte*) buf, sizeof(buf)))
return; // query == NULL will tell the return; // query == NULL will tell the
// caller there was a problem // caller there was a problem
data_len = uint4korr(buf); data_len = uint4korr(buf);
...@@ -399,9 +381,10 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg, ...@@ -399,9 +381,10 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
db_len = (uint)buf[12]; db_len = (uint)buf[12];
error_code = uint2korr(buf + 13); error_code = uint2korr(buf + 13);
/* Allocate one byte extra for end \0 */
if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME)))) if (!(data_buf = (char*) my_malloc(data_len+1, MYF(MY_WME))))
return; return;
if (my_fread( file, (byte*) data_buf, data_len, MYF(MY_NABP | MY_WME))) if (my_b_read( file, (byte*) data_buf, data_len))
{ {
my_free((gptr) data_buf, MYF(0)); my_free((gptr) data_buf, MYF(0));
data_buf = 0; data_buf = 0;
...@@ -412,7 +395,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg, ...@@ -412,7 +395,7 @@ Query_log_event::Query_log_event(FILE* file, time_t when_arg,
db = data_buf; db = data_buf;
query=data_buf + db_len + 1; query=data_buf + db_len + 1;
q_len = data_len - 1 - db_len; q_len = data_len - 1 - db_len;
*((char*)query + q_len) = 0; *((char*) query + q_len) = 0; // Safety
} }
Query_log_event::Query_log_event(const char* buf, int max_buf): Query_log_event::Query_log_event(const char* buf, int max_buf):
...@@ -428,7 +411,7 @@ Query_log_event::Query_log_event(const char* buf, int max_buf): ...@@ -428,7 +411,7 @@ Query_log_event::Query_log_event(const char* buf, int max_buf):
exec_time = uint4korr(buf + 8); exec_time = uint4korr(buf + 8);
error_code = uint2korr(buf + 13); error_code = uint2korr(buf + 13);
if (!(data_buf = (char*) my_malloc( data_len + 1, MYF(MY_WME)))) if (!(data_buf = (char*) my_malloc(data_len + 1, MYF(MY_WME))))
return; return;
memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len); memcpy(data_buf, buf + QUERY_HEADER_LEN + 4, data_len);
...@@ -455,9 +438,9 @@ void Query_log_event::print(FILE* file, bool short_form) ...@@ -455,9 +438,9 @@ void Query_log_event::print(FILE* file, bool short_form)
fprintf(file, ";\n"); fprintf(file, ";\n");
} }
int Query_log_event::write_data(FILE* file) int Query_log_event::write_data(IO_CACHE* file)
{ {
if(!query) return -1; if (!query) return -1;
char buf[QUERY_HEADER_LEN]; char buf[QUERY_HEADER_LEN];
char* pos = buf; char* pos = buf;
...@@ -469,23 +452,21 @@ int Query_log_event::write_data(FILE* file) ...@@ -469,23 +452,21 @@ int Query_log_event::write_data(FILE* file)
int2store(pos, error_code); int2store(pos, error_code);
pos += 2; pos += 2;
if (my_fwrite(file, (byte*) buf, (uint)(pos - buf), MYF(MY_NABP | MY_WME)) || return (my_b_write(file, (byte*) buf, (uint)(pos - buf)) ||
my_fwrite(file, (db) ? (byte*) db : (byte*)"", my_b_write(file, (db) ? (byte*) db : (byte*)"", db_len + 1) ||
db_len + 1, MYF(MY_NABP | MY_WME)) || my_b_write(file, (byte*) query, q_len)) ? -1 : 0;
my_fwrite(file, (byte*) query, q_len, MYF(MY_NABP | MY_WME)))
return -1;
return 0;
} }
Intvar_log_event:: Intvar_log_event(FILE* file, time_t when_arg, Intvar_log_event:: Intvar_log_event(IO_CACHE* file, time_t when_arg,
uint32 server_id) uint32 server_id)
:Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT) :Log_event(when_arg,0,0,server_id), type(INVALID_INT_EVENT)
{ {
my_fseek(file, 4L, MY_SEEK_CUR, MYF(MY_WME)); // skip the event length char buf[9+4];
char buf[9]; if (!my_b_read(file, (byte*) buf, sizeof(buf)))
if(my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP|MY_WME))) return; {
type = buf[0]; type = buf[4];
val = uint8korr(buf+1); val = uint8korr(buf+1+4);
}
} }
Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
...@@ -495,12 +476,12 @@ Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf) ...@@ -495,12 +476,12 @@ Intvar_log_event::Intvar_log_event(const char* buf):Log_event(buf)
val = uint8korr(buf+1); val = uint8korr(buf+1);
} }
int Intvar_log_event::write_data(FILE* file) int Intvar_log_event::write_data(IO_CACHE* file)
{ {
char buf[9]; char buf[9];
buf[0] = type; buf[0] = type;
int8store(buf + 1, val); int8store(buf + 1, val);
return my_fwrite(file, (byte*) buf, sizeof(buf), MYF(MY_NABP|MY_WME)); return my_b_write(file, (byte*) buf, sizeof(buf));
} }
void Intvar_log_event::print(FILE* file, bool short_form) void Intvar_log_event::print(FILE* file, bool short_form)
...@@ -527,7 +508,7 @@ void Intvar_log_event::print(FILE* file, bool short_form) ...@@ -527,7 +508,7 @@ void Intvar_log_event::print(FILE* file, bool short_form)
} }
int Load_log_event::write_data(FILE* file __attribute__((unused))) int Load_log_event::write_data(IO_CACHE* file)
{ {
char buf[LOAD_HEADER_LEN]; char buf[LOAD_HEADER_LEN];
int4store(buf, thread_id); int4store(buf, thread_id);
...@@ -537,77 +518,46 @@ int Load_log_event::write_data(FILE* file __attribute__((unused))) ...@@ -537,77 +518,46 @@ int Load_log_event::write_data(FILE* file __attribute__((unused)))
buf[13] = (char)db_len; buf[13] = (char)db_len;
int4store(buf + 14, num_fields); int4store(buf + 14, num_fields);
if(my_fwrite(file, (byte*)buf, sizeof(buf), MYF(MY_NABP|MY_WME)) || if(my_b_write(file, (byte*)buf, sizeof(buf)) ||
my_fwrite(file, (byte*)&sql_ex, sizeof(sql_ex), MYF(MY_NABP|MY_WME))) my_b_write(file, (byte*)&sql_ex, sizeof(sql_ex)))
return 1; return 1;
if(num_fields && fields && field_lens) if (num_fields && fields && field_lens)
{ {
if(my_fwrite(file, (byte*)field_lens, num_fields, MYF(MY_NABP|MY_WME)) || if(my_b_write(file, (byte*)field_lens, num_fields) ||
my_fwrite(file, (byte*)fields, field_block_len, MYF(MY_NABP|MY_WME))) my_b_write(file, (byte*)fields, field_block_len))
return 1; return 1;
} }
if(my_b_write(file, (byte*)table_name, table_name_len + 1) ||
if(my_fwrite(file, (byte*)table_name, table_name_len + 1, MYF(MY_NABP|MY_WME)) || my_b_write(file, (byte*)db, db_len + 1) ||
my_fwrite(file, (byte*)db, db_len + 1, MYF(MY_NABP|MY_WME)) || my_b_write(file, (byte*)fname, fname_len))
my_fwrite(file, (byte*)fname, fname_len, MYF(MY_NABP|MY_WME)) )
return 1; return 1;
return 0; return 0;
} }
Load_log_event::Load_log_event(FILE* file, time_t when, uint32 server_id): Load_log_event::Load_log_event(IO_CACHE* file, time_t when, uint32 server_id):
Log_event(when,0,0,server_id),data_buf(0),num_fields(0), Log_event(when,0,0,server_id),data_buf(0),num_fields(0),
fields(0),field_lens(0),field_block_len(0), fields(0),field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0) table_name(0),db(0),fname(0)
{ {
char buf[LOAD_HEADER_LEN + 4]; char buf[LOAD_HEADER_LEN + 4];
ulong data_len; ulong data_len;
if(my_fread(file, (byte*)buf, sizeof(buf), MYF(MY_NABP|MY_WME)) || if (my_b_read(file, (byte*)buf, sizeof(buf)) ||
my_fread(file, (byte*)&sql_ex, sizeof(sql_ex), MYF(MY_NABP|MY_WME))) my_b_read(file, (byte*)&sql_ex, sizeof(sql_ex)))
return;
data_len = uint4korr(buf);
thread_id = uint4korr(buf+4);
exec_time = uint4korr(buf+8);
skip_lines = uint4korr(buf + 12);
table_name_len = (uint)buf[16];
db_len = (uint)buf[17];
num_fields = uint4korr(buf + 18);
data_len -= LOAD_EVENT_OVERHEAD;
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return; return;
if(my_fread(file, (byte*)data_buf, data_len, MYF(MY_NABP|MY_WME))) data_len = uint4korr(buf) - LOAD_EVENT_OVERHEAD;
if (!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return; return;
if (my_b_read(file, (byte*)data_buf, data_len))
if(num_fields > data_len) // simple sanity check against corruption
return; return;
copy_log_event(buf,data_len);
field_lens = (uchar*)data_buf;
uint i;
for(i = 0; i < num_fields; i++)
{
field_block_len += (uint)field_lens[i] + 1;
}
fields = (char*)field_lens + num_fields;
*((char*)data_buf+data_len) = 0;
table_name = fields + field_block_len;
db = table_name + table_name_len + 1;
fname = db + db_len + 1;
fname_len = data_len - 2 - db_len - table_name_len - num_fields - field_block_len;
} }
Load_log_event::Load_log_event(const char* buf, int max_buf): Load_log_event::Load_log_event(const char* buf, int max_buf):
Log_event(when,0,0,server_id),data_buf(0),num_fields(0),fields(0), Log_event(when,0,0,server_id),data_buf(0),num_fields(0),fields(0),
field_lens(0),field_block_len(0), field_lens(0),field_block_len(0),
table_name(0),db(0),fname(0) table_name(0),db(0),fname(0)
{ {
ulong data_len; ulong data_len;
...@@ -617,9 +567,19 @@ Load_log_event::Load_log_event(const char* buf, int max_buf): ...@@ -617,9 +567,19 @@ Load_log_event::Load_log_event(const char* buf, int max_buf):
buf += EVENT_LEN_OFFSET; buf += EVENT_LEN_OFFSET;
data_len = uint4korr(buf); data_len = uint4korr(buf);
if((uint)data_len > (uint)max_buf) if ((uint)data_len > (uint) max_buf)
return; return;
data_len -= LOAD_EVENT_OVERHEAD;
memcpy(&sql_ex, buf + 22, sizeof(sql_ex));
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return;
memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len);
copy_log_event(buf, data_len);
}
void Load_log_event::copy_log_event(const char *buf, ulong data_len)
{
thread_id = uint4korr(buf+4); thread_id = uint4korr(buf+4);
exec_time = uint4korr(buf+8); exec_time = uint4korr(buf+8);
skip_lines = uint4korr(buf + 12); skip_lines = uint4korr(buf + 12);
...@@ -627,32 +587,23 @@ Load_log_event::Load_log_event(const char* buf, int max_buf): ...@@ -627,32 +587,23 @@ Load_log_event::Load_log_event(const char* buf, int max_buf):
db_len = (uint)buf[17]; db_len = (uint)buf[17];
num_fields = uint4korr(buf + 18); num_fields = uint4korr(buf + 18);
data_len -= LOAD_EVENT_OVERHEAD; if (num_fields > data_len) // simple sanity check against corruption
memcpy(&sql_ex, buf + 22, sizeof(sql_ex));
if(!(data_buf = (char*)my_malloc(data_len + 1, MYF(MY_WME))))
return; return;
memcpy(data_buf, buf + 22 + sizeof(sql_ex), data_len); field_lens = (uchar*) data_buf;
if(num_fields > data_len) // simple sanity check against corruption
return;
field_lens = (uchar*)data_buf;
uint i; uint i;
for(i = 0; i < num_fields; i++) for (i = 0; i < num_fields; i++)
{ {
field_block_len += (uint)field_lens[i] + 1; field_block_len += (uint)field_lens[i] + 1;
} }
fields = (char*)field_lens + num_fields; fields = (char*)field_lens + num_fields;
*((char*)data_buf+data_len) = 0; *((char*)data_buf+data_len) = 0;
table_name = fields + field_block_len; table_name = fields + field_block_len;
db = table_name + table_name_len + 1; db = table_name + table_name_len + 1;
fname = db + db_len + 1; fname = db + db_len + 1;
fname_len = data_len - 2 - db_len - table_name_len - num_fields - field_block_len; fname_len = data_len - 2 - db_len - table_name_len - num_fields -
field_block_len;
} }
...@@ -711,21 +662,21 @@ void Load_log_event::print(FILE* file, bool short_form) ...@@ -711,21 +662,21 @@ void Load_log_event::print(FILE* file, bool short_form)
if((int)skip_lines > 0) if((int)skip_lines > 0)
fprintf(file, " IGNORE %ld LINES ", skip_lines); fprintf(file, " IGNORE %ld LINES ", skip_lines);
if(num_fields) if (num_fields)
{
uint i;
const char* field = fields;
fprintf( file, " (");
for(i = 0; i < num_fields; i++)
{ {
uint i; if(i)
const char* field = fields; fputc(',', file);
fprintf( file, " ("); fprintf(file, field);
for(i = 0; i < num_fields; i++)
{
if(i)
fputc(',', file);
fprintf(file, field);
field += field_lens[i] + 1; field += field_lens[i] + 1;
}
fputc(')', file);
} }
fputc(')', file);
}
fprintf(file, ";\n"); fprintf(file, ";\n");
} }
......
...@@ -65,9 +65,9 @@ public: ...@@ -65,9 +65,9 @@ public:
int valid_exec_time; // if false, the exec time setting is bogus int valid_exec_time; // if false, the exec time setting is bogus
uint32 server_id; uint32 server_id;
int write(FILE* file); int write(IO_CACHE* file);
int write_header(FILE* file); int write_header(IO_CACHE* file);
virtual int write_data(FILE* file __attribute__((unused))) { return 0; } virtual int write_data(IO_CACHE* file __attribute__((unused))) { return 0; }
virtual Log_event_type get_type_code() = 0; virtual Log_event_type get_type_code() = 0;
Log_event(time_t when_arg, ulong exec_time_arg = 0, Log_event(time_t when_arg, ulong exec_time_arg = 0,
int valid_exec_time_arg = 0, uint32 server_id = 0): when(when_arg), int valid_exec_time_arg = 0, uint32 server_id = 0): when(when_arg),
...@@ -92,11 +92,11 @@ public: ...@@ -92,11 +92,11 @@ public:
void print_header(FILE* file); void print_header(FILE* file);
// if mutex is 0, the read will proceed without mutex // if mutex is 0, the read will proceed without mutex
static Log_event* read_log_event(FILE* file, pthread_mutex_t* log_lock); static Log_event* read_log_event(IO_CACHE* file, pthread_mutex_t* log_lock);
static Log_event* read_log_event(const char* buf, int max_buf); static Log_event* read_log_event(const char* buf, int max_buf);
#ifndef MYSQL_CLIENT #ifndef MYSQL_CLIENT
static int read_log_event(FILE* file, String* packet, static int read_log_event(IO_CACHE* file, String* packet,
pthread_mutex_t* log_lock); pthread_mutex_t* log_lock);
#endif #endif
...@@ -132,18 +132,18 @@ public: ...@@ -132,18 +132,18 @@ public:
} }
#endif #endif
Query_log_event(FILE* file, time_t when, uint32 server_id); Query_log_event(IO_CACHE* file, time_t when, uint32 server_id);
Query_log_event(const char* buf, int max_buf); Query_log_event(const char* buf, int max_buf);
~Query_log_event() ~Query_log_event()
{ {
if (data_buf) if (data_buf)
{ {
my_free((gptr)data_buf, MYF(0)); my_free((gptr) data_buf, MYF(0));
} }
} }
Log_event_type get_type_code() { return QUERY_EVENT; } Log_event_type get_type_code() { return QUERY_EVENT; }
int write(FILE* file); int write(IO_CACHE* file);
int write_data(FILE* file); // returns 0 on success, -1 on error int write_data(IO_CACHE* file); // returns 0 on success, -1 on error
int get_data_size() int get_data_size()
{ {
return q_len + db_len + 2 + return q_len + db_len + 2 +
...@@ -183,6 +183,8 @@ class Load_log_event: public Log_event ...@@ -183,6 +183,8 @@ class Load_log_event: public Log_event
{ {
protected: protected:
char* data_buf; char* data_buf;
void Load_log_event::copy_log_event(const char *buf, ulong data_len);
public: public:
int thread_id; int thread_id;
uint32 table_name_len; uint32 table_name_len;
...@@ -272,17 +274,17 @@ public: ...@@ -272,17 +274,17 @@ public:
void set_fields(List<Item> &fields); void set_fields(List<Item> &fields);
#endif #endif
Load_log_event(FILE* file, time_t when, uint32 server_id); Load_log_event(IO_CACHE * file, time_t when, uint32 server_id);
Load_log_event(const char* buf, int max_buf); Load_log_event(const char* buf, int max_buf);
~Load_log_event() ~Load_log_event()
{ {
if (data_buf) if (data_buf)
{ {
my_free((gptr)data_buf, MYF(0)); my_free((gptr) data_buf, MYF(0));
} }
} }
Log_event_type get_type_code() { return LOAD_EVENT; } Log_event_type get_type_code() { return LOAD_EVENT; }
int write_data(FILE* file); // returns 0 on success, -1 on error int write_data(IO_CACHE* file); // returns 0 on success, -1 on error
int get_data_size() int get_data_size()
{ {
return table_name_len + 2 + db_len + 2 + fname_len return table_name_len + 2 + db_len + 2 + fname_len
...@@ -311,30 +313,26 @@ public: ...@@ -311,30 +313,26 @@ public:
created = (uint32) when; created = (uint32) when;
memcpy(server_version, ::server_version, sizeof(server_version)); memcpy(server_version, ::server_version, sizeof(server_version));
} }
Start_log_event(FILE* file, time_t when_arg, uint32 server_id) : Start_log_event(IO_CACHE* file, time_t when_arg, uint32 server_id) :
Log_event(when_arg, 0, 0, server_id) Log_event(when_arg, 0, 0, server_id)
{ {
char buf[sizeof(server_version) + sizeof(binlog_version) + char buf[sizeof(server_version) + sizeof(binlog_version) +
sizeof(created)]; sizeof(created)+4];
my_fseek(file, 4L, MY_SEEK_CUR, MYF(MY_WME)); // skip the event length if (my_b_read(file, (byte*) buf, sizeof(buf)))
if (my_fread(file, (byte*) buf, sizeof(buf), MYF(MY_NABP | MY_WME)))
return; return;
binlog_version = uint2korr(buf); binlog_version = uint2korr(buf+4);
memcpy(server_version, buf + 2, sizeof(server_version)); memcpy(server_version, buf + 6, sizeof(server_version));
created = uint4korr(buf + 2 + sizeof(server_version)); created = uint4korr(buf + 6 + sizeof(server_version));
} }
Start_log_event(const char* buf); Start_log_event(const char* buf);
~Start_log_event() {} ~Start_log_event() {}
Log_event_type get_type_code() { return START_EVENT;} Log_event_type get_type_code() { return START_EVENT;}
int write_data(FILE* file) int write_data(IO_CACHE* file)
{ {
if(my_fwrite(file, (byte*) &binlog_version, sizeof(binlog_version), if (my_b_write(file, (byte*) &binlog_version, sizeof(binlog_version)) ||
MYF(MY_NABP | MY_WME)) || my_b_write(file, (byte*) server_version, sizeof(server_version)) ||
my_fwrite(file, (byte*) server_version, sizeof(server_version), my_b_write(file, (byte*) &created, sizeof(created)))
MYF(MY_NABP | MY_WME)) ||
my_fwrite(file, (byte*) &created, sizeof(created),
MYF(MY_NABP | MY_WME)))
return -1; return -1;
return 0; return 0;
} }
...@@ -354,12 +352,12 @@ public: ...@@ -354,12 +352,12 @@ public:
Intvar_log_event(uchar type_arg, ulonglong val_arg) Intvar_log_event(uchar type_arg, ulonglong val_arg)
:Log_event(time(NULL)),val(val_arg),type(type_arg) :Log_event(time(NULL)),val(val_arg),type(type_arg)
{} {}
Intvar_log_event(FILE* file, time_t when, uint32 server_id); Intvar_log_event(IO_CACHE* file, time_t when, uint32 server_id);
Intvar_log_event(const char* buf); Intvar_log_event(const char* buf);
~Intvar_log_event() {} ~Intvar_log_event() {}
Log_event_type get_type_code() { return INTVAR_EVENT;} Log_event_type get_type_code() { return INTVAR_EVENT;}
int get_data_size() { return sizeof(type) + sizeof(val);} int get_data_size() { return sizeof(type) + sizeof(val);}
int write_data(FILE* file); int write_data(IO_CACHE* file);
void print(FILE* file, bool short_form = 0); void print(FILE* file, bool short_form = 0);
...@@ -370,10 +368,11 @@ class Stop_log_event: public Log_event ...@@ -370,10 +368,11 @@ class Stop_log_event: public Log_event
public: public:
Stop_log_event() :Log_event(time(NULL)) Stop_log_event() :Log_event(time(NULL))
{} {}
Stop_log_event(FILE* file, time_t when_arg, uint32 server_id): Stop_log_event(IO_CACHE* file, time_t when_arg, uint32 server_id):
Log_event(when_arg,0,0,server_id) Log_event(when_arg,0,0,server_id)
{ {
my_fseek(file, 4L, MY_SEEK_CUR, MYF(MY_WME)); // skip the event length char skip[4];
my_b_read(file, skip, sizeof(skip)); // skip the event length
} }
Stop_log_event(const char* buf):Log_event(buf) Stop_log_event(const char* buf):Log_event(buf)
{ {
...@@ -397,7 +396,7 @@ public: ...@@ -397,7 +396,7 @@ public:
alloced(0) alloced(0)
{} {}
Rotate_log_event(FILE* file, time_t when, uint32 server_id) ; Rotate_log_event(IO_CACHE* file, time_t when, uint32 server_id) ;
Rotate_log_event(const char* buf, int max_buf); Rotate_log_event(const char* buf, int max_buf);
~Rotate_log_event() ~Rotate_log_event()
{ {
...@@ -406,7 +405,7 @@ public: ...@@ -406,7 +405,7 @@ public:
} }
Log_event_type get_type_code() { return ROTATE_EVENT;} Log_event_type get_type_code() { return ROTATE_EVENT;}
int get_data_size() { return ident_len;} int get_data_size() { return ident_len;}
int write_data(FILE* file); int write_data(IO_CACHE* file);
void print(FILE* file, bool short_form = 0); void print(FILE* file, bool short_form = 0);
}; };
......
...@@ -32,17 +32,19 @@ ...@@ -32,17 +32,19 @@
#define MAP_TO_USE_RAID #define MAP_TO_USE_RAID
#include "mysql_priv.h" #include "mysql_priv.h"
#include <mysys_err.h>
#ifdef HAVE_AIOWAIT #ifdef HAVE_AIOWAIT
#include <mysys_err.h>
#include <errno.h> #include <errno.h>
static void my_aiowait(my_aio_result *result); static void my_aiowait(my_aio_result *result);
#endif #endif
/* if cachesize == 0 then use default cachesize (from s-file) */
/* returns 0 if we have enough memory */
extern "C" { extern "C" {
/*
** if cachesize == 0 then use default cachesize (from s-file)
** if file == -1 then real_open_cached_file() will be called.
** returns 0 if ok
*/
int init_io_cache(IO_CACHE *info, File file, uint cachesize, int init_io_cache(IO_CACHE *info, File file, uint cachesize,
enum cache_type type, my_off_t seek_offset, enum cache_type type, my_off_t seek_offset,
...@@ -60,20 +62,26 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize, ...@@ -60,20 +62,26 @@ int init_io_cache(IO_CACHE *info, File file, uint cachesize,
min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2; min_cache=use_async_io ? IO_SIZE*4 : IO_SIZE*2;
if (type == READ_CACHE) if (type == READ_CACHE)
{ /* Assume file isn't growing */ { /* Assume file isn't growing */
my_off_t file_pos,end_of_file; if (cache_myflags & MY_DONT_CHECK_FILESIZE)
if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR)) {
DBUG_RETURN(1); cache_myflags &= ~MY_DONT_CHECK_FILESIZE;
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0)); }
if (end_of_file < seek_offset) else
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{ {
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1; my_off_t file_pos,end_of_file;
use_async_io=0; /* No nead to use async */ if ((file_pos=my_tell(file,MYF(0)) == MY_FILEPOS_ERROR))
DBUG_RETURN(1);
end_of_file=my_seek(file,0L,MY_SEEK_END,MYF(0));
if (end_of_file < seek_offset)
end_of_file=seek_offset;
VOID(my_seek(file,file_pos,MY_SEEK_SET,MYF(0)));
if ((my_off_t) cachesize > end_of_file-seek_offset+IO_SIZE*2-1)
{
cachesize=(uint) (end_of_file-seek_offset)+IO_SIZE*2-1;
use_async_io=0; /* No nead to use async */
}
} }
} }
if ((int) type < (int) READ_NET) if ((int) type < (int) READ_NET)
{ {
for (;;) for (;;)
...@@ -167,7 +175,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type, ...@@ -167,7 +175,8 @@ my_bool reinit_io_cache(IO_CACHE *info, enum cache_type type,
DBUG_ENTER("reinit_io_cache"); DBUG_ENTER("reinit_io_cache");
info->seek_not_done= test(info->file >= 0); /* Seek not done */ info->seek_not_done= test(info->file >= 0); /* Seek not done */
if (!clear_cache && seek_offset >= info->pos_in_file && if (! clear_cache &&
seek_offset >= info->pos_in_file &&
seek_offset <= info->pos_in_file + seek_offset <= info->pos_in_file +
(uint) (info->rc_end - info->rc_request_pos)) (uint) (info->rc_end - info->rc_request_pos))
{ /* use current buffer */ { /* use current buffer */
...@@ -231,6 +240,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count) ...@@ -231,6 +240,7 @@ int _my_b_read(register IO_CACHE *info, byte *Buffer, uint Count)
{ {
uint length,diff_length,left_length; uint length,diff_length,left_length;
my_off_t max_length, pos_in_file; my_off_t max_length, pos_in_file;
memcpy(Buffer,info->rc_pos, memcpy(Buffer,info->rc_pos,
(size_t) (left_length=(uint) (info->rc_end-info->rc_pos))); (size_t) (left_length=(uint) (info->rc_end-info->rc_pos)));
Buffer+=left_length; Buffer+=left_length;
...@@ -607,7 +617,9 @@ int flush_io_cache(IO_CACHE *info) ...@@ -607,7 +617,9 @@ int flush_io_cache(IO_CACHE *info)
length=(uint) (info->rc_pos - info->buffer); length=(uint) (info->rc_pos - info->buffer);
if (info->seek_not_done) if (info->seek_not_done)
{ /* File touched, do seek */ { /* File touched, do seek */
VOID(my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0))); if (my_seek(info->file,info->pos_in_file,MY_SEEK_SET,MYF(0)) ==
MY_FILEPOS_ERROR)
DBUG_RETURN((info->error= -1));
info->seek_not_done=0; info->seek_not_done=0;
} }
info->rc_pos=info->buffer; info->rc_pos=info->buffer;
...@@ -644,4 +656,4 @@ int end_io_cache(IO_CACHE *info) ...@@ -644,4 +656,4 @@ int end_io_cache(IO_CACHE *info)
DBUG_RETURN(error); DBUG_RETURN(error);
} /* end_io_cache */ } /* end_io_cache */
} } /* extern "C" */
...@@ -55,7 +55,7 @@ static struct option long_options[] = ...@@ -55,7 +55,7 @@ static struct option long_options[] =
{"password", required_argument,0, 'p'}, {"password", required_argument,0, 'p'},
{"position", required_argument,0, 'j'}, {"position", required_argument,0, 'j'},
#ifndef DBUG_OFF #ifndef DBUG_OFF
{"debug", required_argument, 0, '#'} {"debug", optional_argument, 0, '#'}
#endif #endif
}; };
...@@ -151,7 +151,7 @@ static int parse_args(int *argc, char*** argv) ...@@ -151,7 +151,7 @@ static int parse_args(int *argc, char*** argv)
{ {
int c, opt_index = 0; int c, opt_index = 0;
while((c = getopt_long(*argc, *argv, "so:#:h:j:u:p:P:t:?", long_options, while((c = getopt_long(*argc, *argv, "so:#::h:j:u:p:P:t:?", long_options,
&opt_index)) != EOF) &opt_index)) != EOF)
{ {
switch(c) switch(c)
...@@ -310,86 +310,106 @@ Unfortunately, no sweepstakes today, adjusted position to 4\n"); ...@@ -310,86 +310,106 @@ Unfortunately, no sweepstakes today, adjusted position to 4\n");
static void dump_local_log_entries(const char* logname) static void dump_local_log_entries(const char* logname)
{ {
FILE* file; File fd;
int rec_count = 0; IO_CACHE cache,*file= &cache;
int rec_count = 0;
if(logname && logname[0] != '-')
file = my_fopen(logname, O_RDONLY|O_BINARY, MYF(MY_WME)); if (logname && logname[0] != '-')
else {
file = stdin; if ((fd = my_open(logname, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0)
exit(1);
if(!file) if (init_io_cache(file, fd, 0, READ_CACHE, (my_off_t) position, 0,
die("Could not open log file %s", logname); MYF(MY_WME | MY_NABP)))
exit(1);
if(my_fseek(file, position, MY_SEEK_SET, MYF(MY_WME))) }
die("failed on my_fseek()"); else
{
if(!position) if (init_io_cache(file, fileno(stdout), 0, READ_CACHE, (my_off_t) 0,
{ 0, MYF(MY_WME | MY_NABP | MY_DONT_CHECK_FILESIZE)))
char magic[4]; exit(1);
if (my_fread(file, (byte*) magic, sizeof(magic), MYF(MY_NABP|MY_WME))) if (position)
die("I/O error reading binlog magic number"); {
if(memcmp(magic, BINLOG_MAGIC, 4)) /* skip 'position' characters from stdout */
die("Bad magic number"); char buff[IO_SIZE];
my_off_t length,tmp;
for (length=position ; length > 0 ; length-=tmp)
{
tmp=min(length,sizeof(buff));
if (my_b_read(file,buff,tmp))
exit(1);
}
}
file->pos_in_file=position;
file->seek_not_done=0;
}
if (!position)
{
char magic[4];
if (my_b_read(file, (byte*) magic, sizeof(magic)))
die("I/O error reading binlog magic number");
if(memcmp(magic, BINLOG_MAGIC, 4))
die("Bad magic number");
} }
while(1) while(1)
{ {
Log_event* ev = Log_event::read_log_event(file, 0); Log_event* ev = Log_event::read_log_event(file, 0);
if(!ev) if (!ev)
if(!feof(file)) {
if (file->error)
die("Could not read entry at offset %ld : Error in log format or \ die("Could not read entry at offset %ld : Error in log format or \
read error", read error",
my_ftell(file, MYF(MY_WME))); my_b_tell(file));
else break;
break; }
if (rec_count >= offset)
if(rec_count >= offset) ev->print(stdout, short_form);
ev->print(stdout, short_form); rec_count++;
rec_count++; delete ev;
delete ev; }
} my_close(fd, MYF(MY_WME));
end_io_cache(file);
my_fclose(file, MYF(MY_WME));
} }
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
MY_INIT(argv[0]); MY_INIT(argv[0]);
parse_args(&argc, (char***)&argv); parse_args(&argc, (char***)&argv);
if(!argc && !table) if(!argc && !table)
{ {
usage(); usage();
return -1; return -1;
} }
if(use_remote) if(use_remote)
{ {
init_thr_alarm(10); // need to do this manually init_thr_alarm(10); // need to do this manually
mysql = safe_connect(); mysql = safe_connect();
} }
if(table) if (table)
{ {
if(!use_remote) if(!use_remote)
die("You must specify connection parameter to get table dump"); die("You must specify connection parameter to get table dump");
char* db = (char*)table; char* db = (char*)table;
char* tbl = (char*) strchr(table, '.'); char* tbl = (char*) strchr(table, '.');
if(!tbl) if(!tbl)
die("You must use database.table syntax to specify the table"); die("You must use database.table syntax to specify the table");
*tbl++ = 0; *tbl++ = 0;
dump_remote_table(&mysql->net, db, tbl); dump_remote_table(&mysql->net, db, tbl);
} }
else else
while(--argc >= 0) {
while(--argc >= 0)
{ {
dump_log_entries(*(argv++)); dump_log_entries(*(argv++));
} }
}
if(use_remote) if (use_remote)
mc_mysql_close(mysql); mc_mysql_close(mysql);
return 0; return 0;
} }
......
...@@ -319,11 +319,11 @@ bool net_store_data(String* packet, I_List<i_string>* str_list) ...@@ -319,11 +319,11 @@ bool net_store_data(String* packet, I_List<i_string>* str_list)
i_string* s; i_string* s;
while((s=it++)) while((s=it++))
{ {
if(tmp.length()) if(tmp.length())
tmp.append(','); tmp.append(',');
tmp.append(s->ptr); tmp.append(s->ptr);
} }
return net_store_data(packet, (char*)tmp.ptr(), tmp.length()); return net_store_data(packet, (char*)tmp.ptr(), tmp.length());
} }
...@@ -113,11 +113,17 @@ THD::THD() ...@@ -113,11 +113,17 @@ THD::THD()
ull=0; ull=0;
system_thread=0; system_thread=0;
bzero((char*) &mem_root,sizeof(mem_root)); bzero((char*) &mem_root,sizeof(mem_root));
#if defined(HAVE_BDB) || defined(HAVE_INNOBASE) || defined(HAVE_GEMENI)
if (open_cached_file(&transactions.trans_log,
mysql_tempdir,LOG_PREFIX,0,MYF(MY_WME)))
killed=1;
transaction.bdb_lock_count=0;
#endif
transaction.bdb_tid=0;
#ifdef __WIN__ #ifdef __WIN__
real_id = 0 ; real_id = 0 ;
#endif #endif
transaction.bdb_lock_count=0;
transaction.bdb_tid=0;
} }
THD::~THD() THD::~THD()
...@@ -136,6 +142,9 @@ THD::~THD() ...@@ -136,6 +142,9 @@ THD::~THD()
close_thread_tables(this); close_thread_tables(this);
} }
close_temporary_tables(this); close_temporary_tables(this);
#if defined(HAVE_BDB) || defined(HAVE_INNOBASE) || defined(HAVE_GEMENI)
close_cached_file(transactions.trans_log);
#endif
if (global_read_lock) if (global_read_lock)
{ {
pthread_mutex_lock(&LOCK_open); pthread_mutex_lock(&LOCK_open);
......
...@@ -82,16 +82,16 @@ typedef struct st_master_info ...@@ -82,16 +82,16 @@ typedef struct st_master_info
} MASTER_INFO; } MASTER_INFO;
class MYSQL_LOG { class MYSQL_LOG {
public:
private: private:
pthread_mutex_t LOCK_log, LOCK_index; pthread_mutex_t LOCK_log, LOCK_index;
File file, index_file;
time_t last_time,query_start; time_t last_time,query_start;
IO_CACHE log_file;
File index_file;
char *name; char *name;
enum_log_type log_type; volatile enum_log_type log_type;
char time_buff[20],db[NAME_LEN+1]; char time_buff[20],db[NAME_LEN+1];
char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN]; char log_file_name[FN_REFLEN],index_file_name[FN_REFLEN];
bool write_error,inited; bool write_error,inited,opened;
bool no_rotate; // for binlog - if log name can never change bool no_rotate; // for binlog - if log name can never change
// we should not try to rotate it or write any rotation events // we should not try to rotate it or write any rotation events
// the user should use FLUSH MASTER instead of FLUSH LOGS for // the user should use FLUSH MASTER instead of FLUSH LOGS for
...@@ -114,7 +114,7 @@ public: ...@@ -114,7 +114,7 @@ public:
int generate_new_name(char *new_name,const char *old_name); int generate_new_name(char *new_name,const char *old_name);
void make_log_name(char* buf, const char* log_ident); void make_log_name(char* buf, const char* log_ident);
bool is_active(const char* log_file_name); bool is_active(const char* log_file_name);
void flush(void); // void flush(void);
void close(bool exiting = 0); // if we are exiting, we also want to close the void close(bool exiting = 0); // if we are exiting, we also want to close the
// index file // index file
...@@ -270,6 +270,7 @@ public: ...@@ -270,6 +270,7 @@ public:
thr_lock_type update_lock_default; thr_lock_type update_lock_default;
delayed_insert *di; delayed_insert *di;
struct st_transactions { struct st_transactions {
IO_CACHE trans_log;
void *bdb_tid; void *bdb_tid;
uint bdb_lock_count; uint bdb_lock_count;
} transaction; } transaction;
......
...@@ -2439,7 +2439,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables) ...@@ -2439,7 +2439,7 @@ bool reload_acl_and_cache(THD *thd, uint options, TABLE_LIST *tables)
bool result=0; bool result=0;
select_errors=0; /* Write if more errors */ select_errors=0; /* Write if more errors */
mysql_log.flush(); // Flush log // mysql_log.flush(); // Flush log
if (options & REFRESH_GRANT) if (options & REFRESH_GRANT)
{ {
acl_reload(); acl_reload();
......
...@@ -93,284 +93,284 @@ static int send_file(THD *thd) ...@@ -93,284 +93,284 @@ static int send_file(THD *thd)
DBUG_RETURN(error); DBUG_RETURN(error);
} }
static File open_log(IO_CACHE *log, const char *log_file_name,
const char **errmsg)
{
File file;
char magic[4];
if ((file = my_open(log_file_name, O_RDONLY | O_BINARY, MYF(MY_WME))) < 0 ||
init_io_cache(log, file, IO_SIZE*2, READ_CACHE, 0, 0,
MYF(MY_WME)))
{
*errmsg = "Could not open log file"; // This will not be sent
goto err;
}
if (my_b_read(log, (byte*) magic, sizeof(magic)))
{
*errmsg = "I/O error reading binlog magic number";
goto err;
}
if (memcmp(magic, BINLOG_MAGIC, 4))
{
*errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
return file;
err:
if (file > 0)
my_close(file,MYF(0));
end_io_cache(log);
return -1;
}
void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags) void mysql_binlog_send(THD* thd, char* log_ident, ulong pos, ushort flags)
{ {
LOG_INFO linfo; LOG_INFO linfo;
char *log_file_name = linfo.log_file_name; char *log_file_name = linfo.log_file_name;
char search_file_name[FN_REFLEN]; char search_file_name[FN_REFLEN];
char magic[4]; IO_CACHE log;
FILE* log = NULL; File file = -1;
String* packet = &thd->packet; String* packet = &thd->packet;
int error; int error;
const char *errmsg = "Unknown error"; const char *errmsg = "Unknown error";
NET* net = &thd->net; NET* net = &thd->net;
DBUG_ENTER("mysql_binlog_send"); DBUG_ENTER("mysql_binlog_send");
bzero((char*) &log,sizeof(log));
if(!mysql_bin_log.is_open()) if(!mysql_bin_log.is_open())
{ {
errmsg = "Binary log is not open"; errmsg = "Binary log is not open";
goto err; goto err;
} }
if(log_ident[0]) if (log_ident[0])
mysql_bin_log.make_log_name(search_file_name, log_ident); mysql_bin_log.make_log_name(search_file_name, log_ident);
else else
search_file_name[0] = 0; search_file_name[0] = 0;
if(mysql_bin_log.find_first_log(&linfo, search_file_name)) if (mysql_bin_log.find_first_log(&linfo, search_file_name))
{ {
errmsg = "Could not find first log"; errmsg = "Could not find first log";
goto err; goto err;
} }
log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME));
if(!log) if ((file=open_log(&log, log_file_name, &errmsg)) < 0)
{ goto err;
errmsg = "Could not open log file";
goto err;
}
if(my_fread(log, (byte*) magic, sizeof(magic), MYF(MY_NABP|MY_WME)))
{
errmsg = "I/O error reading binlog magic number";
goto err;
}
if(memcmp(magic, BINLOG_MAGIC, 4))
{
errmsg = "Binlog has bad magic number, fire your magician";
goto err;
}
if(pos < 4) if(pos < 4)
{ {
errmsg = "Contratulations! You have hit the magic number and can win \ errmsg = "Contratulations! You have hit the magic number and can win \
sweepstakes if you report the bug"; sweepstakes if you report the bug";
goto err; goto err;
} }
if(my_fseek(log, pos, MY_SEEK_SET, MYF(MY_WME)) == MY_FILEPOS_ERROR ) my_b_seek(&log, pos); // Seek will done on next read
{
errmsg = "Error on fseek()";
goto err;
}
packet->length(0); packet->length(0);
packet->append("\0", 1); packet->append("\0", 1);
// we need to start a packet with something other than 255 // we need to start a packet with something other than 255
// to distiquish it from error // to distiquish it from error
while(!net->error && net->vio != 0 && !thd->killed) while(!net->error && net->vio != 0 && !thd->killed)
{ {
pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock(); pthread_mutex_t *log_lock = mysql_bin_log.get_log_lock();
while(!(error = Log_event::read_log_event(log, packet, log_lock))) while (!(error = Log_event::read_log_event(&log, packet, log_lock)))
{ {
if(my_net_write(net, (char*)packet->ptr(), packet->length()) ) if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
{ {
errmsg = "Failed on my_net_write()"; errmsg = "Failed on my_net_write()";
goto err; goto err;
} }
DBUG_PRINT("info", ("log event code %d", DBUG_PRINT("info", ("log event code %d",
(*packet)[LOG_EVENT_OFFSET+1] )); (*packet)[LOG_EVENT_OFFSET+1] ));
if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT) if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{ {
if(send_file(thd)) if(send_file(thd))
{
errmsg = "failed in send_file()";
goto err;
}
}
packet->length(0);
packet->append("\0",1);
}
if(error != LOG_READ_EOF)
{ {
switch(error) errmsg = "failed in send_file()";
{
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
}
goto err; goto err;
} }
}
packet->length(0);
packet->append("\0",1);
}
if(error != LOG_READ_EOF)
{
switch(error)
{
case LOG_READ_BOGUS:
errmsg = "bogus data in log event";
break;
case LOG_READ_IO:
errmsg = "I/O error reading log event";
break;
case LOG_READ_MEM:
errmsg = "memory allocation failed reading log event";
break;
case LOG_READ_TRUNC:
errmsg = "binlog truncated in the middle of event";
break;
}
goto err;
}
if(!(flags & BINLOG_DUMP_NON_BLOCK) && if(!(flags & BINLOG_DUMP_NON_BLOCK) &&
mysql_bin_log.is_active(log_file_name)) mysql_bin_log.is_active(log_file_name))
// block until there is more data in the log // block until there is more data in the log
// unless non-blocking mode requested // unless non-blocking mode requested
{ {
if(net_flush(net)) if(net_flush(net))
{ {
errmsg = "failed on net_flush()"; errmsg = "failed on net_flush()";
goto err; goto err;
} }
// we may have missed the update broadcast from the log // we may have missed the update broadcast from the log
// that has just happened, let's try to catch it if it did // that has just happened, let's try to catch it if it did
// if we did not miss anything, we just wait for other threads // if we did not miss anything, we just wait for other threads
// to signal us // to signal us
{ {
clearerr(log); log.error=0;
// tell the kill thread how to wake us up
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex = log_lock;
thd->mysys_var->current_cond = &COND_binlog_update;
const char* proc_info = thd->proc_info;
thd->proc_info = "Waiting for update";
pthread_mutex_unlock(&thd->mysys_var->mutex);
bool read_packet = 0, fatal_error = 0;
// no one will update the log while we are reading
// now, but we'll be quick and just read one record
switch(Log_event::read_log_event(log, packet, log_lock))
{
case 0:
read_packet = 1;
// we read successfully, so we'll need to send it to the
// slave
break;
case LOG_READ_EOF:
pthread_mutex_lock(log_lock);
pthread_cond_wait(&COND_binlog_update, log_lock);
pthread_mutex_unlock(log_lock);
break;
default:
fatal_error = 1;
break;
}
pthread_mutex_lock(&thd->mysys_var->mutex);
thd->mysys_var->current_mutex= 0;
thd->mysys_var->current_cond= 0;
thd->proc_info= proc_info;
pthread_mutex_unlock(&thd->mysys_var->mutex);
if(read_packet)
{
thd->proc_info = "sending update to slave";
if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
{
errmsg = "Failed on my_net_write()";
goto err;
}
if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if(send_file(thd))
{
errmsg = "failed in send_file()";
goto err;
}
}
packet->length(0);
packet->append("\0",1);
// no need to net_flush because we will get to flush later when
// we hit EOF pretty quick
}
if(fatal_error) // tell the kill thread how to wake us up
{ pthread_mutex_lock(&thd->mysys_var->mutex);
errmsg = "error reading log entry"; thd->mysys_var->current_mutex = log_lock;
goto err; thd->mysys_var->current_cond = &COND_binlog_update;
} const char* proc_info = thd->proc_info;
thd->proc_info = "Waiting for update";
pthread_mutex_unlock(&thd->mysys_var->mutex);
clearerr(log); bool read_packet = 0, fatal_error = 0;
}
} // no one will update the log while we are reading
else // now, but we'll be quick and just read one record
switch(Log_event::read_log_event(&log, packet, log_lock))
{ {
bool loop_breaker = 0; case 0:
// need this to break out of the for loop from switch read_packet = 1;
thd->proc_info = "switching to next log"; // we read successfully, so we'll need to send it to the
switch(mysql_bin_log.find_next_log(&linfo)) // slave
{ break;
case LOG_INFO_EOF: case LOG_READ_EOF:
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK); pthread_mutex_lock(log_lock);
break; pthread_cond_wait(&COND_binlog_update, log_lock);
case 0: pthread_mutex_unlock(log_lock);
break; break;
default:
errmsg = "could not find next log"; default:
goto err; fatal_error = 1;
} break;
}
if(loop_breaker)
break;
(void) my_fclose(log, MYF(MY_WME)); pthread_mutex_lock(&thd->mysys_var->mutex);
log = my_fopen(log_file_name, O_RDONLY|O_BINARY, MYF(MY_WME)); thd->mysys_var->current_mutex= 0;
if(!log) thd->mysys_var->current_cond= 0;
{ thd->proc_info= proc_info;
errmsg = "Could not open next log"; pthread_mutex_unlock(&thd->mysys_var->mutex);
goto err;
}
//check the magic if(read_packet)
if(my_fread(log, (byte*) magic, sizeof(magic), MYF(MY_NABP|MY_WME))) {
{ thd->proc_info = "sending update to slave";
errmsg = "I/O error reading binlog magic number"; if(my_net_write(net, (char*)packet->ptr(), packet->length()) )
goto err; {
} errmsg = "Failed on my_net_write()";
if(memcmp(magic, BINLOG_MAGIC, 4)) goto err;
}
if((*packet)[LOG_EVENT_OFFSET+1] == LOAD_EVENT)
{
if(send_file(thd))
{ {
errmsg = "Binlog has bad magic number, fire your magician"; errmsg = "failed in send_file()";
goto err; goto err;
} }
// fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset
{
char header[LOG_EVENT_HEADER_LEN];
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p)
p++;
else
p = log_file_name;
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + sizeof(header);
int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
packet->append(header, sizeof(header));
packet->append(p,ident_len);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
errmsg = "failed on my_net_write()";
goto err;
}
packet->length(0);
packet->append("\0",1);
} }
packet->length(0);
packet->append("\0",1);
// no need to net_flush because we will get to flush later when
// we hit EOF pretty quick
} }
if(fatal_error)
{
errmsg = "error reading log entry";
goto err;
}
log.error=0;
}
}
else
{
bool loop_breaker = 0;
// need this to break out of the for loop from switch
thd->proc_info = "switching to next log";
switch(mysql_bin_log.find_next_log(&linfo))
{
case LOG_INFO_EOF:
loop_breaker = (flags & BINLOG_DUMP_NON_BLOCK);
break;
case 0:
break;
default:
errmsg = "could not find next log";
goto err;
}
if(loop_breaker)
break;
end_io_cache(&log);
(void) my_close(file, MYF(MY_WME));
if ((file=open_log(&log, log_file_name, &errmsg)) < 0)
goto err;
// fake Rotate_log event just in case it did not make it to the log
// otherwise the slave make get confused about the offset
{
char header[LOG_EVENT_HEADER_LEN];
memset(header, 0, 4); // when does not matter
header[EVENT_TYPE_OFFSET] = ROTATE_EVENT;
char* p = strrchr(log_file_name, FN_LIBCHAR);
// find the last slash
if(p)
p++;
else
p = log_file_name;
uint ident_len = (uint) strlen(p);
ulong event_len = ident_len + sizeof(header);
int4store(header + EVENT_TYPE_OFFSET + 1, server_id);
int4store(header + EVENT_LEN_OFFSET, event_len);
packet->append(header, sizeof(header));
packet->append(p,ident_len);
if(my_net_write(net, (char*)packet->ptr(), packet->length()))
{
errmsg = "failed on my_net_write()";
goto err;
}
packet->length(0);
packet->append("\0",1);
}
} }
}
(void)my_fclose(log, MYF(MY_WME)); end_io_cache(&log);
(void)my_close(file, MYF(MY_WME));
send_eof(&thd->net); send_eof(&thd->net);
thd->proc_info = "waiting to finalize termination"; thd->proc_info = "waiting to finalize termination";
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
err: err:
thd->proc_info = "waiting to finalize termination"; thd->proc_info = "waiting to finalize termination";
if(log) end_io_cache(&log);
(void) my_fclose(log, MYF(MY_WME)); if (file >= 0)
(void) my_close(file, MYF(MY_WME));
send_error(&thd->net, 0, errmsg); send_error(&thd->net, 0, errmsg);
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
......
...@@ -95,17 +95,6 @@ bool String::realloc(uint32 alloc_length) ...@@ -95,17 +95,6 @@ bool String::realloc(uint32 alloc_length)
return FALSE; return FALSE;
} }
#ifdef NOT_NEEDED
bool String::set(long num)
{
if (alloc(14))
return TRUE;
str_length=(uint32) (int10_to_str(num,Ptr,-10)-Ptr);
return FALSE;
}
#endif
bool String::set(longlong num) bool String::set(longlong num)
{ {
if (alloc(21)) if (alloc(21))
...@@ -274,6 +263,7 @@ bool String::append(const char *s,uint32 arg_length) ...@@ -274,6 +263,7 @@ bool String::append(const char *s,uint32 arg_length)
return FALSE; return FALSE;
} }
#ifdef TO_BE_REMOVED
bool String::append(FILE* file, uint32 arg_length, myf my_flags) bool String::append(FILE* file, uint32 arg_length, myf my_flags)
{ {
if (realloc(str_length+arg_length)) if (realloc(str_length+arg_length))
...@@ -286,6 +276,20 @@ bool String::append(FILE* file, uint32 arg_length, myf my_flags) ...@@ -286,6 +276,20 @@ bool String::append(FILE* file, uint32 arg_length, myf my_flags)
str_length+=arg_length; str_length+=arg_length;
return FALSE; return FALSE;
} }
#endif
bool String::append(IO_CACHE* file, uint32 arg_length)
{
if (realloc(str_length+arg_length))
return TRUE;
if (my_b_read(file, (byte*) Ptr + str_length, arg_length))
{
shrink(str_length);
return TRUE;
}
str_length+=arg_length;
return FALSE;
}
uint32 String::numchars() uint32 String::numchars()
{ {
......
...@@ -152,7 +152,7 @@ public: ...@@ -152,7 +152,7 @@ public:
bool copy(const char *s,uint32 arg_length); // Allocate new string bool copy(const char *s,uint32 arg_length); // Allocate new string
bool append(const String &s); bool append(const String &s);
bool append(const char *s,uint32 arg_length=0); bool append(const char *s,uint32 arg_length=0);
bool append(FILE* file, uint32 arg_length, myf my_flags); bool append(IO_CACHE* file, uint32 arg_length);
int strstr(const String &search,uint32 offset=0); // Returns offset to substring or -1 int strstr(const String &search,uint32 offset=0); // Returns offset to substring or -1
int strrstr(const String &search,uint32 offset=0); // Returns offset to substring or -1 int strrstr(const String &search,uint32 offset=0); // Returns offset to substring or -1
bool replace(uint32 offset,uint32 arg_length,const String &to); bool replace(uint32 offset,uint32 arg_length,const String &to);
......
...@@ -413,16 +413,6 @@ ulong convert_month_to_period(ulong month) ...@@ -413,16 +413,6 @@ ulong convert_month_to_period(ulong month)
return year*100+month%12+1; return year*100+month%12+1;
} }
#ifdef NOT_NEEDED
ulong add_to_period(ulong period,int months)
{
if (period == 0L)
return 0L;
return convert_month_to_period(convert_period_to_month(period)+months);
}
#endif
/***************************************************************************** /*****************************************************************************
** convert a timestamp string to a TIME value. ** convert a timestamp string to a TIME value.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment