Commit 0f1d0244 authored by konstantin@mysql.com's avatar konstantin@mysql.com

Porting of "buffered read" patch to 5.0 and post-review fixes.

The patch implements the idea suggested by Olaf van der Spek in 
thread "Client: many small reads?" (internals@lists.mysql.com).
Now small reads performed by the client library are buffered.
The buffering gives up to 2 times speedup when retrieving 
one-column tables.
parent e25a5877
...@@ -48,8 +48,8 @@ global_warnings="-Wimplicit -Wreturn-type -Wswitch -Wtrigraphs -Wcomment -W -Wch ...@@ -48,8 +48,8 @@ global_warnings="-Wimplicit -Wreturn-type -Wswitch -Wtrigraphs -Wcomment -W -Wch
c_warnings="$global_warnings -Wunused" c_warnings="$global_warnings -Wunused"
cxx_warnings="$global_warnings -Woverloaded-virtual -Wsign-promo -Wreorder -Wctor-dtor-privacy -Wnon-virtual-dtor" cxx_warnings="$global_warnings -Woverloaded-virtual -Wsign-promo -Wreorder -Wctor-dtor-privacy -Wnon-virtual-dtor"
base_max_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-raid --with-openssl --with-raid --with-vio" base_max_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-raid --with-openssl --with-raid"
max_leave_isam_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-federated-storage-engine --with-raid --with-openssl --with-raid --with-vio --with-embedded-server" max_leave_isam_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-federated-storage-engine --with-raid --with-openssl --with-raid --with-embedded-server"
max_no_es_configs="$max_leave_isam_configs --without-isam" max_no_es_configs="$max_leave_isam_configs --without-isam"
max_configs="$max_no_es_configs --with-embedded-server" max_configs="$max_no_es_configs --with-embedded-server"
......
...@@ -9,7 +9,7 @@ cxx_warnings="$cxx_warnings $debug_extra_warnings" ...@@ -9,7 +9,7 @@ cxx_warnings="$cxx_warnings $debug_extra_warnings"
extra_configs="$pentium_configs $debug_configs" extra_configs="$pentium_configs $debug_configs"
# We want to test isam when building with valgrind # We want to test isam when building with valgrind
extra_configs="$extra_configs --with-berkeley-db --with-innodb --with-isam --with-embedded-server --with-openssl --with-vio --with-raid --with-ndbcluster" extra_configs="$extra_configs --with-berkeley-db --with-innodb --with-isam --with-embedded-server --with-openssl --with-raid --with-ndbcluster"
. "$path/FINISH.sh" . "$path/FINISH.sh"
......
...@@ -594,22 +594,13 @@ AC_MSG_RESULT($ac_cv_conv_longlong_to_float) ...@@ -594,22 +594,13 @@ AC_MSG_RESULT($ac_cv_conv_longlong_to_float)
]) ])
AC_DEFUN([MYSQL_CHECK_VIO], [ AC_DEFUN([MYSQL_CHECK_VIO], [
AC_ARG_WITH([vio], dnl
[ --with-vio Include the Virtual IO support], dnl we always use vio: no need for special defines
[vio="$withval"], dnl
[vio=no]) AC_DEFINE([HAVE_VIO_READ_BUFF], [1],
[Define to enable buffered read. This works only if syscalls
if test "$vio" = "yes" read/recv return as soon as there is some data in the kernel
then buffer, no matter how big the given buffer is.])
vio_dir="vio"
vio_libs="../vio/libvio.la"
AC_DEFINE(HAVE_VIO, 1)
else
vio_dir=""
vio_libs=""
fi
AC_SUBST([vio_dir])
AC_SUBST([vio_libs])
]) ])
# Local version of _AC_PROG_CXX_EXIT_DECLARATION that does not # Local version of _AC_PROG_CXX_EXIT_DECLARATION that does not
......
...@@ -89,9 +89,6 @@ AC_MSG_CHECKING(for OpenSSL) ...@@ -89,9 +89,6 @@ AC_MSG_CHECKING(for OpenSSL)
fi fi
MYSQL_FIND_OPENSSL([$openssl_includes], [$openssl_libs]) MYSQL_FIND_OPENSSL([$openssl_includes], [$openssl_libs])
#force VIO use #force VIO use
vio_dir="vio"
vio_libs="../vio/libvio.la"
AC_DEFINE([HAVE_VIO], [1], [Virtual IO])
AC_MSG_RESULT(yes) AC_MSG_RESULT(yes)
openssl_libs="-L$OPENSSL_LIB -lssl -lcrypto" openssl_libs="-L$OPENSSL_LIB -lssl -lcrypto"
# Don't set openssl_includes to /usr/include as this gives us a lot of # Don't set openssl_includes to /usr/include as this gives us a lot of
......
...@@ -308,7 +308,7 @@ inline double ulonglong2double(ulonglong value) ...@@ -308,7 +308,7 @@ inline double ulonglong2double(ulonglong value)
#define HAVE_QUERY_CACHE #define HAVE_QUERY_CACHE
#define SPRINTF_RETURNS_INT #define SPRINTF_RETURNS_INT
#define HAVE_SETFILEPOINTER #define HAVE_SETFILEPOINTER
#define HAVE_VIO #define HAVE_VIO_READ_BUFF
#ifdef NOT_USED #ifdef NOT_USED
#define HAVE_SNPRINTF /* Gave link error */ #define HAVE_SNPRINTF /* Gave link error */
......
...@@ -37,7 +37,12 @@ enum enum_vio_type ...@@ -37,7 +37,12 @@ enum enum_vio_type
VIO_TYPE_SSL, VIO_TYPE_SHARED_MEMORY VIO_TYPE_SSL, VIO_TYPE_SHARED_MEMORY
}; };
Vio* vio_new(my_socket sd, enum enum_vio_type type, my_bool localhost);
#define VIO_LOCALHOST 1 /* a localhost connection */
#define VIO_BUFFERED_READ 2 /* use buffered read */
#define VIO_READ_BUFFER_SIZE 16384 /* size of read buffer */
Vio* vio_new(my_socket sd, enum enum_vio_type type, uint flags);
#ifdef __WIN__ #ifdef __WIN__
Vio* vio_new_win32pipe(HANDLE hPipe); Vio* vio_new_win32pipe(HANDLE hPipe);
Vio* vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, Vio* vio_new_win32shared_memory(NET *net,HANDLE handle_file_map,
...@@ -57,8 +62,9 @@ int vio_close_pipe(Vio * vio); ...@@ -57,8 +62,9 @@ int vio_close_pipe(Vio * vio);
void vio_delete(Vio* vio); void vio_delete(Vio* vio);
int vio_close(Vio* vio); int vio_close(Vio* vio);
void vio_reset(Vio* vio, enum enum_vio_type type, void vio_reset(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, my_bool localhost); my_socket sd, HANDLE hPipe, uint flags);
int vio_read(Vio *vio, gptr buf, int size); int vio_read(Vio *vio, gptr buf, int size);
int vio_read_buff(Vio *vio, gptr buf, int size);
int vio_write(Vio *vio, const gptr buf, int size); int vio_write(Vio *vio, const gptr buf, int size);
int vio_blocking(Vio *vio, my_bool onoff, my_bool *old_mode); int vio_blocking(Vio *vio, my_bool onoff, my_bool *old_mode);
my_bool vio_is_blocking(Vio *vio); my_bool vio_is_blocking(Vio *vio);
...@@ -135,7 +141,7 @@ int vio_close_shared_memory(Vio * vio); ...@@ -135,7 +141,7 @@ int vio_close_shared_memory(Vio * vio);
} }
#endif #endif
#if defined(HAVE_VIO) && !defined(DONT_MAP_VIO) #if !defined(DONT_MAP_VIO)
#define vio_delete(vio) (vio)->viodelete(vio) #define vio_delete(vio) (vio)->viodelete(vio)
#define vio_errno(vio) (vio)->vioerrno(vio) #define vio_errno(vio) (vio)->vioerrno(vio)
#define vio_read(vio, buf, size) (vio)->read(vio,buf,size) #define vio_read(vio, buf, size) (vio)->read(vio,buf,size)
...@@ -150,7 +156,7 @@ int vio_close_shared_memory(Vio * vio); ...@@ -150,7 +156,7 @@ int vio_close_shared_memory(Vio * vio);
#define vio_peer_addr(vio, buf, prt) (vio)->peer_addr(vio, buf, prt) #define vio_peer_addr(vio, buf, prt) (vio)->peer_addr(vio, buf, prt)
#define vio_in_addr(vio, in) (vio)->in_addr(vio, in) #define vio_in_addr(vio, in) (vio)->in_addr(vio, in)
#define vio_timeout(vio, seconds) (vio)->timeout(vio, seconds) #define vio_timeout(vio, seconds) (vio)->timeout(vio, seconds)
#endif /* defined(HAVE_VIO) && !defined(DONT_MAP_VIO) */ #endif /* !defined(DONT_MAP_VIO) */
/* This enumerator is used in parser - should be always visible */ /* This enumerator is used in parser - should be always visible */
enum SSL_type enum SSL_type
...@@ -175,7 +181,10 @@ struct st_vio ...@@ -175,7 +181,10 @@ struct st_vio
struct sockaddr_in remote; /* Remote internet address */ struct sockaddr_in remote; /* Remote internet address */
enum enum_vio_type type; /* Type of connection */ enum enum_vio_type type; /* Type of connection */
char desc[30]; /* String description */ char desc[30]; /* String description */
#ifdef HAVE_VIO char *read_buffer; /* buffer for vio_read_buff */
char *read_pos; /* start of unfetched data in the
read buffer */
char *read_end; /* end of unfetched data */
/* function pointers. They are similar for socket/SSL/whatever */ /* function pointers. They are similar for socket/SSL/whatever */
void (*viodelete)(Vio*); void (*viodelete)(Vio*);
int (*vioerrno)(Vio*); int (*vioerrno)(Vio*);
...@@ -203,6 +212,5 @@ struct st_vio ...@@ -203,6 +212,5 @@ struct st_vio
char *shared_memory_pos; char *shared_memory_pos;
NET *net; NET *net;
#endif /* HAVE_SMEM */ #endif /* HAVE_SMEM */
#endif /* HAVE_VIO */
}; };
#endif /* vio_violite_h_ */ #endif /* vio_violite_h_ */
...@@ -1681,7 +1681,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, ...@@ -1681,7 +1681,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
ER(net->last_errno),socket_errno); ER(net->last_errno),socket_errno);
goto error; goto error;
} }
net->vio = vio_new(sock, VIO_TYPE_SOCKET, TRUE); net->vio= vio_new(sock, VIO_TYPE_SOCKET,
VIO_LOCALHOST | VIO_BUFFERED_READ);
bzero((char*) &UNIXaddr,sizeof(UNIXaddr)); bzero((char*) &UNIXaddr,sizeof(UNIXaddr));
UNIXaddr.sun_family = AF_UNIX; UNIXaddr.sun_family = AF_UNIX;
strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1); strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1);
...@@ -1756,7 +1757,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, ...@@ -1756,7 +1757,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
ER(net->last_errno),socket_errno); ER(net->last_errno),socket_errno);
goto error; goto error;
} }
net->vio = vio_new(sock,VIO_TYPE_TCPIP,FALSE); net->vio= vio_new(sock, VIO_TYPE_TCPIP, VIO_BUFFERED_READ);
bzero((char*) &sock_addr,sizeof(sock_addr)); bzero((char*) &sock_addr,sizeof(sock_addr));
sock_addr.sin_family = AF_INET; sock_addr.sin_family = AF_INET;
......
...@@ -3745,7 +3745,7 @@ extern "C" pthread_handler_decl(handle_connections_sockets, ...@@ -3745,7 +3745,7 @@ extern "C" pthread_handler_decl(handle_connections_sockets,
if (!(vio_tmp=vio_new(new_sock, if (!(vio_tmp=vio_new(new_sock,
sock == unix_sock ? VIO_TYPE_SOCKET : sock == unix_sock ? VIO_TYPE_SOCKET :
VIO_TYPE_TCPIP, VIO_TYPE_TCPIP,
sock == unix_sock)) || sock == unix_sock ? VIO_LOCALHOST: 0)) ||
my_net_init(&thd->net,vio_tmp)) my_net_init(&thd->net,vio_tmp))
{ {
if (vio_tmp) if (vio_tmp)
......
...@@ -27,20 +27,23 @@ ...@@ -27,20 +27,23 @@
* Helper to fill most of the Vio* with defaults. * Helper to fill most of the Vio* with defaults.
*/ */
void vio_reset(Vio* vio, enum enum_vio_type type, static void vio_init(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, my_socket sd, HANDLE hPipe, uint flags)
my_bool localhost)
{ {
DBUG_ENTER("vio_reset"); DBUG_ENTER("vio_init");
DBUG_PRINT("enter", ("type: %d sd: %d localhost: %d", type, sd, DBUG_PRINT("enter", ("type: %d sd: %d flags: %d", type, sd, flags));
localhost));
#ifndef HAVE_VIO_READ_BUFF
flags&= ~VIO_BUFFERED_READ;
#endif
bzero((char*) vio, sizeof(*vio)); bzero((char*) vio, sizeof(*vio));
vio->type = type; vio->type = type;
vio->sd = sd; vio->sd = sd;
vio->hPipe = hPipe; vio->hPipe = hPipe;
vio->localhost= localhost; vio->localhost= flags & VIO_LOCALHOST;
#ifdef HAVE_VIO if ((flags & VIO_BUFFERED_READ) &&
!(vio->read_buffer= (char*)my_malloc(VIO_READ_BUFFER_SIZE, MYF(MY_WME))))
flags&= ~VIO_BUFFERED_READ;
#ifdef __WIN__ #ifdef __WIN__
if (type == VIO_TYPE_NAMEDPIPE) if (type == VIO_TYPE_NAMEDPIPE)
{ {
...@@ -101,7 +104,7 @@ void vio_reset(Vio* vio, enum enum_vio_type type, ...@@ -101,7 +104,7 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
{ {
vio->viodelete =vio_delete; vio->viodelete =vio_delete;
vio->vioerrno =vio_errno; vio->vioerrno =vio_errno;
vio->read =vio_read; vio->read= (flags & VIO_BUFFERED_READ) ? vio_read_buff : vio_read;
vio->write =vio_write; vio->write =vio_write;
vio->fastsend =vio_fastsend; vio->fastsend =vio_fastsend;
vio->viokeepalive =vio_keepalive; vio->viokeepalive =vio_keepalive;
...@@ -113,21 +116,30 @@ void vio_reset(Vio* vio, enum enum_vio_type type, ...@@ -113,21 +116,30 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
vio->is_blocking =vio_is_blocking; vio->is_blocking =vio_is_blocking;
vio->timeout =vio_timeout; vio->timeout =vio_timeout;
} }
#endif /* HAVE_VIO */
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* Reset initialized VIO to use with another transport type */
void vio_reset(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, uint flags)
{
my_free(vio->read_buffer, MYF(MY_ALLOW_ZERO_PTR));
vio_init(vio, type, sd, hPipe, flags);
}
/* Open the socket or TCP/IP connection and read the fnctl() status */ /* Open the socket or TCP/IP connection and read the fnctl() status */
Vio *vio_new(my_socket sd, enum enum_vio_type type, my_bool localhost) Vio *vio_new(my_socket sd, enum enum_vio_type type, uint flags)
{ {
Vio *vio; Vio *vio;
DBUG_ENTER("vio_new"); DBUG_ENTER("vio_new");
DBUG_PRINT("enter", ("sd: %d", sd)); DBUG_PRINT("enter", ("sd: %d", sd));
if ((vio = (Vio*) my_malloc(sizeof(*vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(*vio),MYF(MY_WME))))
{ {
vio_reset(vio, type, sd, 0, localhost); vio_init(vio, type, sd, 0, flags);
sprintf(vio->desc, sprintf(vio->desc,
(vio->type == VIO_TYPE_SOCKET ? "socket (%d)" : "TCP/IP (%d)"), (vio->type == VIO_TYPE_SOCKET ? "socket (%d)" : "TCP/IP (%d)"),
vio->sd); vio->sd);
...@@ -163,7 +175,7 @@ Vio *vio_new_win32pipe(HANDLE hPipe) ...@@ -163,7 +175,7 @@ Vio *vio_new_win32pipe(HANDLE hPipe)
DBUG_ENTER("vio_new_handle"); DBUG_ENTER("vio_new_handle");
if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME))))
{ {
vio_reset(vio, VIO_TYPE_NAMEDPIPE, 0, hPipe, TRUE); vio_init(vio, VIO_TYPE_NAMEDPIPE, 0, hPipe, VIO_LOCALHOST);
strmov(vio->desc, "named pipe"); strmov(vio->desc, "named pipe");
} }
DBUG_RETURN(vio); DBUG_RETURN(vio);
...@@ -179,7 +191,7 @@ Vio *vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, HANDLE handle_m ...@@ -179,7 +191,7 @@ Vio *vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, HANDLE handle_m
DBUG_ENTER("vio_new_win32shared_memory"); DBUG_ENTER("vio_new_win32shared_memory");
if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME))))
{ {
vio_reset(vio, VIO_TYPE_SHARED_MEMORY, 0, 0, TRUE); vio_init(vio, VIO_TYPE_SHARED_MEMORY, 0, 0, VIO_LOCALHOST);
vio->handle_file_map= handle_file_map; vio->handle_file_map= handle_file_map;
vio->handle_map= handle_map; vio->handle_map= handle_map;
vio->event_server_wrote= event_server_wrote; vio->event_server_wrote= event_server_wrote;
...@@ -204,11 +216,8 @@ void vio_delete(Vio* vio) ...@@ -204,11 +216,8 @@ void vio_delete(Vio* vio)
if (vio) if (vio)
{ {
if (vio->type != VIO_CLOSED) if (vio->type != VIO_CLOSED)
#ifdef HAVE_VIO /*WAX*/
vio->vioclose(vio); vio->vioclose(vio);
#else my_free((gptr) vio->read_buffer, MYF(MY_ALLOW_ZERO_PTR));
vio_close(vio);
#endif
my_free((gptr) vio,MYF(0)); my_free((gptr) vio,MYF(0));
} }
} }
...@@ -35,6 +35,8 @@ int vio_read(Vio * vio, gptr buf, int size) ...@@ -35,6 +35,8 @@ int vio_read(Vio * vio, gptr buf, int size)
DBUG_ENTER("vio_read"); DBUG_ENTER("vio_read");
DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size)); DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size));
/* Ensure nobody uses vio_read_buff and vio_read simultaneously */
DBUG_ASSERT(vio->read_end == vio->read_pos);
#ifdef __WIN__ #ifdef __WIN__
r = recv(vio->sd, buf, size,0); r = recv(vio->sd, buf, size,0);
#else #else
...@@ -52,6 +54,50 @@ int vio_read(Vio * vio, gptr buf, int size) ...@@ -52,6 +54,50 @@ int vio_read(Vio * vio, gptr buf, int size)
} }
/*
Buffered read: if average read size is small it may
reduce number of syscalls.
*/
int vio_read_buff(Vio *vio, gptr buf, int size)
{
int rc;
#define VIO_UNBUFFERED_READ_MIN_SIZE 2048
DBUG_ENTER("vio_read_buff");
DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size));
if (vio->read_pos < vio->read_end)
{
rc= min(vio->read_end - vio->read_pos, size);
memcpy(buf, vio->read_pos, rc);
vio->read_pos+= rc;
/*
Do not try to read from the socket now even if rc < size:
vio_read can return -1 due to an error or non-blocking mode, and
the safest way to handle it is to move to a separate branch.
*/
}
else if (size < VIO_UNBUFFERED_READ_MIN_SIZE)
{
rc= vio_read(vio, vio->read_buffer, VIO_READ_BUFFER_SIZE);
if (rc > 0)
{
if (rc > size)
{
vio->read_pos= vio->read_buffer + size;
vio->read_end= vio->read_buffer + rc;
rc= size;
}
memcpy(buf, vio->read_buffer, rc);
}
}
else
rc= vio_read(vio, buf, size);
DBUG_RETURN(rc);
#undef VIO_UNBUFFERED_READ_MIN_SIZE
}
int vio_write(Vio * vio, const gptr buf, int size) int vio_write(Vio * vio, const gptr buf, int size)
{ {
int r; int r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment