Commit 89a57fd3 authored by unknown's avatar unknown

Porting of "buffered read" patch to 5.0 and post-review fixes.

The patch implements the idea suggested by Olaf van der Spek in 
thread "Client: many small reads?" (internals@lists.mysql.com).
Now small reads performed by the client library are buffered.
The buffering gives up to 2 times speedup when retrieving 
one-column tables.


BUILD/SETUP.sh:
  Remove --with-vio option which no longer exist.
BUILD/compile-pentium64-valgrind-max:
  Remove --with-vio option which no longer exist.
config/ac-macros/misc.m4:
  Removed --with-vio configure switch: we always use VIO. The switch,
   in fact, only saved us one pointer dereferencing per call in case we had
  only one transport type in VIO enabled.
config/ac-macros/openssl.m4:
  Removed HAVE_VIO.
include/config-win.h:
  Removed HAVE_VIO (not needed anymore)
  Added HAVE_VIO_READ_BUFF (define buffered client reads for Windows clients).
include/violite.h:
  Removed HAVE_VIO, as currently VIO is always in use.
  Added declaration for vio_read_buff and related members in struct VIO.
sql-common/client.c:
  Use flags to set up vio read buffering in mysql_real_connect.
sql/mysqld.cc:
  Use flags to disable vio read buffering when creating a server 
  connection.
vio/vio.c:
  Optionally set up vio read buffer when creating a new VIO structure.
vio/viosocket.c:
  Implementation of client-side buffered reads in VIO: the idea 
  is to buffer small reads in a client buffer to save amount of
  syscalls per retrieved result set. The implementation relies
  on the fact that read/recv will return as soon as there is
  some data in the kernel buffer, no matter how big the given
  user's buffer is. To be able to disable it in case recv/read don't
  have such semantics, the new calls are guarded
  with #define HAVE_VIO_READ_BUFF. Currently buffered reading is 
  switched on only for BSD sockets and named pipes, both on Windows
  and UNIXes.
parent d06b2ee9
...@@ -48,8 +48,8 @@ global_warnings="-Wimplicit -Wreturn-type -Wswitch -Wtrigraphs -Wcomment -W -Wch ...@@ -48,8 +48,8 @@ global_warnings="-Wimplicit -Wreturn-type -Wswitch -Wtrigraphs -Wcomment -W -Wch
c_warnings="$global_warnings -Wunused" c_warnings="$global_warnings -Wunused"
cxx_warnings="$global_warnings -Woverloaded-virtual -Wsign-promo -Wreorder -Wctor-dtor-privacy -Wnon-virtual-dtor" cxx_warnings="$global_warnings -Woverloaded-virtual -Wsign-promo -Wreorder -Wctor-dtor-privacy -Wnon-virtual-dtor"
base_max_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-raid --with-openssl --with-raid --with-vio" base_max_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-raid --with-openssl --with-raid"
max_leave_isam_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-federated-storage-engine --with-raid --with-openssl --with-raid --with-vio --with-embedded-server" max_leave_isam_configs="--with-innodb --with-berkeley-db --with-ndbcluster --with-archive-storage-engine --with-federated-storage-engine --with-raid --with-openssl --with-raid --with-embedded-server"
max_no_es_configs="$max_leave_isam_configs --without-isam" max_no_es_configs="$max_leave_isam_configs --without-isam"
max_configs="$max_no_es_configs --with-embedded-server" max_configs="$max_no_es_configs --with-embedded-server"
......
...@@ -9,7 +9,7 @@ cxx_warnings="$cxx_warnings $debug_extra_warnings" ...@@ -9,7 +9,7 @@ cxx_warnings="$cxx_warnings $debug_extra_warnings"
extra_configs="$pentium_configs $debug_configs" extra_configs="$pentium_configs $debug_configs"
# We want to test isam when building with valgrind # We want to test isam when building with valgrind
extra_configs="$extra_configs --with-berkeley-db --with-innodb --with-isam --with-embedded-server --with-openssl --with-vio --with-raid --with-ndbcluster" extra_configs="$extra_configs --with-berkeley-db --with-innodb --with-isam --with-embedded-server --with-openssl --with-raid --with-ndbcluster"
. "$path/FINISH.sh" . "$path/FINISH.sh"
......
...@@ -594,22 +594,13 @@ AC_MSG_RESULT($ac_cv_conv_longlong_to_float) ...@@ -594,22 +594,13 @@ AC_MSG_RESULT($ac_cv_conv_longlong_to_float)
]) ])
AC_DEFUN([MYSQL_CHECK_VIO], [ AC_DEFUN([MYSQL_CHECK_VIO], [
AC_ARG_WITH([vio], dnl
[ --with-vio Include the Virtual IO support], dnl we always use vio: no need for special defines
[vio="$withval"], dnl
[vio=no]) AC_DEFINE([HAVE_VIO_READ_BUFF], [1],
[Define to enable buffered read. This works only if syscalls
if test "$vio" = "yes" read/recv return as soon as there is some data in the kernel
then buffer, no matter how big the given buffer is.])
vio_dir="vio"
vio_libs="../vio/libvio.la"
AC_DEFINE(HAVE_VIO, 1)
else
vio_dir=""
vio_libs=""
fi
AC_SUBST([vio_dir])
AC_SUBST([vio_libs])
]) ])
# Local version of _AC_PROG_CXX_EXIT_DECLARATION that does not # Local version of _AC_PROG_CXX_EXIT_DECLARATION that does not
......
...@@ -89,9 +89,6 @@ AC_MSG_CHECKING(for OpenSSL) ...@@ -89,9 +89,6 @@ AC_MSG_CHECKING(for OpenSSL)
fi fi
MYSQL_FIND_OPENSSL([$openssl_includes], [$openssl_libs]) MYSQL_FIND_OPENSSL([$openssl_includes], [$openssl_libs])
#force VIO use #force VIO use
vio_dir="vio"
vio_libs="../vio/libvio.la"
AC_DEFINE([HAVE_VIO], [1], [Virtual IO])
AC_MSG_RESULT(yes) AC_MSG_RESULT(yes)
openssl_libs="-L$OPENSSL_LIB -lssl -lcrypto" openssl_libs="-L$OPENSSL_LIB -lssl -lcrypto"
# Don't set openssl_includes to /usr/include as this gives us a lot of # Don't set openssl_includes to /usr/include as this gives us a lot of
......
...@@ -308,7 +308,7 @@ inline double ulonglong2double(ulonglong value) ...@@ -308,7 +308,7 @@ inline double ulonglong2double(ulonglong value)
#define HAVE_QUERY_CACHE #define HAVE_QUERY_CACHE
#define SPRINTF_RETURNS_INT #define SPRINTF_RETURNS_INT
#define HAVE_SETFILEPOINTER #define HAVE_SETFILEPOINTER
#define HAVE_VIO #define HAVE_VIO_READ_BUFF
#ifdef NOT_USED #ifdef NOT_USED
#define HAVE_SNPRINTF /* Gave link error */ #define HAVE_SNPRINTF /* Gave link error */
......
...@@ -37,7 +37,12 @@ enum enum_vio_type ...@@ -37,7 +37,12 @@ enum enum_vio_type
VIO_TYPE_SSL, VIO_TYPE_SHARED_MEMORY VIO_TYPE_SSL, VIO_TYPE_SHARED_MEMORY
}; };
Vio* vio_new(my_socket sd, enum enum_vio_type type, my_bool localhost);
#define VIO_LOCALHOST 1 /* a localhost connection */
#define VIO_BUFFERED_READ 2 /* use buffered read */
#define VIO_READ_BUFFER_SIZE 16384 /* size of read buffer */
Vio* vio_new(my_socket sd, enum enum_vio_type type, uint flags);
#ifdef __WIN__ #ifdef __WIN__
Vio* vio_new_win32pipe(HANDLE hPipe); Vio* vio_new_win32pipe(HANDLE hPipe);
Vio* vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, Vio* vio_new_win32shared_memory(NET *net,HANDLE handle_file_map,
...@@ -57,8 +62,9 @@ int vio_close_pipe(Vio * vio); ...@@ -57,8 +62,9 @@ int vio_close_pipe(Vio * vio);
void vio_delete(Vio* vio); void vio_delete(Vio* vio);
int vio_close(Vio* vio); int vio_close(Vio* vio);
void vio_reset(Vio* vio, enum enum_vio_type type, void vio_reset(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, my_bool localhost); my_socket sd, HANDLE hPipe, uint flags);
int vio_read(Vio *vio, gptr buf, int size); int vio_read(Vio *vio, gptr buf, int size);
int vio_read_buff(Vio *vio, gptr buf, int size);
int vio_write(Vio *vio, const gptr buf, int size); int vio_write(Vio *vio, const gptr buf, int size);
int vio_blocking(Vio *vio, my_bool onoff, my_bool *old_mode); int vio_blocking(Vio *vio, my_bool onoff, my_bool *old_mode);
my_bool vio_is_blocking(Vio *vio); my_bool vio_is_blocking(Vio *vio);
...@@ -135,7 +141,7 @@ int vio_close_shared_memory(Vio * vio); ...@@ -135,7 +141,7 @@ int vio_close_shared_memory(Vio * vio);
} }
#endif #endif
#if defined(HAVE_VIO) && !defined(DONT_MAP_VIO) #if !defined(DONT_MAP_VIO)
#define vio_delete(vio) (vio)->viodelete(vio) #define vio_delete(vio) (vio)->viodelete(vio)
#define vio_errno(vio) (vio)->vioerrno(vio) #define vio_errno(vio) (vio)->vioerrno(vio)
#define vio_read(vio, buf, size) (vio)->read(vio,buf,size) #define vio_read(vio, buf, size) (vio)->read(vio,buf,size)
...@@ -150,7 +156,7 @@ int vio_close_shared_memory(Vio * vio); ...@@ -150,7 +156,7 @@ int vio_close_shared_memory(Vio * vio);
#define vio_peer_addr(vio, buf, prt) (vio)->peer_addr(vio, buf, prt) #define vio_peer_addr(vio, buf, prt) (vio)->peer_addr(vio, buf, prt)
#define vio_in_addr(vio, in) (vio)->in_addr(vio, in) #define vio_in_addr(vio, in) (vio)->in_addr(vio, in)
#define vio_timeout(vio, seconds) (vio)->timeout(vio, seconds) #define vio_timeout(vio, seconds) (vio)->timeout(vio, seconds)
#endif /* defined(HAVE_VIO) && !defined(DONT_MAP_VIO) */ #endif /* !defined(DONT_MAP_VIO) */
/* This enumerator is used in parser - should be always visible */ /* This enumerator is used in parser - should be always visible */
enum SSL_type enum SSL_type
...@@ -175,7 +181,10 @@ struct st_vio ...@@ -175,7 +181,10 @@ struct st_vio
struct sockaddr_in remote; /* Remote internet address */ struct sockaddr_in remote; /* Remote internet address */
enum enum_vio_type type; /* Type of connection */ enum enum_vio_type type; /* Type of connection */
char desc[30]; /* String description */ char desc[30]; /* String description */
#ifdef HAVE_VIO char *read_buffer; /* buffer for vio_read_buff */
char *read_pos; /* start of unfetched data in the
read buffer */
char *read_end; /* end of unfetched data */
/* function pointers. They are similar for socket/SSL/whatever */ /* function pointers. They are similar for socket/SSL/whatever */
void (*viodelete)(Vio*); void (*viodelete)(Vio*);
int (*vioerrno)(Vio*); int (*vioerrno)(Vio*);
...@@ -203,6 +212,5 @@ struct st_vio ...@@ -203,6 +212,5 @@ struct st_vio
char *shared_memory_pos; char *shared_memory_pos;
NET *net; NET *net;
#endif /* HAVE_SMEM */ #endif /* HAVE_SMEM */
#endif /* HAVE_VIO */
}; };
#endif /* vio_violite_h_ */ #endif /* vio_violite_h_ */
...@@ -1681,7 +1681,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, ...@@ -1681,7 +1681,8 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
ER(net->last_errno),socket_errno); ER(net->last_errno),socket_errno);
goto error; goto error;
} }
net->vio = vio_new(sock, VIO_TYPE_SOCKET, TRUE); net->vio= vio_new(sock, VIO_TYPE_SOCKET,
VIO_LOCALHOST | VIO_BUFFERED_READ);
bzero((char*) &UNIXaddr,sizeof(UNIXaddr)); bzero((char*) &UNIXaddr,sizeof(UNIXaddr));
UNIXaddr.sun_family = AF_UNIX; UNIXaddr.sun_family = AF_UNIX;
strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1); strmake(UNIXaddr.sun_path, unix_socket, sizeof(UNIXaddr.sun_path)-1);
...@@ -1756,7 +1757,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user, ...@@ -1756,7 +1757,7 @@ CLI_MYSQL_REAL_CONNECT(MYSQL *mysql,const char *host, const char *user,
ER(net->last_errno),socket_errno); ER(net->last_errno),socket_errno);
goto error; goto error;
} }
net->vio = vio_new(sock,VIO_TYPE_TCPIP,FALSE); net->vio= vio_new(sock, VIO_TYPE_TCPIP, VIO_BUFFERED_READ);
bzero((char*) &sock_addr,sizeof(sock_addr)); bzero((char*) &sock_addr,sizeof(sock_addr));
sock_addr.sin_family = AF_INET; sock_addr.sin_family = AF_INET;
......
...@@ -3745,7 +3745,7 @@ extern "C" pthread_handler_decl(handle_connections_sockets, ...@@ -3745,7 +3745,7 @@ extern "C" pthread_handler_decl(handle_connections_sockets,
if (!(vio_tmp=vio_new(new_sock, if (!(vio_tmp=vio_new(new_sock,
sock == unix_sock ? VIO_TYPE_SOCKET : sock == unix_sock ? VIO_TYPE_SOCKET :
VIO_TYPE_TCPIP, VIO_TYPE_TCPIP,
sock == unix_sock)) || sock == unix_sock ? VIO_LOCALHOST: 0)) ||
my_net_init(&thd->net,vio_tmp)) my_net_init(&thd->net,vio_tmp))
{ {
if (vio_tmp) if (vio_tmp)
......
...@@ -27,20 +27,23 @@ ...@@ -27,20 +27,23 @@
* Helper to fill most of the Vio* with defaults. * Helper to fill most of the Vio* with defaults.
*/ */
void vio_reset(Vio* vio, enum enum_vio_type type, static void vio_init(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, my_socket sd, HANDLE hPipe, uint flags)
my_bool localhost)
{ {
DBUG_ENTER("vio_reset"); DBUG_ENTER("vio_init");
DBUG_PRINT("enter", ("type: %d sd: %d localhost: %d", type, sd, DBUG_PRINT("enter", ("type: %d sd: %d flags: %d", type, sd, flags));
localhost));
#ifndef HAVE_VIO_READ_BUFF
flags&= ~VIO_BUFFERED_READ;
#endif
bzero((char*) vio, sizeof(*vio)); bzero((char*) vio, sizeof(*vio));
vio->type = type; vio->type = type;
vio->sd = sd; vio->sd = sd;
vio->hPipe = hPipe; vio->hPipe = hPipe;
vio->localhost= localhost; vio->localhost= flags & VIO_LOCALHOST;
#ifdef HAVE_VIO if ((flags & VIO_BUFFERED_READ) &&
!(vio->read_buffer= (char*)my_malloc(VIO_READ_BUFFER_SIZE, MYF(MY_WME))))
flags&= ~VIO_BUFFERED_READ;
#ifdef __WIN__ #ifdef __WIN__
if (type == VIO_TYPE_NAMEDPIPE) if (type == VIO_TYPE_NAMEDPIPE)
{ {
...@@ -101,7 +104,7 @@ void vio_reset(Vio* vio, enum enum_vio_type type, ...@@ -101,7 +104,7 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
{ {
vio->viodelete =vio_delete; vio->viodelete =vio_delete;
vio->vioerrno =vio_errno; vio->vioerrno =vio_errno;
vio->read =vio_read; vio->read= (flags & VIO_BUFFERED_READ) ? vio_read_buff : vio_read;
vio->write =vio_write; vio->write =vio_write;
vio->fastsend =vio_fastsend; vio->fastsend =vio_fastsend;
vio->viokeepalive =vio_keepalive; vio->viokeepalive =vio_keepalive;
...@@ -113,21 +116,30 @@ void vio_reset(Vio* vio, enum enum_vio_type type, ...@@ -113,21 +116,30 @@ void vio_reset(Vio* vio, enum enum_vio_type type,
vio->is_blocking =vio_is_blocking; vio->is_blocking =vio_is_blocking;
vio->timeout =vio_timeout; vio->timeout =vio_timeout;
} }
#endif /* HAVE_VIO */
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/* Reset initialized VIO to use with another transport type */
void vio_reset(Vio* vio, enum enum_vio_type type,
my_socket sd, HANDLE hPipe, uint flags)
{
my_free(vio->read_buffer, MYF(MY_ALLOW_ZERO_PTR));
vio_init(vio, type, sd, hPipe, flags);
}
/* Open the socket or TCP/IP connection and read the fnctl() status */ /* Open the socket or TCP/IP connection and read the fnctl() status */
Vio *vio_new(my_socket sd, enum enum_vio_type type, my_bool localhost) Vio *vio_new(my_socket sd, enum enum_vio_type type, uint flags)
{ {
Vio *vio; Vio *vio;
DBUG_ENTER("vio_new"); DBUG_ENTER("vio_new");
DBUG_PRINT("enter", ("sd: %d", sd)); DBUG_PRINT("enter", ("sd: %d", sd));
if ((vio = (Vio*) my_malloc(sizeof(*vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(*vio),MYF(MY_WME))))
{ {
vio_reset(vio, type, sd, 0, localhost); vio_init(vio, type, sd, 0, flags);
sprintf(vio->desc, sprintf(vio->desc,
(vio->type == VIO_TYPE_SOCKET ? "socket (%d)" : "TCP/IP (%d)"), (vio->type == VIO_TYPE_SOCKET ? "socket (%d)" : "TCP/IP (%d)"),
vio->sd); vio->sd);
...@@ -163,7 +175,7 @@ Vio *vio_new_win32pipe(HANDLE hPipe) ...@@ -163,7 +175,7 @@ Vio *vio_new_win32pipe(HANDLE hPipe)
DBUG_ENTER("vio_new_handle"); DBUG_ENTER("vio_new_handle");
if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME))))
{ {
vio_reset(vio, VIO_TYPE_NAMEDPIPE, 0, hPipe, TRUE); vio_init(vio, VIO_TYPE_NAMEDPIPE, 0, hPipe, VIO_LOCALHOST);
strmov(vio->desc, "named pipe"); strmov(vio->desc, "named pipe");
} }
DBUG_RETURN(vio); DBUG_RETURN(vio);
...@@ -179,7 +191,7 @@ Vio *vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, HANDLE handle_m ...@@ -179,7 +191,7 @@ Vio *vio_new_win32shared_memory(NET *net,HANDLE handle_file_map, HANDLE handle_m
DBUG_ENTER("vio_new_win32shared_memory"); DBUG_ENTER("vio_new_win32shared_memory");
if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME)))) if ((vio = (Vio*) my_malloc(sizeof(Vio),MYF(MY_WME))))
{ {
vio_reset(vio, VIO_TYPE_SHARED_MEMORY, 0, 0, TRUE); vio_init(vio, VIO_TYPE_SHARED_MEMORY, 0, 0, VIO_LOCALHOST);
vio->handle_file_map= handle_file_map; vio->handle_file_map= handle_file_map;
vio->handle_map= handle_map; vio->handle_map= handle_map;
vio->event_server_wrote= event_server_wrote; vio->event_server_wrote= event_server_wrote;
...@@ -204,11 +216,8 @@ void vio_delete(Vio* vio) ...@@ -204,11 +216,8 @@ void vio_delete(Vio* vio)
if (vio) if (vio)
{ {
if (vio->type != VIO_CLOSED) if (vio->type != VIO_CLOSED)
#ifdef HAVE_VIO /*WAX*/
vio->vioclose(vio); vio->vioclose(vio);
#else my_free((gptr) vio->read_buffer, MYF(MY_ALLOW_ZERO_PTR));
vio_close(vio);
#endif
my_free((gptr) vio,MYF(0)); my_free((gptr) vio,MYF(0));
} }
} }
...@@ -35,6 +35,8 @@ int vio_read(Vio * vio, gptr buf, int size) ...@@ -35,6 +35,8 @@ int vio_read(Vio * vio, gptr buf, int size)
DBUG_ENTER("vio_read"); DBUG_ENTER("vio_read");
DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size)); DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size));
/* Ensure nobody uses vio_read_buff and vio_read simultaneously */
DBUG_ASSERT(vio->read_end == vio->read_pos);
#ifdef __WIN__ #ifdef __WIN__
r = recv(vio->sd, buf, size,0); r = recv(vio->sd, buf, size,0);
#else #else
...@@ -52,6 +54,50 @@ int vio_read(Vio * vio, gptr buf, int size) ...@@ -52,6 +54,50 @@ int vio_read(Vio * vio, gptr buf, int size)
} }
/*
Buffered read: if average read size is small it may
reduce number of syscalls.
*/
int vio_read_buff(Vio *vio, gptr buf, int size)
{
int rc;
#define VIO_UNBUFFERED_READ_MIN_SIZE 2048
DBUG_ENTER("vio_read_buff");
DBUG_PRINT("enter", ("sd: %d, buf: 0x%p, size: %d", vio->sd, buf, size));
if (vio->read_pos < vio->read_end)
{
rc= min(vio->read_end - vio->read_pos, size);
memcpy(buf, vio->read_pos, rc);
vio->read_pos+= rc;
/*
Do not try to read from the socket now even if rc < size:
vio_read can return -1 due to an error or non-blocking mode, and
the safest way to handle it is to move to a separate branch.
*/
}
else if (size < VIO_UNBUFFERED_READ_MIN_SIZE)
{
rc= vio_read(vio, vio->read_buffer, VIO_READ_BUFFER_SIZE);
if (rc > 0)
{
if (rc > size)
{
vio->read_pos= vio->read_buffer + size;
vio->read_end= vio->read_buffer + rc;
rc= size;
}
memcpy(buf, vio->read_buffer, rc);
}
}
else
rc= vio_read(vio, buf, size);
DBUG_RETURN(rc);
#undef VIO_UNBUFFERED_READ_MIN_SIZE
}
int vio_write(Vio * vio, const gptr buf, int size) int vio_write(Vio * vio, const gptr buf, int size)
{ {
int r; int r;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment