Commit 3fbf812f authored by unknown's avatar unknown

Remove unneeded module "com"


BitKeeper/deleted/.del-Makefile.am~2b013aa835a140c4:
  Delete: innobase/com/Makefile.am
BitKeeper/deleted/.del-com0com.c~473a1f0f440ce91b:
  Delete: innobase/com/com0com.c
BitKeeper/deleted/.del-com0shm.c~6a16f0c3d81de1f:
  Delete: innobase/com/com0shm.c
BitKeeper/deleted/.del-makefilewin~3e26f0df100887f2:
  Delete: innobase/com/makefilewin
BitKeeper/deleted/.del-com0com.h~533a7eaa16ec585a:
  Delete: innobase/include/com0com.h
BitKeeper/deleted/.del-com0com.ic~671e309916e285b:
  Delete: innobase/include/com0com.ic
BitKeeper/deleted/.del-com0shm.h~5f3df7c04221b0fe:
  Delete: innobase/include/com0shm.h
BitKeeper/deleted/.del-com0shm.ic~f827cfca1603fa6b:
  Delete: innobase/include/com0shm.ic
innobase/configure.in:
  Remove com/Makefile
innobase/include/Makefile.am:
  Remove com*.h
innobase/include/usr0sess.h:
  Remove unused communication functions
innobase/include/usr0sess.ic:
  Remove unused communication functions
innobase/include/usr0types.h:
  Remove sess_sys_t and sess_sig_t
innobase/usr/usr0sess.c:
  Remove unused functions
innobase/dict/dict0crea.c:
  Remove unneeded params of que_fork_start_command()
innobase/include/que0que.h:
  Remove unneeded params of que_fork_start_command()
innobase/row/row0mysql.c:
  Remove unneeded params of que_fork_start_command()
innobase/include/srv0srv.h:
  Remove references to the com module
innobase/srv/srv0srv.c:
  Remove references to the com module
  Remove unused vars srv_n_{com,worker}_threads
  Make some global vars static
innobase/que/que0que.c:
  Remove references to odbc
  Add #ifdef UNIV_SYNC_DEBUG around some ut_ad()
  Remove unneeded params of que_fork_start_command()
  Remove unreachable code
  Output diagnostics to stderr, not stdout
innobase/include/trx0trx.h:
  Remove unneeded params of trx_sig_send() and trx_sig_reply()
innobase/trx/trx0trx.c:
  Remove unneeded params of trx_sig_send() and trx_sig_reply()
  Remove params of sess_open()
innobase/srv/srv0start.c:
  Remove reference to com0com.h
  Remove call to sess_sys_init_at_db_start()
innobase/trx/trx0purge.c:
  Remove references to the com module
  Remove params of sess_open()
  Remove unneeded params of que_fork_start_command()
innobase/trx/trx0roll.c:
  Remove params of sess_open()
  Remove unneeded params of que_fork_start_command()
  Remove unneeded params of trx_sig_send() and trx_sig_reply()
parent 4d47802f
...@@ -22,7 +22,7 @@ TAR = gtar ...@@ -22,7 +22,7 @@ TAR = gtar
noinst_HEADERS = ib_config.h noinst_HEADERS = ib_config.h
SUBDIRS = os ut btr buf com data dict dyn eval fil fsp fut \ SUBDIRS = os ut btr buf data dict dyn eval fil fsp fut \
ha ibuf include lock log mach mem mtr page \ ha ibuf include lock log mach mem mtr page \
pars que read rem row srv sync thr trx usr pars que read rem row srv sync thr trx usr
......
# Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
# & Innobase Oy
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
include ../include/Makefile.i
noinst_LIBRARIES = libcom.a
libcom_a_SOURCES = com0com.c com0shm.c
EXTRA_PROGRAMS =
/******************************************************
The communication primitives
(c) 1995 Innobase Oy
Created 9/23/1995 Heikki Tuuri
*******************************************************/
#include "com0com.h"
#ifdef UNIV_NONINL
#include "com0com.ic"
#endif
#include "mem0mem.h"
#include "com0shm.h"
/*
IMPLEMENTATION OF COMMUNICATION PRIMITIVES
==========================================
The primitives provide a uniform function interface for
use in communication. The primitives have been modeled
after the Windows Sockets interface. Below this uniform
API, the precise methods of communication, for example,
shared memory or Berkeley sockets, can be implemented
as subroutines.
*/
struct com_endpoint_struct{
ulint type; /* endpoint type */
void* par; /* type-specific data structures */
ibool bound; /* TRUE if the endpoint has been
bound to an address */
};
/*************************************************************************
Accessor functions for an endpoint */
UNIV_INLINE
ulint
com_endpoint_get_type(
/*==================*/
com_endpoint_t* ep)
{
ut_ad(ep);
return(ep->type);
}
UNIV_INLINE
void
com_endpoint_set_type(
/*==================*/
com_endpoint_t* ep,
ulint type)
{
ut_ad(ep);
ut_ad(type == COM_SHM);
ep->type = type;
}
UNIV_INLINE
void*
com_endpoint_get_par(
/*=================*/
com_endpoint_t* ep)
{
ut_ad(ep);
return(ep->par);
}
UNIV_INLINE
void
com_endpoint_set_par(
/*=================*/
com_endpoint_t* ep,
void* par)
{
ut_ad(ep);
ut_ad(par);
ep->par = par;
}
UNIV_INLINE
ibool
com_endpoint_get_bound(
/*===================*/
com_endpoint_t* ep)
{
ut_ad(ep);
return(ep->bound);
}
UNIV_INLINE
void
com_endpoint_set_bound(
/*===================*/
com_endpoint_t* ep,
ibool bound)
{
ut_ad(ep);
ep->bound = bound;
}
/*************************************************************************
Creates a communications endpoint. */
com_endpoint_t*
com_endpoint_create(
/*================*/
/* out, own: communications endpoint, NULL if
did not succeed */
ulint type) /* in: communication type of endpoint:
only COM_SHM supported */
{
com_endpoint_t* ep;
void* par;
ep = mem_alloc(sizeof(com_endpoint_t));
com_endpoint_set_type(ep, type);
com_endpoint_set_bound(ep, FALSE);
if (type == COM_SHM) {
par = com_shm_endpoint_create();
com_endpoint_set_par(ep, par);
} else {
par = NULL;
ut_error;
}
if (par != NULL) {
return(ep);
} else {
mem_free(ep);
return(NULL);
}
}
/*************************************************************************
Frees a communications endpoint. */
ulint
com_endpoint_free(
/*==============*/
/* out: O if succeed, else error number */
com_endpoint_t* ep) /* in, own: communications endpoint */
{
ulint type;
ulint ret;
void* par;
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_endpoint_free((com_shm_endpoint_t*)par);
} else {
ret = 0;
ut_error;
}
if (ret) {
return(ret);
} else {
mem_free(ep);
return(0);
}
}
/*************************************************************************
Sets an option, like the maximum datagram size for an endpoint.
The options may vary depending on the endpoint type. */
ulint
com_endpoint_set_option(
/*====================*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: endpoint */
ulint optno, /* in: option number, only
COM_OPT_MAX_DGRAM_SIZE currently supported */
byte* optval, /* in: pointer to a buffer containing the
option value to set */
ulint optlen) /* in: option value buffer length */
{
ulint type;
ulint ret;
void* par;
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_endpoint_set_option((com_shm_endpoint_t*)par,
optno, optval, optlen);
} else {
ret = 0;
ut_error;
}
return(ret);
}
/*************************************************************************
Binds a communications endpoint to the specified address. */
ulint
com_bind(
/*=====*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len) /* in: name length */
{
ulint type;
ulint ret;
void* par;
ut_ad(len <= COM_MAX_ADDR_LEN);
if (com_endpoint_get_bound(ep)) {
return(COM_ERR_ALREADY_BOUND);
}
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_bind((com_shm_endpoint_t*)par, name, len);
} else {
ret = 0;
ut_error;
}
if (ret == 0) {
com_endpoint_set_bound(ep, TRUE);
}
return(ret);
}
/*************************************************************************
Waits for a datagram to arrive at an endpoint. */
ulint
com_recvfrom(
/*=========*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* out: datagram buffer; the buffer is
supplied by the caller */
ulint buf_len,/* in: datagram buffer length */
ulint* len, /* out: datagram length */
char* from, /* out: address name buffer; the buffer is
supplied by the caller */
ulint from_len,/* in: address name buffer length */
ulint* addr_len)/* out: address name length */
{
ulint type;
ulint ret;
void* par;
if (!com_endpoint_get_bound(ep)) {
return(COM_ERR_NOT_BOUND);
}
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_recvfrom((com_shm_endpoint_t*)par,
buf, buf_len, len, from, from_len,
addr_len);
} else {
ret = 0;
ut_error;
}
return(ret);
}
/*************************************************************************
Sends a datagram to the specified destination. */
ulint
com_sendto(
/*=======*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* in: datagram buffer */
ulint len, /* in: datagram length */
char* to, /* in: address name buffer */
ulint tolen) /* in: address name length */
{
ulint type;
ulint ret;
void* par;
if (!com_endpoint_get_bound(ep)) {
return(COM_ERR_NOT_BOUND);
}
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_sendto((com_shm_endpoint_t*)par, buf, len,
to, tolen);
} else {
ret = 0;
ut_error;
}
return(ret);
}
/*************************************************************************
Gets the maximum datagram size for an endpoint. */
ulint
com_endpoint_get_max_size(
/*======================*/
/* out: maximum size */
com_endpoint_t* ep) /* in: endpoint */
{
ulint type;
ulint ret;
void* par;
type = com_endpoint_get_type(ep);
par = com_endpoint_get_par(ep);
if (type == COM_SHM) {
ret = com_shm_endpoint_get_size((com_shm_endpoint_t*)par);
} else {
ret = 0;
ut_error;
}
return(ret);
}
/******************************************************
The communication through shared memory
(c) 1995 Innobase Oy
Created 9/25/1995 Heikki Tuuri
*******************************************************/
#include "com0shm.h"
#ifdef UNIV_NONINL
#include "com0shm.ic"
#endif
#include "mem0mem.h"
#include "ut0mem.h"
#include "com0com.h"
#include "os0shm.h"
#include "sync0sync.h"
#include "sync0ipm.h"
#include "hash0hash.h"
/*
IMPLEMENTATION OF COMMUNICATION PRIMITIVES
==========================================
When bind is called for an endpoint, a shared memory area of
a size specified by com_shm_set_option is created with the
name of the address given concatenated to "_IBSHM".
Also a mutex is created for controlling the access to the
shared memory area. The name of the mutex is address + "_IBSHM_MTX".
An event with name address + "_IBSHM_EV_NE" is created. This event
is in signaled state when the shared memory area is not empty, i.e.,
there is a datagram to read. An event address + "_IBSHM_EV_EM"
is signaled, when the area is empty, i.e., a datagram can be
written to it.
The shared memory area consists of an info struct
at the beginning, containing fields telling:
if the area is valid, i.e., is anybody going to
read it, whether it currently contains a datagram, the
length of the address from which the datagram was received,
the length of the datagram, and the maximum allowed length of
a datagram.
After the info struct, there is a string of bytes
containing the sender address and the data
of the datagram.
*/
/* The following is set TRUE when the first endpoint is created. */
ibool com_shm_initialized = FALSE;
/* When a datagram is sent, the shared memory area
corresponding to the destination address is mapped
to the address space of this (sender) process.
We preserve it and keep the relevant info in the
following list. We can save a lot of CPU time
if the destination can be found on the list. The list is
protected by the mutex below. */
mutex_t com_shm_destination_mutex;
hash_table_t* com_shm_destination_cache;
UT_LIST_BASE_NODE_T(com_shm_endpoint_t)
com_shm_destination_list;
/* The number of successfully bound endpoints in this process. When this
number drops to 0, the destination cache is freed. This variable is protected
by com_shm_destination_mutex above. */
ulint com_shm_bind_count = 0;
/* The performance of communication in NT depends on how
many times a system call is made (excluding os_thread_yield,
as that is the fastest way to switch thread).
The following variable counts such events. */
ulint com_shm_system_call_count = 0;
/* The info struct at the beginning of a shared memory area */
typedef struct com_shm_info_struct com_shm_info_t;
/* An area of shared memory consists of an info struct followed
by a string of bytes. */
typedef com_shm_info_t com_shm_t;
struct com_shm_endpoint_struct{
ibool owns_shm; /* This is true if the shared memory
area is owned by this endpoint structure
(it may also be opened for this endpoint,
not created, in which case does not own it) */
char* addr; /* pointer to address the endpoint is bound
to, NULL if not bound */
ulint addr_len; /* address length */
ulint size; /* maximum allowed datagram size, initialized
to 0 at creation */
os_shm_t shm; /* operating system handle of the shared
memory area */
com_shm_t* map; /* pointer to the start address of the shared
memory area */
os_event_t not_empty; /* this is in the signaled state if
the area currently may contain a datagram;
NOTE: automatic event */
os_event_t empty; /* this is in the signaled state if the area
currently may be empty; NOTE: automatic
event */
ip_mutex_hdl_t* ip_mutex; /* handle to the interprocess mutex
protecting the shared memory */
UT_LIST_NODE_T(com_shm_endpoint_t) list; /* If the endpoint struct
is inserted into a list, this contains
pointers to next and prev */
com_shm_endpoint_t* addr_hash;
/* hash table link */
};
struct com_shm_info_struct{
ulint valid; /* This is COM_SHM_VALID if the creator
of the shared memory area has it still
mapped to its address space. Otherwise,
we may conclude that the datagram cannot
be delivered. */
ibool not_empty; /* TRUE if the area currently contains
a datagram */
ulint empty_waiters; /* Count of (writer) threads which are
waiting for the empty-event */
ulint max_len;/* maximum allowed length for a datagram */
ulint addr_len;/* address length for the sender address */
ulint data_len;/* datagram length */
ip_mutex_t ip_mutex;/* fast interprocess mutex protecting
the shared memory area */
};
#define COM_SHM_VALID 76640
/*************************************************************************
Accessor functions for a shared memory endpoint */
UNIV_INLINE
ibool
com_shm_endpoint_get_owns_shm(
/*==========================*/
com_shm_endpoint_t* ep)
{
ut_ad(ep);
return(ep->owns_shm);
}
UNIV_INLINE
void
com_shm_endpoint_set_owns_shm(
/*==========================*/
com_shm_endpoint_t* ep,
ibool flag)
{
ut_ad(ep);
ep->owns_shm = flag;
}
UNIV_INLINE
char*
com_shm_endpoint_get_addr(
/*======================*/
com_shm_endpoint_t* ep)
{
ut_ad(ep);
return(ep->addr);
}
UNIV_INLINE
void
com_shm_endpoint_set_addr(
/*======================*/
com_shm_endpoint_t* ep,
char* addr)
{
ut_ad(ep);
ep->addr = addr;
}
UNIV_INLINE
ulint
com_shm_endpoint_get_addr_len(
/*==========================*/
com_shm_endpoint_t* ep)
{
return(ep->addr_len);
}
UNIV_INLINE
void
com_shm_endpoint_set_addr_len(
/*==========================*/
com_shm_endpoint_t* ep,
ulint len)
{
ut_ad(ep);
ut_ad(len > 0);
ep->addr_len = len;
}
ulint
com_shm_endpoint_get_size(
/*======================*/
com_shm_endpoint_t* ep)
{
return(ep->size);
}
UNIV_INLINE
void
com_shm_endpoint_set_size(
/*======================*/
com_shm_endpoint_t* ep,
ulint size)
{
ut_ad(ep);
ep->size = size;
}
UNIV_INLINE
os_shm_t
com_shm_endpoint_get_shm(
/*=====================*/
com_shm_endpoint_t* ep)
{
return(ep->shm);
}
UNIV_INLINE
void
com_shm_endpoint_set_shm(
/*=====================*/
com_shm_endpoint_t* ep,
os_shm_t shm)
{
ut_ad(ep);
ut_ad(shm);
ep->shm = shm;
}
UNIV_INLINE
com_shm_t*
com_shm_endpoint_get_map(
/*=====================*/
com_shm_endpoint_t* ep)
{
return(ep->map);
}
UNIV_INLINE
void
com_shm_endpoint_set_map(
/*=====================*/
com_shm_endpoint_t* ep,
com_shm_t* map)
{
ut_ad(ep);
ut_ad(map);
ep->map = map;
}
UNIV_INLINE
os_event_t
com_shm_endpoint_get_empty(
/*=======================*/
com_shm_endpoint_t* ep)
{
return(ep->empty);
}
UNIV_INLINE
void
com_shm_endpoint_set_empty(
/*=======================*/
com_shm_endpoint_t* ep,
os_event_t event)
{
ut_ad(ep);
ut_ad(event);
ep->empty = event;
}
UNIV_INLINE
os_event_t
com_shm_endpoint_get_not_empty(
/*===========================*/
com_shm_endpoint_t* ep)
{
return(ep->not_empty);
}
UNIV_INLINE
void
com_shm_endpoint_set_not_empty(
/*===========================*/
com_shm_endpoint_t* ep,
os_event_t event)
{
ut_ad(ep);
ut_ad(event);
ep->not_empty = event;
}
/************************************************************************
Accessor functions for the shared memory area info struct. */
UNIV_INLINE
ulint
com_shm_get_valid(
/*==============*/
com_shm_info_t* info)
{
return(info->valid);
}
UNIV_INLINE
void
com_shm_set_valid(
/*==============*/
com_shm_info_t* info,
ulint flag)
{
ut_ad(info);
info->valid = flag;
}
UNIV_INLINE
ibool
com_shm_get_not_empty(
/*==================*/
com_shm_info_t* info)
{
return(info->not_empty);
}
UNIV_INLINE
void
com_shm_set_not_empty(
/*==================*/
com_shm_info_t* info,
ibool flag)
{
ut_ad(info);
info->not_empty = flag;
}
UNIV_INLINE
ulint
com_shm_get_empty_waiters(
/*======================*/
com_shm_info_t* info)
{
ut_ad(info->empty_waiters < 1000);
return(info->empty_waiters);
}
UNIV_INLINE
void
com_shm_set_empty_waiters(
/*======================*/
com_shm_info_t* info,
ulint count)
{
ut_ad(info);
ut_ad(count < 1000);
info->empty_waiters = count;
}
UNIV_INLINE
ulint
com_shm_get_max_len(
/*================*/
com_shm_info_t* info)
{
return(info->max_len);
}
UNIV_INLINE
void
com_shm_set_max_len(
/*================*/
com_shm_info_t* info,
ulint len)
{
ut_ad(info);
ut_ad(len > 0);
info->max_len = len;
}
UNIV_INLINE
ulint
com_shm_get_addr_len(
/*=================*/
com_shm_info_t* info)
{
return(info->addr_len);
}
UNIV_INLINE
void
com_shm_set_addr_len(
/*=================*/
com_shm_info_t* info,
ulint len)
{
ut_ad(info);
ut_ad(len > 0);
info->addr_len = len;
}
UNIV_INLINE
ulint
com_shm_get_data_len(
/*=================*/
com_shm_info_t* info)
{
return(info->data_len);
}
UNIV_INLINE
void
com_shm_set_data_len(
/*=================*/
com_shm_info_t* info,
ulint len)
{
ut_ad(info);
ut_ad(len > 0);
info->data_len = len;
}
UNIV_INLINE
ip_mutex_t*
com_shm_get_ip_mutex(
/*=================*/
com_shm_info_t* info)
{
return(&(info->ip_mutex));
}
/*************************************************************************
Accessor functions for the address and datagram fields inside a
shared memory area. */
UNIV_INLINE
char*
com_shm_get_addr(
/*=============*/
com_shm_t* area)
{
return((char*)area + sizeof(com_shm_info_t));
}
UNIV_INLINE
byte*
com_shm_get_data(
/*=============*/
com_shm_t* area)
{
return((byte*)com_shm_get_addr(area) + com_shm_get_addr_len(area));
}
/*************************************************************************
Initializes the shared memory communication system for this
process. */
UNIV_INLINE
void
com_shm_init(void)
/*==============*/
{
mutex_create(&com_shm_destination_mutex);
mutex_set_level(&com_shm_destination_mutex, SYNC_ANY_LATCH);
com_shm_destination_cache = hash_create(1000);
UT_LIST_INIT(com_shm_destination_list);
com_shm_initialized = TRUE;
}
/*************************************************************************
Reserves the ip mutex of the shared memory area of an endpoint. */
UNIV_INLINE
void
com_shm_enter(
/*==========*/
com_shm_endpoint_t* ep)
{
ulint ret;
ret = ip_mutex_enter(ep->ip_mutex, 10000000);
if (ret != 0) {
mutex_list_print_info();
ut_error;
}
}
/*************************************************************************
Releases the ip mutex of the shared memory area of an endpoint. */
UNIV_INLINE
void
com_shm_exit(
/*=========*/
com_shm_endpoint_t* ep)
{
ip_mutex_exit(ep->ip_mutex);
}
/*************************************************************************
Looks for the given address in the cached destination addresses. */
UNIV_INLINE
com_shm_endpoint_t*
com_shm_destination_cache_search(
/*=============================*/
/* out: cached endpoint structure if found, else NULL */
char* addr, /* in: destination address */
ulint len) /* in: address length */
{
com_shm_endpoint_t* ep;
ulint fold;
fold = ut_fold_binary((byte*)addr, len);
/*
printf("Searching dest. cache %s %lu fold %lu\n", addr, len, fold);
*/
mutex_enter(&com_shm_destination_mutex);
HASH_SEARCH(addr_hash, com_shm_destination_cache, fold, ep,
((ep->addr_len == len)
&& (0 == ut_memcmp(addr, ep->addr, len))));
mutex_exit(&com_shm_destination_mutex);
return(ep);
}
/*************************************************************************
Inserts the given endpoint structure in the cached destination addresses. */
static
void
com_shm_destination_cache_insert(
/*=============================*/
com_shm_endpoint_t* ep) /* in: endpoint struct to insert */
{
ulint fold;
fold = ut_fold_binary((byte*)(ep->addr), ep->addr_len);
mutex_enter(&com_shm_destination_mutex);
/* Add to hash table */
HASH_INSERT(com_shm_endpoint_t,
addr_hash, com_shm_destination_cache, fold, ep);
UT_LIST_ADD_LAST(list, com_shm_destination_list, ep);
/* printf("Inserting to dest. cache %s %lu fold %lu\n", ep->addr,
ep->addr_len, fold);
*/
mutex_exit(&com_shm_destination_mutex);
}
/*************************************************************************
Frees the endpoint structs in the destination cache if the bind count is zero.
If it is not, some send operation may just be using a cached endpoint and it
cannot be freed. */
static
void
com_shm_destination_cache_no_binds(void)
/*====================================*/
{
com_shm_endpoint_t* ep;
ulint fold;
mutex_enter(&com_shm_destination_mutex);
if (com_shm_bind_count != 0) {
mutex_exit(&com_shm_destination_mutex);
return;
}
while (UT_LIST_GET_LEN(com_shm_destination_list) != 0) {
ep = UT_LIST_GET_FIRST(com_shm_destination_list);
UT_LIST_REMOVE(list, com_shm_destination_list, ep);
fold = ut_fold_binary((byte*)ep->addr, ep->addr_len);
/*
printf("Deleting from dest. cache %s %lu fold %lu\n",
ep->addr,
ep->addr_len, fold);
*/
HASH_DELETE(com_shm_endpoint_t, addr_hash,
com_shm_destination_cache, fold, ep);
com_shm_endpoint_free(ep);
}
mutex_exit(&com_shm_destination_mutex);
}
/***********************************************************************
Unbinds an endpoint at the time of freeing. */
static
void
com_shm_unbind(
/*===========*/
com_shm_endpoint_t* ep) /* in: endpoint */
{
com_shm_t* map;
map = com_shm_endpoint_get_map(ep);
/* Mark the shared memory area invalid */
com_shm_set_valid(map, 0);
/* Decrement the count of bindings */
mutex_enter(&com_shm_destination_mutex);
com_shm_bind_count--;
mutex_exit(&com_shm_destination_mutex);
/* If there are no binds left, free the cached endpoints */
com_shm_destination_cache_no_binds();
}
/*************************************************************************
Creates a communications endpoint. */
com_shm_endpoint_t*
com_shm_endpoint_create(void)
/*=========================*/
/* out, own: communications endpoint, NULL if
did not succeed */
{
com_shm_endpoint_t* ep;
if (!com_shm_initialized) {
com_shm_init();
}
ep = mem_alloc(sizeof(com_shm_endpoint_t));
com_shm_endpoint_set_owns_shm(ep, FALSE);
com_shm_endpoint_set_addr(ep, NULL);
com_shm_endpoint_set_size(ep, 0);
return(ep);
}
/*************************************************************************
Frees a communications endpoint. */
ulint
com_shm_endpoint_free(
/*==================*/
/* out: O if succeed, else error number */
com_shm_endpoint_t* ep) /* in, own: communications endpoint */
{
com_shm_t* map;
ut_ad(ep);
if (com_shm_endpoint_get_addr(ep) != NULL) {
map = com_shm_endpoint_get_map(ep);
if (com_shm_endpoint_get_owns_shm(ep)) {
com_shm_unbind(ep);
}
/* We do not destroy the data structures in the shared memory
area, because we cannot be sure that there is currently no
process accessing it. Therefore we just close the ip_mutex
residing in the area. */
ip_mutex_close(ep->ip_mutex);
os_event_free(com_shm_endpoint_get_not_empty(ep));
os_event_free(com_shm_endpoint_get_empty(ep));
os_shm_unmap(map);
os_shm_free(com_shm_endpoint_get_shm(ep));
mem_free(com_shm_endpoint_get_addr(ep));
}
mem_free(ep);
return(0);
}
/*************************************************************************
Sets an option, like the maximum datagram size for an endpoint.
The options may vary depending on the endpoint type. */
ulint
com_shm_endpoint_set_option(
/*========================*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: endpoint */
ulint optno, /* in: option number, only
COM_OPT_MAX_DGRAM_SIZE currently supported */
byte* optval, /* in: pointer to a buffer containing the
option value to set */
ulint optlen) /* in: option value buffer length */
{
ulint size;
UT_NOT_USED(optlen);
ut_ad(ep);
ut_a(optno == COM_OPT_MAX_DGRAM_SIZE);
ut_ad(NULL == com_shm_endpoint_get_addr(ep));
size = *((ulint*)optval);
ut_ad(size > 0);
com_shm_endpoint_set_size(ep, size);
return(0);
}
/*************************************************************************
This function is used either to create a new shared memory area or open an
existing one, but this does not do the operations necessary with the ip mutex.
They are performed in com_shm_bind or com_shm_open which call this function. */
static
ulint
com_shm_create_or_open(
/*===================*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len) /* in: address name length */
{
os_shm_t shm;
com_shm_t* map;
os_event_t event_ne;
os_event_t event_em;
char* buf;
ut_ad(ep);
ut_ad(name);
ut_ad(len > 0);
buf = mem_alloc(COM_MAX_ADDR_LEN + 20);
ut_memcpy(buf, name, len);
ut_strcpy(buf + len, (char*)"_IBSHM");
shm = os_shm_create(sizeof(com_shm_info_t) + COM_MAX_ADDR_LEN +
com_shm_endpoint_get_size(ep), buf);
if (shm == NULL) {
return(COM_ERR_NOT_SPECIFIED);
}
map = os_shm_map(shm);
if (map == NULL) {
os_shm_free(shm);
return(COM_ERR_NOT_SPECIFIED);
}
ut_strcpy(buf + len, (char*)"_IBSHM_EV_NE"),
event_ne = os_event_create(buf);
ut_ad(event_ne);
ut_strcpy(buf + len, (char*)"_IBSHM_EV_EM"),
event_em = os_event_create(buf);
ut_ad(event_em);
ut_a(0); /* event_ne and event_em should be auto events! */
com_shm_endpoint_set_shm(ep, shm);
com_shm_endpoint_set_map(ep, map);
com_shm_endpoint_set_not_empty(ep, event_ne);
com_shm_endpoint_set_empty(ep, event_em);
com_shm_endpoint_set_addr(ep, buf);
com_shm_endpoint_set_addr_len(ep, len);
return(0);
}
/*************************************************************************
Opens a shared memory area for communication. */
static
ulint
com_shm_open(
/*=========*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len) /* in: address name length */
{
ip_mutex_hdl_t* ip_hdl;
com_shm_t* map;
ulint ret;
char buf[COM_MAX_ADDR_LEN + 20];
ret = com_shm_create_or_open(ep, name, len);
if (ret != 0) {
return(ret);
}
map = com_shm_endpoint_get_map(ep);
/* Open the interprocess mutex to protect the shared memory area */
ut_memcpy(buf, name, len);
ut_strcpy(buf + len, (char*)"_IBSHM_MTX");
ret = ip_mutex_open(com_shm_get_ip_mutex(map), buf, &ip_hdl);
if (ret != 0) {
return(COM_ERR_NOT_SPECIFIED);
}
ep->ip_mutex = ip_hdl;
return(0);
}
/*************************************************************************
Creates a shared memory area for communication. */
ulint
com_shm_bind(
/*=========*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len) /* in: address name length */
{
com_shm_t* map;
ulint ret;
char buf[COM_MAX_ADDR_LEN + 20];
ip_mutex_hdl_t* ip_hdl;
if (com_shm_endpoint_get_size(ep) == 0) {
return(COM_ERR_MAX_DATAGRAM_SIZE_NOT_SET);
}
ret = com_shm_create_or_open(ep, name, len);
if (ret != 0) {
return(ret);
}
map = com_shm_endpoint_get_map(ep);
/* Create the interprocess mutex to protect the shared memory area */
ut_memcpy(buf, name, len);
ut_strcpy(buf + len, (char*)"_IBSHM_MTX");
ret = ip_mutex_create(com_shm_get_ip_mutex(map), buf, &ip_hdl);
if (ret != 0) {
return(COM_ERR_NOT_SPECIFIED);
}
/* This endpoint structure owns the shared memory area */
com_shm_endpoint_set_owns_shm(ep, TRUE);
ep->ip_mutex = ip_hdl;
mutex_enter(&com_shm_destination_mutex);
/* Increment the count of successful bindings */
com_shm_bind_count++;
mutex_exit(&com_shm_destination_mutex);
com_shm_set_not_empty(map, FALSE);
com_shm_set_empty_waiters(map, 0);
com_shm_set_max_len(map, com_shm_endpoint_get_size(ep));
com_shm_set_valid(map, COM_SHM_VALID);
os_event_set(com_shm_endpoint_get_empty(ep));
return(0);
}
/*************************************************************************
Waits for a datagram to arrive at an endpoint. */
ulint
com_shm_recvfrom(
/*=============*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* out: datagram buffer; the buffer is
supplied by the caller */
ulint buf_len,/* in: datagram buffer length */
ulint* len, /* out: datagram length */
char* from, /* out: address name buffer; the buffer is
supplied by the caller */
ulint from_len,/* in: address name buffer length */
ulint* addr_len)/* out: address name length */
{
com_shm_t* map;
ulint loop_count;
map = com_shm_endpoint_get_map(ep);
loop_count = 0;
loop:
com_shm_system_call_count++;
/* NOTE: automatic event */
os_event_wait(com_shm_endpoint_get_not_empty(ep));
loop_count++;
if (loop_count > 1) {
printf("!!!!!!!!COM_SHM loop count %lu\n", loop_count);
}
ut_ad(loop_count < 2);
com_shm_enter(ep);
if (!com_shm_get_not_empty(map)) {
/* There was no datagram, wait for the event */
com_shm_exit(ep);
goto loop;
}
if (com_shm_get_data_len(map) > buf_len) {
com_shm_exit(ep);
return(COM_ERR_DATA_BUFFER_TOO_SMALL);
}
if (com_shm_get_addr_len(map) > from_len) {
com_shm_exit(ep);
return(COM_ERR_ADDR_BUFFER_TOO_SMALL);
}
*len = com_shm_get_data_len(map);
*addr_len = com_shm_get_addr_len(map);
ut_memcpy(buf, com_shm_get_data(map), *len);
ut_memcpy(from, com_shm_get_addr(map), *addr_len);
com_shm_set_not_empty(map, FALSE);
/* If there may be writers queuing to insert the datagram, signal the
empty-event */
if (com_shm_get_empty_waiters(map) != 0) {
com_shm_system_call_count++;
os_event_set(com_shm_endpoint_get_empty(ep));
}
com_shm_exit(ep);
return(0);
}
/*************************************************************************
Sends a datagram to the specified destination. */
ulint
com_shm_sendto(
/*===========*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* in: datagram buffer */
ulint len, /* in: datagram length */
char* to, /* in: address name buffer */
ulint tolen) /* in: address name length */
{
com_shm_endpoint_t* ep2;
com_shm_t* map;
ulint sender_len;
ulint ret;
ulint loop_count;
/* Try first to find from the cached destination addresses */
ep2 = com_shm_destination_cache_search(to, tolen);
if (ep2 == NULL) {
/* Did not find it in the cache */
ep2 = com_shm_endpoint_create();
ret = com_shm_open(ep2, to, tolen);
if (ret != 0) {
com_shm_endpoint_free(ep2);
return(ret);
}
/* Insert into the cached destination addresses */
com_shm_destination_cache_insert(ep2);
}
map = com_shm_endpoint_get_map(ep2);
if (com_shm_get_valid(map) != COM_SHM_VALID) {
com_shm_exit(ep2);
return(COM_ERR_DGRAM_NOT_DELIVERED);
}
if (com_shm_get_max_len(map) < len) {
com_shm_exit(ep2);
return(COM_ERR_DATA_TOO_LONG);
}
/* Optimistically, we first go to see if the datagram area is empty,
without waiting for the empty-event */
loop_count = 0;
loop:
loop_count++;
if (loop_count > 5) {
printf("!!!!!!COM_SHM Notempty loop count %lu\n", loop_count);
}
ut_ad(loop_count < 100);
com_shm_enter(ep2);
if (com_shm_get_not_empty(map)) {
/* Not empty, we cannot insert a datagram */
com_shm_set_empty_waiters(map,
1 + com_shm_get_empty_waiters(map));
com_shm_exit(ep2);
com_shm_system_call_count++;
/* Wait for the area to become empty */
/* NOTE: automatic event */
ret = os_event_wait_time(com_shm_endpoint_get_empty(ep2),
10000000);
ut_a(ret == 0);
com_shm_enter(ep2);
com_shm_set_empty_waiters(map,
com_shm_get_empty_waiters(map) - 1);
com_shm_exit(ep2);
goto loop;
}
sender_len = com_shm_endpoint_get_addr_len(ep);
com_shm_set_data_len(map, len);
com_shm_set_addr_len(map, sender_len);
ut_memcpy(com_shm_get_data(map), buf, len);
ut_memcpy(com_shm_get_addr(map), com_shm_endpoint_get_addr(ep),
sender_len);
com_shm_set_not_empty(map, TRUE);
com_shm_system_call_count++;
com_shm_exit(ep2);
/* Signal the event */
os_event_set(com_shm_endpoint_get_not_empty(ep2));
return(0);
}
include ..\include\makefile.i
com.lib: com0com.obj com0shm.obj
lib -out:..\libs\com.lib com0com.obj com0shm.obj
com0com.obj: com0com.c
$(CCOM) $(CFL) -c com0com.c
com0shm.obj: com0shm.c
$(CCOM) $(CFL) -c com0shm.c
...@@ -111,7 +111,7 @@ case "$target" in ...@@ -111,7 +111,7 @@ case "$target" in
esac esac
AC_OUTPUT(Makefile os/Makefile ut/Makefile btr/Makefile dnl AC_OUTPUT(Makefile os/Makefile ut/Makefile btr/Makefile dnl
buf/Makefile com/Makefile data/Makefile dnl buf/Makefile data/Makefile dnl
dict/Makefile dyn/Makefile dnl dict/Makefile dyn/Makefile dnl
eval/Makefile fil/Makefile fsp/Makefile fut/Makefile dnl eval/Makefile fil/Makefile fsp/Makefile fut/Makefile dnl
ha/Makefile ibuf/Makefile include/Makefile dnl ha/Makefile ibuf/Makefile include/Makefile dnl
......
...@@ -1088,7 +1088,7 @@ dict_create_or_check_foreign_constraint_tables(void) ...@@ -1088,7 +1088,7 @@ dict_create_or_check_foreign_constraint_tables(void)
graph->fork_type = QUE_FORK_MYSQL_INTERFACE; graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
ut_a(thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0)); ut_a(thr = que_fork_start_command(graph));
que_run_threads(thr); que_run_threads(thr);
...@@ -1233,7 +1233,7 @@ dict_create_add_foreigns_to_dictionary( ...@@ -1233,7 +1233,7 @@ dict_create_add_foreigns_to_dictionary(
graph->fork_type = QUE_FORK_MYSQL_INTERFACE; graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
ut_a(thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0)); ut_a(thr = que_fork_start_command(graph));
que_run_threads(thr); que_run_threads(thr);
......
...@@ -18,8 +18,7 @@ ...@@ -18,8 +18,7 @@
noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \ noinst_HEADERS = btr0btr.h btr0btr.ic btr0cur.h btr0cur.ic \
btr0pcur.h btr0pcur.ic btr0sea.h btr0sea.ic btr0types.h \ btr0pcur.h btr0pcur.ic btr0sea.h btr0sea.ic btr0types.h \
buf0buf.h buf0buf.ic buf0flu.h buf0flu.ic buf0lru.h \ buf0buf.h buf0buf.ic buf0flu.h buf0flu.ic buf0lru.h \
buf0lru.ic buf0rea.h buf0types.h com0com.h com0com.ic \ buf0lru.ic buf0rea.h buf0types.h data0data.h data0data.ic data0type.h \
com0shm.h com0shm.ic data0data.h data0data.ic data0type.h \
data0type.ic data0types.h db0err.h dict0boot.h \ data0type.ic data0types.h db0err.h dict0boot.h \
dict0boot.ic dict0crea.h dict0crea.ic dict0dict.h \ dict0boot.ic dict0crea.h dict0crea.ic dict0dict.h \
dict0dict.ic dict0load.h dict0load.ic dict0mem.h \ dict0dict.ic dict0load.h dict0load.ic dict0mem.h \
......
/******************************************************
The communication primitives
(c) 1995 Innobase Oy
Created 9/23/1995 Heikki Tuuri
*******************************************************/
/* This module defines a standard datagram communication
function interface for use in the database. We assume that
the communication medium is reliable. */
#ifndef com0com_h
#define com0com_h
#include "univ.i"
/* The communications endpoint type definition */
typedef struct com_endpoint_struct com_endpoint_t;
/* Possible endpoint communication types */
#define COM_SHM 1 /* communication through shared memory */
/* Option numbers for endpoint */
#define COM_OPT_MAX_DGRAM_SIZE 1
/* Error numbers */
#define COM_ERR_NOT_SPECIFIED 1
#define COM_ERR_NOT_BOUND 2
#define COM_ERR_ALREADY_BOUND 3
#define COM_ERR_MAX_DATAGRAM_SIZE_NOT_SET 4
#define COM_ERR_DATA_BUFFER_TOO_SMALL 5
#define COM_ERR_ADDR_BUFFER_TOO_SMALL 6
#define COM_ERR_DATA_TOO_LONG 7
#define COM_ERR_ADDR_TOO_LONG 8
#define COM_ERR_DGRAM_NOT_DELIVERED 9
/* Maximum allowed address length in bytes */
#define COM_MAX_ADDR_LEN 100
/*************************************************************************
Creates a communications endpoint. */
com_endpoint_t*
com_endpoint_create(
/*================*/
/* out, own: communications endpoint, NULL if
did not succeed */
ulint type); /* in: communication type of endpoint:
only COM_SHM supported */
/*************************************************************************
Frees a communications endpoint. */
ulint
com_endpoint_free(
/*==============*/
/* out: O if succeed, else error number */
com_endpoint_t* ep); /* in, own: communications endpoint */
/*************************************************************************
Sets an option, like the maximum datagram size for an endpoint.
The options may vary depending on the endpoint type. */
ulint
com_endpoint_set_option(
/*====================*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: endpoint */
ulint optno, /* in: option number, only
COM_OPT_MAX_DGRAM_SIZE currently supported */
byte* optval, /* in: pointer to a buffer containing the
option value to set */
ulint optlen);/* in: option value buffer length */
/*************************************************************************
Binds a communications endpoint to a specified address. */
ulint
com_bind(
/*=====*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len); /* in: name length */
/*************************************************************************
Waits for a datagram to arrive at an endpoint. */
ulint
com_recvfrom(
/*=========*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* out: datagram buffer; the buffer must be
supplied by the caller */
ulint buf_len,/* in: datagram buffer length */
ulint* len, /* out: datagram length */
char* from, /* out: address name buffer; the buffer must be
supplied by the caller */
ulint from_len,/* in: address name buffer length */
ulint* addr_len);/* out: address name length */
/*************************************************************************
Sends a datagram to a specified destination. */
ulint
com_sendto(
/*=======*/
/* out: 0 if succeed, else error number */
com_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* in: datagram buffer */
ulint len, /* in: datagram length */
char* to, /* in: address name buffer */
ulint tolen); /* in: address name length */
/*************************************************************************
Gets the maximum datagram size for an endpoint. */
ulint
com_endpoint_get_max_size(
/*======================*/
/* out: maximum size */
com_endpoint_t* ep); /* in: endpoint */
#ifndef UNIV_NONINL
#include "com0com.ic"
#endif
#endif
/******************************************************
The communication primitives
(c) 1995 Innobase Oy
Created 9/23/1995 Heikki Tuuri
*******************************************************/
/******************************************************
The communication through shared memory
(c) 1995 Innobase Oy
Created 9/23/1995 Heikki Tuuri
*******************************************************/
#ifndef com0shm_h
#define com0shm_h
#include "univ.i"
typedef struct com_shm_endpoint_struct com_shm_endpoint_t;
/* The performance of communication in NT depends on how
many times a system call is made (excluding os_thread_yield,
as that is the fastest way to switch thread).
The following variable counts such events. */
extern ulint com_shm_system_call_count;
/*************************************************************************
Creates a communications endpoint. */
com_shm_endpoint_t*
com_shm_endpoint_create(void);
/*=========================*/
/* out, own: communications endpoint, NULL if
did not succeed */
/*************************************************************************
Frees a communications endpoint. */
ulint
com_shm_endpoint_free(
/*==================*/
/* out: O if succeed, else error number */
com_shm_endpoint_t* ep);/* in, own: communications endpoint */
/*************************************************************************
Sets an option, like the maximum datagram size for an endpoint.
The options may vary depending on the endpoint type. */
ulint
com_shm_endpoint_set_option(
/*========================*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: endpoint */
ulint optno, /* in: option number, only
COM_OPT_MAX_DGRAM_SIZE currently supported */
byte* optval, /* in: pointer to a buffer containing the
option value to set */
ulint optlen);/* in: option value buffer length */
/*************************************************************************
Bind a communications endpoint to a specified address. */
ulint
com_shm_bind(
/*=========*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
char* name, /* in: address name */
ulint len); /* in: address name length */
/*************************************************************************
Waits for a datagram to arrive at an endpoint. */
ulint
com_shm_recvfrom(
/*=============*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* out: datagram buffer; the buffer is
supplied by the caller */
ulint buf_len,/* in: datagram buffer length */
ulint* len, /* out: datagram length */
char* from, /* out: address name buffer; the buffer is
supplied by the caller */
ulint from_len,/* in: address name buffer length */
ulint* addr_len);/* out: address name length */
/*************************************************************************
Sends a datagram to the specified destination. */
ulint
com_shm_sendto(
/*===========*/
/* out: 0 if succeed, else error number */
com_shm_endpoint_t* ep, /* in: communications endpoint */
byte* buf, /* in: datagram buffer */
ulint len, /* in: datagram length */
char* to, /* in: address name buffer */
ulint tolen); /* in: address name length */
ulint
com_shm_endpoint_get_size(
/*======================*/
com_shm_endpoint_t* ep);
#ifndef UNIV_NONINL
#include "com0shm.ic"
#endif
#endif
/******************************************************
Communication through shared memory
(c) 1995 Innobase Oy
Created 9/23/1995 Heikki Tuuri
*******************************************************/
...@@ -216,9 +216,7 @@ que_fork_start_command( ...@@ -216,9 +216,7 @@ que_fork_start_command(
QUE_THR_RUNNING state, or NULL; the query QUE_THR_RUNNING state, or NULL; the query
thread should be executed by que_run_threads thread should be executed by que_run_threads
by the caller */ by the caller */
que_fork_t* fork, /* in: a query fork */ que_fork_t* fork); /* in: a query fork */
ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */
ulint param); /* in: possible parameter to the command */
/*************************************************************************** /***************************************************************************
Gets the trx of a query thread. */ Gets the trx of a query thread. */
UNIV_INLINE UNIV_INLINE
...@@ -388,11 +386,6 @@ struct que_fork_struct{ ...@@ -388,11 +386,6 @@ struct que_fork_struct{
sym_tab_t* sym_tab; /* symbol table of the query, sym_tab_t* sym_tab; /* symbol table of the query,
generated by the parser, or NULL generated by the parser, or NULL
if the graph was created 'by hand' */ if the graph was created 'by hand' */
ulint id; /* id of this query graph */
ulint command; /* command currently executed in the
graph */
ulint param; /* possible command parameter */
/* The following cur_... fields are relevant only in a select graph */ /* The following cur_... fields are relevant only in a select graph */
ulint cur_end; /* QUE_CUR_NOT_DEFINED, QUE_CUR_START, ulint cur_end; /* QUE_CUR_NOT_DEFINED, QUE_CUR_START,
......
...@@ -13,7 +13,6 @@ Created 10/10/1995 Heikki Tuuri ...@@ -13,7 +13,6 @@ Created 10/10/1995 Heikki Tuuri
#include "univ.i" #include "univ.i"
#include "sync0sync.h" #include "sync0sync.h"
#include "os0sync.h" #include "os0sync.h"
#include "com0com.h"
#include "que0types.h" #include "que0types.h"
#include "trx0types.h" #include "trx0types.h"
...@@ -398,9 +397,6 @@ struct srv_sys_struct{ ...@@ -398,9 +397,6 @@ struct srv_sys_struct{
os_event_t operational; /* created threads must wait for the os_event_t operational; /* created threads must wait for the
server to become operational by server to become operational by
waiting for this event */ waiting for this event */
com_endpoint_t* endpoint; /* the communication endpoint of the
server */
srv_table_t* threads; /* server thread table */ srv_table_t* threads; /* server thread table */
UT_LIST_BASE_NODE_T(que_thr_t) UT_LIST_BASE_NODE_T(que_thr_t)
tasks; /* task queue */ tasks; /* task queue */
......
...@@ -203,13 +203,9 @@ trx_sig_send( ...@@ -203,13 +203,9 @@ trx_sig_send(
ulint type, /* in: signal type */ ulint type, /* in: signal type */
ulint sender, /* in: TRX_SIG_SELF or ulint sender, /* in: TRX_SIG_SELF or
TRX_SIG_OTHER_SESS */ TRX_SIG_OTHER_SESS */
ibool reply, /* in: TRUE if the sender of the signal
wants reply after the operation induced
by the signal is completed; if type
is TRX_SIG_END_WAIT, this must be
FALSE */
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL */ reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept, /* in: possible rollback savepoint, or trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr); /* in/out: next query thread to run; que_thr_t** next_thr); /* in/out: next query thread to run;
...@@ -225,7 +221,6 @@ been handled. */ ...@@ -225,7 +221,6 @@ been handled. */
void void
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
trx_t* trx, /* in: trx handle */
trx_sig_t* sig, /* in: signal */ trx_sig_t* sig, /* in: signal */
que_thr_t** next_thr); /* in/out: next query thread to run; que_thr_t** next_thr); /* in/out: next query thread to run;
if the value which is passed in is if the value which is passed in is
...@@ -297,15 +292,9 @@ struct trx_sig_struct{ ...@@ -297,15 +292,9 @@ struct trx_sig_struct{
TRX_SIG_BEING_HANDLED */ TRX_SIG_BEING_HANDLED */
ulint sender; /* TRX_SIG_SELF or ulint sender; /* TRX_SIG_SELF or
TRX_SIG_OTHER_SESS */ TRX_SIG_OTHER_SESS */
ibool reply; /* TRUE if the sender of the signal que_thr_t* receiver; /* non-NULL if the sender of the signal
wants reply after the operation induced wants reply after the operation induced
by the signal is completed; if this by the signal is completed */
field is TRUE and the receiver field
below is NULL, then a SUCCESS message
is sent to the client of the session
to which this trx belongs */
que_thr_t* receiver; /* query thread which wants the reply,
or NULL */
trx_savept_t savept; /* possible rollback savepoint */ trx_savept_t savept; /* possible rollback savepoint */
UT_LIST_NODE_T(trx_sig_t) UT_LIST_NODE_T(trx_sig_t)
signals; /* queue of pending signals to the signals; /* queue of pending signals to the
......
...@@ -11,7 +11,6 @@ Created 6/25/1996 Heikki Tuuri ...@@ -11,7 +11,6 @@ Created 6/25/1996 Heikki Tuuri
#include "univ.i" #include "univ.i"
#include "ut0byte.h" #include "ut0byte.h"
#include "hash0hash.h"
#include "trx0types.h" #include "trx0types.h"
#include "srv0srv.h" #include "srv0srv.h"
#include "trx0types.h" #include "trx0types.h"
...@@ -19,52 +18,14 @@ Created 6/25/1996 Heikki Tuuri ...@@ -19,52 +18,14 @@ Created 6/25/1996 Heikki Tuuri
#include "que0types.h" #include "que0types.h"
#include "data0data.h" #include "data0data.h"
#include "rem0rec.h" #include "rem0rec.h"
#include "com0com.h"
/* The session system global data structure */
extern sess_sys_t* sess_sys;
/*************************************************************************
Sets the session id in a client message. */
void
sess_cli_msg_set_sess(
/*==================*/
byte* str, /* in/out: message string */
dulint sess_id);/* in: session id */
/***************************************************************************
Sets the message type of a message from the client. */
UNIV_INLINE
void
sess_cli_msg_set_type(
/*==================*/
byte* str, /* in: message string */
ulint type); /* in: message type */
/***************************************************************************
Gets the message type of a message from the server. */
UNIV_INLINE
ulint
sess_srv_msg_get_type(
/*==================*/
/* out: message type */
byte* str); /* in: message string */
/***************************************************************************
Creates a session sytem at database start. */
void
sess_sys_init_at_db_start(void);
/*===========================*/
/************************************************************************* /*************************************************************************
Opens a session. */ Opens a session. */
sess_t* sess_t*
sess_open( sess_open(void);
/*======*/ /*============*/
/* out, own: session object */ /* out, own: session object */
com_endpoint_t* endpoint, /* in: communication endpoint used
for communicating with the client */
byte* addr_buf, /* in: client address */
ulint addr_len); /* in: client address length */
/************************************************************************* /*************************************************************************
Closes a session, freeing the memory occupied by it, if it is in a state Closes a session, freeing the memory occupied by it, if it is in a state
where it should be closed. */ where it should be closed. */
...@@ -74,200 +35,25 @@ sess_try_close( ...@@ -74,200 +35,25 @@ sess_try_close(
/*===========*/ /*===========*/
/* out: TRUE if closed */ /* out: TRUE if closed */
sess_t* sess); /* in, own: session object */ sess_t* sess); /* in, own: session object */
/*************************************************************************
Initializes the first fields of a message to client. */
void
sess_srv_msg_init(
/*==============*/
sess_t* sess, /* in: session object */
byte* buf, /* in: message buffer, must be at least of size
SESS_SRV_MSG_DATA */
ulint type); /* in: message type */
/*************************************************************************
Sends a simple message to client. */
void
sess_srv_msg_send_simple(
/*=====================*/
sess_t* sess, /* in: session object */
ulint type, /* in: message type */
ulint rel_kernel); /* in: SESS_RELEASE_KERNEL or
SESS_NOT_RELEASE_KERNEL */
/***************************************************************************
When a command has been completed, this function sends the message about it
to the client. */
void
sess_command_completed_message(
/*===========================*/
sess_t* sess, /* in: session */
byte* msg, /* in: message buffer */
ulint len); /* in: message data length */
/* The session handle. All fields are protected by the kernel mutex */ /* The session handle. All fields are protected by the kernel mutex */
struct sess_struct{ struct sess_struct{
dulint id; /* session id */
dulint usr_id; /* user id */
hash_node_t hash; /* hash chain node */
ulint refer_count; /* reference count to the session
object: when this drops to zero
and the session has no query graphs
left, discarding the session object
is allowed */
dulint error_count; /* if this counter has increased while
a thread is parsing an SQL command,
its graph should be discarded */
ibool disconnecting; /* TRUE if the session is to be
disconnected when its reference
count drops to 0 */
ulint state; /* state of the session */ ulint state; /* state of the session */
dulint msgs_sent; /* count of messages sent to the
client */
dulint msgs_recv; /* count of messages received from the
client */
ibool client_waits; /* when the session receives a message
from the client, this set to TRUE, and
when the session sends a message to
the client this is set to FALSE */
trx_t* trx; /* transaction object permanently trx_t* trx; /* transaction object permanently
assigned for the session: the assigned for the session: the
transaction instance designated by the transaction instance designated by the
trx id changes, but the memory trx id changes, but the memory
structure is preserved */ structure is preserved */
ulint next_graph_id; /* next query graph id to assign */
UT_LIST_BASE_NODE_T(que_t) UT_LIST_BASE_NODE_T(que_t)
graphs; /* query graphs belonging to this graphs; /* query graphs belonging to this
session */ session */
/*------------------------------*/
ulint err_no; /* latest error number, 0 if none */
char* err_str; /* latest error string */
ulint err_len; /* error string length */
/*------------------------------*/
com_endpoint_t* endpoint; /* server communications endpoint used
to communicate with the client */
char* addr_buf; /* client address string */
ulint addr_len; /* client address string length */
/*------------------------------*/
byte* big_msg; /* if the client sends a message which
does not fit in a single packet,
it is assembled in this buffer; if
this field is not NULL, it is assumed
that the message should be catenated
here */
ulint big_msg_size; /* size of the big message buffer */
ulint big_msg_len; /* length of data in the big message
buffer */
};
/* The session system; this is protected by the kernel mutex */
struct sess_sys_struct{
ulint state; /* state of the system:
SESS_SYS_RUNNING or
SESS_SYS_SHUTTING_DOWN */
sess_t* shutdown_req; /* if shutdown was requested by some
session, confirmation of shutdown
completion should be sent to this
session */
dulint free_sess_id; /* first unused session id */
hash_table_t* hash; /* hash table of the sessions */
}; };
/*---------------------------------------------------*/
/* The format of an incoming message from a client */
#define SESS_CLI_MSG_CHECKSUM 0 /* the checksum should be the first
field in the message */
#define SESS_CLI_MSG_SESS_ID 4 /* this is set to 0 if the client
wants to connect and establish
a new session */
#define SESS_CLI_MSG_SESS_ID_CHECK 12 /* checksum of the sess id field */
#define SESS_CLI_MSG_TYPE 16
#define SESS_CLI_MSG_NO 20
#define SESS_CLI_MSG_CONTINUE 28 /* 0, or SESS_MSG_FIRST_PART
SESS_MSG_MIDDLE_PART, or
SESS_MSG_LAST_PART */
#define SESS_CLI_MSG_CONT_SIZE 32 /* size of a multipart message in
kilobytes (rounded upwards) */
#define SESS_CLI_MSG_DATA 36
/*---------------------------------------------------*/
/* Client-to-session message types */
#define SESS_CLI_CONNECT 1
#define SESS_CLI_PREPARE 2
#define SESS_CLI_EXECUTE 3
#define SESS_CLI_BREAK_EXECUTION 4
/* Client-to-session statement command types */
#define SESS_COMM_FETCH_NEXT 1
#define SESS_COMM_FETCH_PREV 2
#define SESS_COMM_FETCH_FIRST 3
#define SESS_COMM_FETCH_LAST 4
#define SESS_COMM_FETCH_NTH 5
#define SESS_COMM_FETCH_NTH_LAST 6
#define SESS_COMM_EXECUTE 7
#define SESS_COMM_NO_COMMAND 8
/*---------------------------------------------------*/
/* The format of an outgoing message from a session to the client */
#define SESS_SRV_MSG_CHECKSUM 0 /* the checksum should be the first
field in the message */
#define SESS_SRV_MSG_SESS_ID 4
#define SESS_SRV_MSG_TYPE 12
#define SESS_SRV_MSG_NO 16
#define SESS_SRV_MSG_CONTINUE 24 /* 0, or SESS_MSG_FIRST_PART
SESS_MSG_MIDDLE_PART, or
SESS_MSG_LAST_PART */
#define SESS_SRV_MSG_CONT_SIZE 28 /* size of a multipart message
in kilobytes (rounded upward) */
#define SESS_SRV_MSG_DATA 32
/*---------------------------------------------------*/
/* Session-to-client message types */
#define SESS_SRV_ACCEPT_CONNECT 1
#define SESS_SRV_SUCCESS 2
#define SESS_SRV_ERROR 3
/* Multipart messages */
#define SESS_MSG_SINGLE_PART 0
#define SESS_MSG_FIRST_PART 1
#define SESS_MSG_MIDDLE_PART 2
#define SESS_MSG_LAST_PART 3
/* Error numbers */
#define SESS_ERR_NONE 0
#define SESS_ERR_TRX_COMMITTED 1
#define SESS_ERR_TRX_ROLLED_BACK 2
#define SESS_ERR_SESSION_DISCONNECTED 3
#define SESS_ERR_REPLY_FAILED 4
#define SESS_ERR_CANNOT_BREAK_OP 5
#define SESS_ERR_MSG_LOST 6
#define SESS_ERR_MSG_CORRUPTED 7
#define SESS_ERR_EXTRANEOUS_MSG 8
#define SESS_ERR_OUT_OF_MEMORY 9
#define SESS_ERR_SQL_ERROR 10
#define SESS_ERR_STMT_NOT_FOUND 11
#define SESS_ERR_STMT_NOT_READY 12
#define SESS_ERR_EXTRANEOUS_SRV_MSG 13
#define SESS_ERR_BREAK_BY_CLIENT 14
/* Session states */ /* Session states */
#define SESS_ACTIVE 1 #define SESS_ACTIVE 1
#define SESS_ERROR 2 /* session contains an error message #define SESS_ERROR 2 /* session contains an error message
which has not yet been communicated which has not yet been communicated
to the client */ to the client */
/* Session system states */
#define SESS_SYS_RUNNING 1
#define SESS_SYS_SHUTTING_DOWN 2
/* Session hash table size */
#define SESS_HASH_SIZE 1024
/* Flags used in sess_srv_msg_send */
#define SESS_RELEASE_KERNEL 1
#define SESS_NOT_RELEASE_KERNEL 2
#ifndef UNIV_NONINL #ifndef UNIV_NONINL
#include "usr0sess.ic" #include "usr0sess.ic"
#endif #endif
......
...@@ -5,27 +5,3 @@ Sessions ...@@ -5,27 +5,3 @@ Sessions
Created 6/25/1996 Heikki Tuuri Created 6/25/1996 Heikki Tuuri
*******************************************************/ *******************************************************/
/***************************************************************************
Sets the message type of a message from the client. */
UNIV_INLINE
void
sess_cli_msg_set_type(
/*==================*/
byte* str, /* in: message string */
ulint type) /* in: message type */
{
mach_write_to_4(str + SESS_CLI_MSG_TYPE, type);
}
/***************************************************************************
Gets the message type of a message from the server. */
UNIV_INLINE
ulint
sess_srv_msg_get_type(
/*==================*/
/* out: message type */
byte* str) /* in: message string */
{
return(mach_read_from_4(str + SESS_SRV_MSG_TYPE));
}
...@@ -10,7 +10,5 @@ Created 6/25/1996 Heikki Tuuri ...@@ -10,7 +10,5 @@ Created 6/25/1996 Heikki Tuuri
#define usr0types_h #define usr0types_h
typedef struct sess_struct sess_t; typedef struct sess_struct sess_t;
typedef struct sess_sys_struct sess_sys_t;
typedef struct sess_sig_struct sess_sig_t;
#endif #endif
...@@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri ...@@ -25,7 +25,6 @@ Created 5/27/1996 Heikki Tuuri
#include "log0log.h" #include "log0log.h"
#include "eval0proc.h" #include "eval0proc.h"
#include "eval0eval.h" #include "eval0eval.h"
#include "odbc0odbc.h"
#define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256) #define QUE_PARALLELIZE_LIMIT (64 * 256 * 256 * 256)
#define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256) #define QUE_ROUND_ROBIN_LIMIT (64 * 256 * 256 * 256)
...@@ -83,7 +82,9 @@ que_graph_publish( ...@@ -83,7 +82,9 @@ que_graph_publish(
que_t* graph, /* in: graph */ que_t* graph, /* in: graph */
sess_t* sess) /* in: session */ sess_t* sess) /* in: session */
{ {
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
UT_LIST_ADD_LAST(graphs, sess->graphs, graph); UT_LIST_ADD_LAST(graphs, sess->graphs, graph);
} }
...@@ -190,7 +191,9 @@ que_thr_end_wait( ...@@ -190,7 +191,9 @@ que_thr_end_wait(
{ {
ibool was_active; ibool was_active;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr); ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT) ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT)
...@@ -229,7 +232,9 @@ que_thr_end_wait_no_next_thr( ...@@ -229,7 +232,9 @@ que_thr_end_wait_no_next_thr(
ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the ut_a(thr->state == QUE_THR_LOCK_WAIT); /* In MySQL this is the
only possible state here */ only possible state here */
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(thr); ut_ad(thr);
ut_ad((thr->state == QUE_THR_LOCK_WAIT) ut_ad((thr->state == QUE_THR_LOCK_WAIT)
|| (thr->state == QUE_THR_PROCEDURE_WAIT) || (thr->state == QUE_THR_PROCEDURE_WAIT)
...@@ -279,16 +284,10 @@ que_fork_start_command( ...@@ -279,16 +284,10 @@ que_fork_start_command(
QUE_THR_RUNNING state, or NULL; the query QUE_THR_RUNNING state, or NULL; the query
thread should be executed by que_run_threads thread should be executed by que_run_threads
by the caller */ by the caller */
que_fork_t* fork, /* in: a query fork */ que_fork_t* fork) /* in: a query fork */
ulint command,/* in: command SESS_COMM_FETCH_NEXT, ... */
ulint param) /* in: possible parameter to the command */
{ {
que_thr_t* thr; que_thr_t* thr;
/* Set the command parameters in the fork root */
fork->command = command;
fork->param = param;
fork->state = QUE_FORK_ACTIVE; fork->state = QUE_FORK_ACTIVE;
fork->last_sel_node = NULL; fork->last_sel_node = NULL;
...@@ -370,7 +369,9 @@ que_fork_error_handle( ...@@ -370,7 +369,9 @@ que_fork_error_handle(
{ {
que_thr_t* thr; que_thr_t* thr;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
ut_ad(trx->sess->state == SESS_ERROR); ut_ad(trx->sess->state == SESS_ERROR);
ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0); ut_ad(UT_LIST_GET_LEN(trx->reply_signals) == 0);
ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0); ut_ad(UT_LIST_GET_LEN(trx->wait_thrs) == 0);
...@@ -640,7 +641,9 @@ que_graph_try_free( ...@@ -640,7 +641,9 @@ que_graph_try_free(
{ {
sess_t* sess; sess_t* sess;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
sess = (graph->trx)->sess; sess = (graph->trx)->sess;
...@@ -665,49 +668,20 @@ does nothing! */ ...@@ -665,49 +668,20 @@ does nothing! */
void void
que_thr_handle_error( que_thr_handle_error(
/*=================*/ /*=================*/
que_thr_t* thr, /* in: query thread */ que_thr_t* thr __attribute((unused)),
ulint err_no, /* in: error number */ /* in: query thread */
byte* err_str,/* in, own: error string or NULL; NOTE: the ulint err_no __attribute((unused)),
/* in: error number */
byte* err_str __attribute((unused)),
/* in, own: error string or NULL; NOTE: the
function will take care of freeing of the function will take care of freeing of the
string! */ string! */
ulint err_len)/* in: error string length */ ulint err_len __attribute((unused)))
/* in: error string length */
{ {
UT_NOT_USED(thr);
UT_NOT_USED(err_no);
UT_NOT_USED(err_str);
UT_NOT_USED(err_len);
/* Does nothing */ /* Does nothing */
} }
/********************************************************************
Builds a command completed-message to the client. */
static
ulint
que_build_srv_msg(
/*==============*/
/* out: message data length */
byte* buf, /* in: message buffer */
que_fork_t* fork, /* in: query graph where execution completed */
sess_t* sess) /* in: session */
{
ulint len;
/* Currently, we only support stored procedures: */
ut_ad(fork->fork_type == QUE_FORK_PROCEDURE);
if (sess->state == SESS_ERROR) {
return(0);
}
sess_srv_msg_init(sess, buf, SESS_SRV_SUCCESS);
len = pars_proc_write_output_params_to_buf(buf + SESS_SRV_MSG_DATA,
fork);
return(len);
}
/******************************************************************** /********************************************************************
Performs an execution step on a thr node. */ Performs an execution step on a thr node. */
static static
...@@ -804,10 +778,6 @@ que_thr_dec_refer_count( ...@@ -804,10 +778,6 @@ que_thr_dec_refer_count(
que_fork_t* fork; que_fork_t* fork;
trx_t* trx; trx_t* trx;
sess_t* sess; sess_t* sess;
ibool send_srv_msg = FALSE;
ibool release_stored_proc = FALSE;
ulint msg_len = 0;
byte msg_buf[ODBC_DATAGRAM_SIZE];
ulint fork_type; ulint fork_type;
ibool stopped; ibool stopped;
...@@ -828,8 +798,8 @@ que_thr_dec_refer_count( ...@@ -828,8 +798,8 @@ que_thr_dec_refer_count(
already canceled before we came here: continue already canceled before we came here: continue
running the thread */ running the thread */
/* printf( /* fputs("!!!!!!!! Wait already ended: continue thr\n",
"!!!!!!!!!! Wait already ended: continue thr\n"); */ stderr); */
if (next_thr && *next_thr == NULL) { if (next_thr && *next_thr == NULL) {
*next_thr = thr; *next_thr = thr;
...@@ -882,40 +852,13 @@ que_thr_dec_refer_count( ...@@ -882,40 +852,13 @@ que_thr_dec_refer_count(
} else if (fork_type == QUE_FORK_MYSQL_INTERFACE) { } else if (fork_type == QUE_FORK_MYSQL_INTERFACE) {
/* Do nothing */ /* Do nothing */
} else if (fork->common.parent == NULL
&& fork->caller == NULL
&& UT_LIST_GET_LEN(trx->signals) == 0) {
ut_a(0); /* not used in MySQL */
/* Reply to the client */
/* que_thr_add_update_info(thr); */
fork->state = QUE_FORK_COMMAND_WAIT;
msg_len = que_build_srv_msg(msg_buf, fork, sess);
send_srv_msg = TRUE;
if (fork->fork_type == QUE_FORK_PROCEDURE) {
release_stored_proc = TRUE;
}
ut_ad(trx->graph == fork);
trx->graph = NULL;
} else { } else {
/* Subprocedure calls not implemented yet */ ut_error; /* not used in MySQL */
ut_a(0);
} }
} }
if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) { if (UT_LIST_GET_LEN(trx->signals) > 0 && trx->n_active_thrs == 0) {
ut_ad(!send_srv_msg);
/* If the trx is signaled and its query thread count drops to /* If the trx is signaled and its query thread count drops to
zero, then we start processing a signal; from it we may get zero, then we start processing a signal; from it we may get
a new query thread to run */ a new query thread to run */
...@@ -929,26 +872,6 @@ que_thr_dec_refer_count( ...@@ -929,26 +872,6 @@ que_thr_dec_refer_count(
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
if (send_srv_msg) {
/* Note that, as we do not own the kernel mutex at this point,
and neither do we own it all the time when doing the actual
communication operation within the next function, it is
possible that the messages will not get delivered in the right
sequential order. This is possible if the client communicates
an extra message to the server while the message below is still
undelivered. But then the client should notice that there
is an error in the order numbers of the messages. */
sess_command_completed_message(sess, msg_buf, msg_len);
}
if (release_stored_proc) {
/* Return the stored procedure graph to the dictionary cache */
dict_procedure_release_parsed_copy(fork);
}
} }
/************************************************************************** /**************************************************************************
...@@ -966,7 +889,9 @@ que_thr_stop( ...@@ -966,7 +889,9 @@ que_thr_stop(
que_t* graph; que_t* graph;
ibool ret = TRUE; ibool ret = TRUE;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
graph = thr->graph; graph = thr->graph;
trx = graph->trx; trx = graph->trx;
...@@ -1118,58 +1043,55 @@ que_node_print_info( ...@@ -1118,58 +1043,55 @@ que_node_print_info(
que_node_t* node) /* in: query graph node */ que_node_t* node) /* in: query graph node */
{ {
ulint type; ulint type;
char* str; const char* str;
ulint addr;
type = que_node_get_type(node); type = que_node_get_type(node);
addr = (ulint)node;
if (type == QUE_NODE_SELECT) { if (type == QUE_NODE_SELECT) {
str = (char *) "SELECT"; str = "SELECT";
} else if (type == QUE_NODE_INSERT) { } else if (type == QUE_NODE_INSERT) {
str = (char *) "INSERT"; str = "INSERT";
} else if (type == QUE_NODE_UPDATE) { } else if (type == QUE_NODE_UPDATE) {
str = (char *) "UPDATE"; str = "UPDATE";
} else if (type == QUE_NODE_WHILE) { } else if (type == QUE_NODE_WHILE) {
str = (char *) "WHILE"; str = "WHILE";
} else if (type == QUE_NODE_ASSIGNMENT) { } else if (type == QUE_NODE_ASSIGNMENT) {
str = (char *) "ASSIGNMENT"; str = "ASSIGNMENT";
} else if (type == QUE_NODE_IF) { } else if (type == QUE_NODE_IF) {
str = (char *) "IF"; str = "IF";
} else if (type == QUE_NODE_FETCH) { } else if (type == QUE_NODE_FETCH) {
str = (char *) "FETCH"; str = "FETCH";
} else if (type == QUE_NODE_OPEN) { } else if (type == QUE_NODE_OPEN) {
str = (char *) "OPEN"; str = "OPEN";
} else if (type == QUE_NODE_PROC) { } else if (type == QUE_NODE_PROC) {
str = (char *) "STORED PROCEDURE"; str = "STORED PROCEDURE";
} else if (type == QUE_NODE_FUNC) { } else if (type == QUE_NODE_FUNC) {
str = (char *) "FUNCTION"; str = "FUNCTION";
} else if (type == QUE_NODE_LOCK) { } else if (type == QUE_NODE_LOCK) {
str = (char *) "LOCK"; str = "LOCK";
} else if (type == QUE_NODE_THR) { } else if (type == QUE_NODE_THR) {
str = (char *) "QUERY THREAD"; str = "QUERY THREAD";
} else if (type == QUE_NODE_COMMIT) { } else if (type == QUE_NODE_COMMIT) {
str = (char *) "COMMIT"; str = "COMMIT";
} else if (type == QUE_NODE_UNDO) { } else if (type == QUE_NODE_UNDO) {
str = (char *) "UNDO ROW"; str = "UNDO ROW";
} else if (type == QUE_NODE_PURGE) { } else if (type == QUE_NODE_PURGE) {
str = (char *) "PURGE ROW"; str = "PURGE ROW";
} else if (type == QUE_NODE_ROLLBACK) { } else if (type == QUE_NODE_ROLLBACK) {
str = (char *) "ROLLBACK"; str = "ROLLBACK";
} else if (type == QUE_NODE_CREATE_TABLE) { } else if (type == QUE_NODE_CREATE_TABLE) {
str = (char *) "CREATE TABLE"; str = "CREATE TABLE";
} else if (type == QUE_NODE_CREATE_INDEX) { } else if (type == QUE_NODE_CREATE_INDEX) {
str = (char *) "CREATE INDEX"; str = "CREATE INDEX";
} else if (type == QUE_NODE_FOR) { } else if (type == QUE_NODE_FOR) {
str = (char *) "FOR LOOP"; str = "FOR LOOP";
} else if (type == QUE_NODE_RETURN) { } else if (type == QUE_NODE_RETURN) {
str = (char *) "RETURN"; str = "RETURN";
} else { } else {
str = (char *) "UNKNOWN NODE TYPE"; str = "UNKNOWN NODE TYPE";
} }
printf("Node type %lu: %s, address %lx\n", type, str, addr); fprintf(stderr, "Node type %lu: %s, address %p\n", type, str, node);
} }
/************************************************************************** /**************************************************************************
...@@ -1199,7 +1121,7 @@ que_thr_step( ...@@ -1199,7 +1121,7 @@ que_thr_step(
#ifdef UNIV_DEBUG #ifdef UNIV_DEBUG
if (que_trace_on) { if (que_trace_on) {
printf("To execute: "); fputs("To execute: ", stderr);
que_node_print_info(node); que_node_print_info(node);
} }
#endif #endif
...@@ -1296,7 +1218,9 @@ que_run_threads( ...@@ -1296,7 +1218,9 @@ que_run_threads(
ulint loop_count; ulint loop_count;
ut_ad(thr->state == QUE_THR_RUNNING); ut_ad(thr->state == QUE_THR_RUNNING);
#ifdef UNIV_SYNC_DEBUG
ut_ad(!mutex_own(&kernel_mutex)); ut_ad(!mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
/* cumul_resource counts how much resources the OS thread (NOT the /* cumul_resource counts how much resources the OS thread (NOT the
query thread) has spent in this function */ query thread) has spent in this function */
......
...@@ -1402,8 +1402,7 @@ row_create_table_for_mysql( ...@@ -1402,8 +1402,7 @@ row_create_table_for_mysql(
thr = pars_complete_graph_for_exec(node, trx, heap); thr = pars_complete_graph_for_exec(node, trx, heap);
ut_a(thr == que_fork_start_command(que_node_get_parent(thr), ut_a(thr == que_fork_start_command(que_node_get_parent(thr)));
SESS_COMM_EXECUTE, 0));
que_run_threads(thr); que_run_threads(thr);
err = trx->error_state; err = trx->error_state;
...@@ -1525,8 +1524,7 @@ row_create_index_for_mysql( ...@@ -1525,8 +1524,7 @@ row_create_index_for_mysql(
thr = pars_complete_graph_for_exec(node, trx, heap); thr = pars_complete_graph_for_exec(node, trx, heap);
ut_a(thr == que_fork_start_command(que_node_get_parent(thr), ut_a(thr == que_fork_start_command(que_node_get_parent(thr)));
SESS_COMM_EXECUTE, 0));
que_run_threads(thr); que_run_threads(thr);
err = trx->error_state; err = trx->error_state;
...@@ -2070,7 +2068,7 @@ row_drop_table_for_mysql( ...@@ -2070,7 +2068,7 @@ row_drop_table_for_mysql(
trx->dict_operation = TRUE; trx->dict_operation = TRUE;
trx->table_id = table->id; trx->table_id = table->id;
ut_a(thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0)); ut_a(thr = que_fork_start_command(graph));
que_run_threads(thr); que_run_threads(thr);
...@@ -2450,7 +2448,7 @@ row_rename_table_for_mysql( ...@@ -2450,7 +2448,7 @@ row_rename_table_for_mysql(
graph->fork_type = QUE_FORK_MYSQL_INTERFACE; graph->fork_type = QUE_FORK_MYSQL_INTERFACE;
ut_a(thr = que_fork_start_command(graph, SESS_COMM_EXECUTE, 0)); ut_a(thr = que_fork_start_command(graph));
que_run_threads(thr); que_run_threads(thr);
......
...@@ -34,8 +34,6 @@ Created 10/8/1995 Heikki Tuuri ...@@ -34,8 +34,6 @@ Created 10/8/1995 Heikki Tuuri
#include "sync0sync.h" #include "sync0sync.h"
#include "sync0ipm.h" #include "sync0ipm.h"
#include "thr0loc.h" #include "thr0loc.h"
#include "com0com.h"
#include "com0shm.h"
#include "que0que.h" #include "que0que.h"
#include "srv0que.h" #include "srv0que.h"
#include "log0recv.h" #include "log0recv.h"
...@@ -235,9 +233,6 @@ int srv_query_thread_priority = 0; ...@@ -235,9 +233,6 @@ int srv_query_thread_priority = 0;
ulint srv_n_spin_wait_rounds = 20; ulint srv_n_spin_wait_rounds = 20;
ulint srv_spin_wait_delay = 5; ulint srv_spin_wait_delay = 5;
ibool srv_priority_boost = TRUE; ibool srv_priority_boost = TRUE;
char srv_endpoint_name[COM_MAX_ADDR_LEN];
ulint srv_n_com_threads = ULINT_MAX;
ulint srv_n_worker_threads = ULINT_MAX;
ibool srv_print_thread_releases = FALSE; ibool srv_print_thread_releases = FALSE;
ibool srv_print_lock_waits = FALSE; ibool srv_print_lock_waits = FALSE;
...@@ -249,10 +244,10 @@ ulint srv_n_rows_inserted = 0; ...@@ -249,10 +244,10 @@ ulint srv_n_rows_inserted = 0;
ulint srv_n_rows_updated = 0; ulint srv_n_rows_updated = 0;
ulint srv_n_rows_deleted = 0; ulint srv_n_rows_deleted = 0;
ulint srv_n_rows_read = 0; ulint srv_n_rows_read = 0;
ulint srv_n_rows_inserted_old = 0; static ulint srv_n_rows_inserted_old = 0;
ulint srv_n_rows_updated_old = 0; static ulint srv_n_rows_updated_old = 0;
ulint srv_n_rows_deleted_old = 0; static ulint srv_n_rows_deleted_old = 0;
ulint srv_n_rows_read_old = 0; static ulint srv_n_rows_read_old = 0;
/* /*
Set the following to 0 if you want InnoDB to write messages on Set the following to 0 if you want InnoDB to write messages on
......
...@@ -39,7 +39,6 @@ Created 2/16/1996 Heikki Tuuri ...@@ -39,7 +39,6 @@ Created 2/16/1996 Heikki Tuuri
#include "rem0rec.h" #include "rem0rec.h"
#include "srv0srv.h" #include "srv0srv.h"
#include "que0que.h" #include "que0que.h"
#include "com0com.h"
#include "usr0sess.h" #include "usr0sess.h"
#include "lock0lock.h" #include "lock0lock.h"
#include "trx0roll.h" #include "trx0roll.h"
...@@ -1211,8 +1210,6 @@ NetWare. */ ...@@ -1211,8 +1210,6 @@ NetWare. */
mutex_exit(&(log_sys->mutex)); mutex_exit(&(log_sys->mutex));
} }
sess_sys_init_at_db_start();
if (create_new_db) { if (create_new_db) {
mtr_start(&mtr); mtr_start(&mtr);
......
...@@ -195,8 +195,6 @@ void ...@@ -195,8 +195,6 @@ void
trx_purge_sys_create(void) trx_purge_sys_create(void)
/*======================*/ /*======================*/
{ {
com_endpoint_t* com_endpoint;
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
purge_sys = mem_alloc(sizeof(trx_purge_t)); purge_sys = mem_alloc(sizeof(trx_purge_t));
...@@ -219,9 +217,7 @@ trx_purge_sys_create(void) ...@@ -219,9 +217,7 @@ trx_purge_sys_create(void)
purge_sys->arr = trx_undo_arr_create(); purge_sys->arr = trx_undo_arr_create();
com_endpoint = (com_endpoint_t*)purge_sys; /* This is a dummy non-NULL purge_sys->sess = sess_open();
value */
purge_sys->sess = sess_open(com_endpoint, (byte*)"purge_system", 13);
purge_sys->trx = purge_sys->sess->trx; purge_sys->trx = purge_sys->sess->trx;
...@@ -1034,11 +1030,11 @@ trx_purge(void) ...@@ -1034,11 +1030,11 @@ trx_purge(void)
mutex_enter(&kernel_mutex); mutex_enter(&kernel_mutex);
thr = que_fork_start_command(purge_sys->query, SESS_COMM_EXECUTE, 0); thr = que_fork_start_command(purge_sys->query);
ut_ad(thr); ut_ad(thr);
/* thr2 = que_fork_start_command(purge_sys->query, SESS_COMM_EXECUTE, 0); /* thr2 = que_fork_start_command(purge_sys->query);
ut_ad(thr2); */ ut_ad(thr2); */
......
...@@ -73,8 +73,7 @@ trx_general_rollback_for_mysql( ...@@ -73,8 +73,7 @@ trx_general_rollback_for_mysql(
thr = pars_complete_graph_for_exec(roll_node, trx, heap); thr = pars_complete_graph_for_exec(roll_node, trx, heap);
ut_a(thr == que_fork_start_command(que_node_get_parent(thr), ut_a(thr == que_fork_start_command(que_node_get_parent(thr)));
SESS_COMM_EXECUTE, 0));
que_run_threads(thr); que_run_threads(thr);
mutex_enter(&kernel_mutex); mutex_enter(&kernel_mutex);
...@@ -354,8 +353,7 @@ trx_rollback_or_clean_all_without_sess(void) ...@@ -354,8 +353,7 @@ trx_rollback_or_clean_all_without_sess(void)
/* Open a dummy session */ /* Open a dummy session */
if (!trx_dummy_sess) { if (!trx_dummy_sess) {
trx_dummy_sess = sess_open(NULL, (byte*)"Dummy sess", trx_dummy_sess = sess_open();
ut_strlen((char *) "Dummy sess"));
} }
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
...@@ -418,7 +416,7 @@ trx_rollback_or_clean_all_without_sess(void) ...@@ -418,7 +416,7 @@ trx_rollback_or_clean_all_without_sess(void)
trx->graph = fork; trx->graph = fork;
ut_a(thr == que_fork_start_command(fork, SESS_COMM_EXECUTE, 0)); ut_a(thr == que_fork_start_command(fork));
trx_roll_max_undo_no = ut_conv_dulint_to_longlong(trx->undo_no); trx_roll_max_undo_no = ut_conv_dulint_to_longlong(trx->undo_no);
trx_roll_progress_printed_pct = 0; trx_roll_progress_printed_pct = 0;
...@@ -981,11 +979,11 @@ trx_rollback( ...@@ -981,11 +979,11 @@ trx_rollback(
trx->graph = roll_graph; trx->graph = roll_graph;
trx->que_state = TRX_QUE_ROLLING_BACK; trx->que_state = TRX_QUE_ROLLING_BACK;
thr = que_fork_start_command(roll_graph, SESS_COMM_EXECUTE, 0); thr = que_fork_start_command(roll_graph);
ut_ad(thr); ut_ad(thr);
/* thr2 = que_fork_start_command(roll_graph, SESS_COMM_EXECUTE, 0); /* thr2 = que_fork_start_command(roll_graph);
ut_ad(thr2); */ ut_ad(thr2); */
...@@ -1082,7 +1080,7 @@ trx_finish_partial_rollback_off_kernel( ...@@ -1082,7 +1080,7 @@ trx_finish_partial_rollback_off_kernel(
/* Remove the signal from the signal queue and send reply message /* Remove the signal from the signal queue and send reply message
to it */ to it */
trx_sig_reply(trx, sig, next_thr); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
trx->que_state = TRX_QUE_RUNNING; trx->que_state = TRX_QUE_RUNNING;
...@@ -1145,7 +1143,7 @@ trx_finish_rollback_off_kernel( ...@@ -1145,7 +1143,7 @@ trx_finish_rollback_off_kernel(
if (sig->type == TRX_SIG_TOTAL_ROLLBACK) { if (sig->type == TRX_SIG_TOTAL_ROLLBACK) {
trx_sig_reply(trx, sig, next_thr); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
...@@ -1213,7 +1211,7 @@ trx_rollback_step( ...@@ -1213,7 +1211,7 @@ trx_rollback_step(
success = trx_sig_send(thr_get_trx(thr), success = trx_sig_send(thr_get_trx(thr),
sig_no, TRX_SIG_SELF, sig_no, TRX_SIG_SELF,
TRUE, thr, savept, NULL); thr, savept, NULL);
thr->state = QUE_THR_SIG_REPLY_WAIT; thr->state = QUE_THR_SIG_REPLY_WAIT;
......
...@@ -171,8 +171,7 @@ trx_allocate_for_mysql(void) ...@@ -171,8 +171,7 @@ trx_allocate_for_mysql(void)
/* Open a dummy session */ /* Open a dummy session */
if (!trx_dummy_sess) { if (!trx_dummy_sess) {
trx_dummy_sess = sess_open(NULL, (byte*)"Dummy sess", trx_dummy_sess = sess_open();
ut_strlen((char *) "Dummy sess"));
} }
trx = trx_create(trx_dummy_sess); trx = trx_create(trx_dummy_sess);
...@@ -205,8 +204,7 @@ trx_allocate_for_background(void) ...@@ -205,8 +204,7 @@ trx_allocate_for_background(void)
/* Open a dummy session */ /* Open a dummy session */
if (!trx_dummy_sess) { if (!trx_dummy_sess) {
trx_dummy_sess = sess_open(NULL, (byte*)"Dummy sess", trx_dummy_sess = sess_open();
ut_strlen("Dummy sess"));
} }
trx = trx_create(trx_dummy_sess); trx = trx_create(trx_dummy_sess);
...@@ -913,7 +911,7 @@ trx_handle_commit_sig_off_kernel( ...@@ -913,7 +911,7 @@ trx_handle_commit_sig_off_kernel(
if (sig->type == TRX_SIG_COMMIT) { if (sig->type == TRX_SIG_COMMIT) {
trx_sig_reply(trx, sig, next_thr); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} }
...@@ -1002,7 +1000,6 @@ trx_sig_reply_wait_to_suspended( ...@@ -1002,7 +1000,6 @@ trx_sig_reply_wait_to_suspended(
thr->state = QUE_THR_SUSPENDED; thr->state = QUE_THR_SUSPENDED;
sig->receiver = NULL; sig->receiver = NULL;
sig->reply = FALSE;
UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig); UT_LIST_REMOVE(reply_signals, trx->reply_signals, sig);
...@@ -1096,13 +1093,9 @@ trx_sig_send( ...@@ -1096,13 +1093,9 @@ trx_sig_send(
ulint type, /* in: signal type */ ulint type, /* in: signal type */
ulint sender, /* in: TRX_SIG_SELF or ulint sender, /* in: TRX_SIG_SELF or
TRX_SIG_OTHER_SESS */ TRX_SIG_OTHER_SESS */
ibool reply, /* in: TRUE if the sender of the signal
wants reply after the operation induced
by the signal is completed; if type
is TRX_SIG_END_WAIT, this must be
FALSE */
que_thr_t* receiver_thr, /* in: query thread which wants the que_thr_t* receiver_thr, /* in: query thread which wants the
reply, or NULL */ reply, or NULL; if type is
TRX_SIG_END_WAIT, this must be NULL */
trx_savept_t* savept, /* in: possible rollback savepoint, or trx_savept_t* savept, /* in: possible rollback savepoint, or
NULL */ NULL */
que_thr_t** next_thr) /* in/out: next query thread to run; que_thr_t** next_thr) /* in/out: next query thread to run;
...@@ -1146,7 +1139,6 @@ trx_sig_send( ...@@ -1146,7 +1139,6 @@ trx_sig_send(
sig->type = type; sig->type = type;
sig->state = TRX_SIG_WAITING; sig->state = TRX_SIG_WAITING;
sig->sender = sender; sig->sender = sender;
sig->reply = reply;
sig->receiver = receiver_thr; sig->receiver = receiver_thr;
if (savept) { if (savept) {
...@@ -1305,7 +1297,7 @@ trx_sig_start_handle( ...@@ -1305,7 +1297,7 @@ trx_sig_start_handle(
} else if (type == TRX_SIG_BREAK_EXECUTION) { } else if (type == TRX_SIG_BREAK_EXECUTION) {
trx_sig_reply(trx, sig, next_thr); trx_sig_reply(sig, next_thr);
trx_sig_remove(trx, sig); trx_sig_remove(trx, sig);
} else { } else {
ut_error; ut_error;
...@@ -1321,7 +1313,6 @@ handled. */ ...@@ -1321,7 +1313,6 @@ handled. */
void void
trx_sig_reply( trx_sig_reply(
/*==========*/ /*==========*/
trx_t* trx, /* in: trx handle */
trx_sig_t* sig, /* in: signal */ trx_sig_t* sig, /* in: signal */
que_thr_t** next_thr) /* in/out: next query thread to run; que_thr_t** next_thr) /* in/out: next query thread to run;
if the value which is passed in is if the value which is passed in is
...@@ -1331,11 +1322,10 @@ trx_sig_reply( ...@@ -1331,11 +1322,10 @@ trx_sig_reply(
{ {
trx_t* receiver_trx; trx_t* receiver_trx;
ut_ad(trx && sig); ut_ad(sig);
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
if (sig->reply && (sig->receiver != NULL)) { if (sig->receiver != NULL) {
ut_ad((sig->receiver)->state == QUE_THR_SIG_REPLY_WAIT); ut_ad((sig->receiver)->state == QUE_THR_SIG_REPLY_WAIT);
receiver_trx = thr_get_trx(sig->receiver); receiver_trx = thr_get_trx(sig->receiver);
...@@ -1346,18 +1336,8 @@ trx_sig_reply( ...@@ -1346,18 +1336,8 @@ trx_sig_reply(
que_thr_end_wait(sig->receiver, next_thr); que_thr_end_wait(sig->receiver, next_thr);
sig->reply = FALSE;
sig->receiver = NULL;
} else if (sig->reply) {
/* In this case the reply should be sent to the client of
the session of the transaction */
sig->reply = FALSE;
sig->receiver = NULL; sig->receiver = NULL;
sess_srv_msg_send_simple(trx->sess, SESS_SRV_SUCCESS,
SESS_NOT_RELEASE_KERNEL);
} }
} }
...@@ -1373,7 +1353,6 @@ trx_sig_remove( ...@@ -1373,7 +1353,6 @@ trx_sig_remove(
ut_ad(trx && sig); ut_ad(trx && sig);
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(sig->reply == FALSE);
ut_ad(sig->receiver == NULL); ut_ad(sig->receiver == NULL);
UT_LIST_REMOVE(signals, trx->signals, sig); UT_LIST_REMOVE(signals, trx->signals, sig);
...@@ -1435,8 +1414,7 @@ trx_commit_step( ...@@ -1435,8 +1414,7 @@ trx_commit_step(
/* Send the commit signal to the transaction */ /* Send the commit signal to the transaction */
success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT, success = trx_sig_send(thr_get_trx(thr), TRX_SIG_COMMIT,
TRX_SIG_SELF, TRUE, thr, NULL, TRX_SIG_SELF, thr, NULL, &next_thr);
&next_thr);
mutex_exit(&kernel_mutex); mutex_exit(&kernel_mutex);
......
...@@ -12,21 +12,7 @@ Created 6/25/1996 Heikki Tuuri ...@@ -12,21 +12,7 @@ Created 6/25/1996 Heikki Tuuri
#include "usr0sess.ic" #include "usr0sess.ic"
#endif #endif
#include "ut0rnd.h"
#include "mach0data.h"
#include "ha0ha.h"
#include "trx0trx.h" #include "trx0trx.h"
#include "que0que.h"
#include "pars0pars.h"
#include "pars0sym.h"
#include "dict0dict.h"
#include "dict0mem.h"
#include "odbc0odbc.h"
#define SESS_ERR_BUF_SIZE 8192
/* The session system global data structure */
sess_sys_t* sess_sys = NULL;
/************************************************************************* /*************************************************************************
Closes a session, freeing the memory occupied by it. */ Closes a session, freeing the memory occupied by it. */
...@@ -35,247 +21,28 @@ void ...@@ -35,247 +21,28 @@ void
sess_close( sess_close(
/*=======*/ /*=======*/
sess_t* sess); /* in, own: session object */ sess_t* sess); /* in, own: session object */
/*************************************************************************
Communicates an error message to the client. If sess->client_waits is not
TRUE, puts the session to error state and does not try to send the error
message. */
static
void
sess_srv_msg_send_error(
/*====================*/
sess_t* sess); /* in: session object */
/*************************************************************************
Copies error info to a session. Sends to the transaction a signal which will
rollback the latest incomplete SQL statement and then send the error message
to the client. NOTE: This function will take care of the freeing of the error
string, thus the caller must supply a copy of the error string. */
static
void
sess_error_low(
/*===========*/
sess_t* sess, /* in: session object */
ulint err_no, /* in: error number */
char* err_str);/* in, own: error string or NULL;
NOTE: the function will take care of freeing of the
string! */
/*************************************************************************
Folds a session id to a ulint. Because this function is used also in
calculating a checksum for the id to write in the message, it is performs
also a XOR operation to mix the values more thoroughly. */
UNIV_INLINE
ulint
sess_id_fold(
/*=========*/
/* out: folded value; can be used also as the checksum
for id */
dulint id) /* in: session id */
{
return(ut_fold_dulint(id) ^ 2945794411U);
}
/*************************************************************************
Sets the session id in a client message. */
void
sess_cli_msg_set_sess(
/*==================*/
byte* str, /* in/out: message string */
dulint sess_id)/* in: session id */
{
ulint fold;
mach_write_to_8(str + SESS_CLI_MSG_SESS_ID, sess_id);
fold = sess_id_fold(sess_id);
mach_write_to_4(str + SESS_CLI_MSG_SESS_ID_CHECK, fold);
}
/***************************************************************************
Decrements the reference count of a session and closes it, if desired. */
UNIV_INLINE
void
sess_refer_count_dec(
/*=================*/
sess_t* sess) /* in: session */
{
ut_ad(mutex_own(&kernel_mutex));
ut_ad(sess->refer_count > 0);
sess->refer_count--;
if (sess->disconnecting && (sess->refer_count == 0)) {
sess_close(sess);
}
}
/***************************************************************************
Increments the reference count of a session. */
UNIV_INLINE
void
sess_refer_count_inc(
/*=================*/
sess_t* sess) /* in: session */
{
ut_ad(mutex_own(&kernel_mutex));
sess->refer_count++;
}
/***************************************************************************
Creates a session system at a database start. */
void
sess_sys_init_at_db_start(void)
/*===========================*/
{
sess_sys = mem_alloc(sizeof(sess_sys_t));
sess_sys->state = SESS_SYS_RUNNING;
sess_sys->free_sess_id = ut_dulint_create(0, 1);
sess_sys->hash = hash_create(SESS_HASH_SIZE);
}
/***************************************************************************
Gets the message type of a message from client. */
UNIV_INLINE
ulint
sess_cli_msg_get_type(
/*==================*/
/* out: message type */
byte* str) /* in: message string */
{
ut_ad(mutex_own(&kernel_mutex));
return(mach_read_from_4(str + SESS_CLI_MSG_TYPE));
}
/***************************************************************************
Gets the message number of a message from client. */
UNIV_INLINE
dulint
sess_cli_msg_get_msg_no(
/*====================*/
/* out: message number */
byte* str) /* in: message string */
{
ut_ad(mutex_own(&kernel_mutex));
return(mach_read_from_8(str + SESS_CLI_MSG_NO));
}
/***************************************************************************
Gets the continue field of a message from client. */
UNIV_INLINE
ulint
sess_cli_msg_get_continue(
/*======================*/
/* out: SESS_MSG_SINGLE_PART, ... */
byte* str) /* in: message string */
{
ut_ad(mutex_own(&kernel_mutex));
return(mach_read_from_4(str + SESS_CLI_MSG_CONTINUE));
}
/***************************************************************************
Gets the size of a big message in kilobytes. */
UNIV_INLINE
ulint
sess_cli_msg_get_cont_size(
/*=======================*/
/* out: size in kilobytes */
byte* str) /* in: message string */
{
ut_ad(mutex_own(&kernel_mutex));
return(mach_read_from_4(str + SESS_CLI_MSG_CONT_SIZE));
}
/*************************************************************************
Checks the consistency of a message from a client. */
UNIV_INLINE
ibool
sess_cli_msg_check_consistency(
/*===========================*/
/* out: TRUE if ok */
byte* str, /* in: message string */
ulint len) /* in: message string length */
{
ulint fold;
ut_ad(mutex_own(&kernel_mutex));
if (len < SESS_CLI_MSG_DATA) {
return(FALSE);
}
ut_ad(SESS_CLI_MSG_CHECKSUM == 0);
fold = ut_fold_binary(str + 4, len - 4);
if (mach_read_from_4(str + SESS_CLI_MSG_CHECKSUM) != fold) {
return(FALSE);
}
return(TRUE);
}
/************************************************************************* /*************************************************************************
Opens a session. */ Opens a session. */
sess_t* sess_t*
sess_open( sess_open(void)
/*======*/ /*===========*/
/* out, own: session object */ /* out, own: session object */
com_endpoint_t* endpoint, /* in: communication endpoint used
for receiving messages from the client,
or NULL if no client */
byte* addr_buf, /* in: client address (= user name) */
ulint addr_len) /* in: client address length */
{ {
sess_t* sess; sess_t* sess;
ulint fold;
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
sess = mem_alloc(sizeof(sess_t)); sess = mem_alloc(sizeof(sess_t));
sess->id = sess_sys->free_sess_id;
UT_DULINT_INC(sess_sys->free_sess_id);
sess->state = SESS_ACTIVE; sess->state = SESS_ACTIVE;
sess->disconnecting = FALSE;
sess->msgs_sent = ut_dulint_zero;
sess->msgs_recv = ut_dulint_zero;
sess->client_waits = TRUE;
sess->err_no = 0;
sess->err_str = NULL;
sess->error_count = ut_dulint_zero;
sess->big_msg = NULL;
sess->trx = trx_create(sess); sess->trx = trx_create(sess);
sess->next_graph_id = 0;
UT_LIST_INIT(sess->graphs); UT_LIST_INIT(sess->graphs);
fold = sess_id_fold(sess->id);
HASH_INSERT(sess_t, hash, sess_sys->hash, fold, sess);
sess->endpoint = endpoint;
sess->addr_buf = mem_alloc(addr_len);
ut_memcpy(sess->addr_buf, addr_buf, addr_len);
sess->addr_len = addr_len;
return(sess); return(sess);
} }
...@@ -288,23 +55,11 @@ sess_close( ...@@ -288,23 +55,11 @@ sess_close(
/*=======*/ /*=======*/
sess_t* sess) /* in, own: session object */ sess_t* sess) /* in, own: session object */
{ {
ulint fold; #ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
ut_ad(sess->disconnecting); #endif /* UNIV_SYNC_DEBUG */
ut_ad(sess->trx == NULL); ut_ad(sess->trx == NULL);
ut_ad(sess->refer_count == 0);
fold = ut_fold_dulint(sess->id);
HASH_DELETE(sess_t, hash, sess_sys->hash, fold, sess);
/* sess_reply_to_client_rel_kernel(sess); */
if (sess->err_str != NULL) {
mem_free(sess->err_str);
}
mem_free(sess->addr_buf);
mem_free(sess); mem_free(sess);
} }
...@@ -318,10 +73,10 @@ sess_try_close( ...@@ -318,10 +73,10 @@ sess_try_close(
/* out: TRUE if closed */ /* out: TRUE if closed */
sess_t* sess) /* in, own: session object */ sess_t* sess) /* in, own: session object */
{ {
#ifdef UNIV_SYNC_DEBUG
ut_ad(mutex_own(&kernel_mutex)); ut_ad(mutex_own(&kernel_mutex));
#endif /* UNIV_SYNC_DEBUG */
if (sess->disconnecting && (UT_LIST_GET_LEN(sess->graphs) == 0) if (UT_LIST_GET_LEN(sess->graphs) == 0) {
&& (sess->refer_count == 0)) {
sess_close(sess); sess_close(sess);
return(TRUE); return(TRUE);
...@@ -329,258 +84,3 @@ sess_try_close( ...@@ -329,258 +84,3 @@ sess_try_close(
return(FALSE); return(FALSE);
} }
/*************************************************************************
Initializes the first fields of a message to client. */
void
sess_srv_msg_init(
/*==============*/
sess_t* sess, /* in: session object */
byte* buf, /* in: message buffer, must be at least of size
SESS_SRV_MSG_DATA */
ulint type) /* in: message type */
{
ut_ad(mutex_own(&kernel_mutex));
sess->msgs_sent = ut_dulint_add(sess->msgs_sent, 1);
mach_write_to_8(buf + SESS_SRV_MSG_SESS_ID, sess->id);
mach_write_to_4(buf + SESS_SRV_MSG_TYPE, type);
mach_write_to_8(buf + SESS_SRV_MSG_NO, sess->msgs_sent);
ut_ad(com_endpoint_get_max_size(sess->endpoint) >= SESS_SRV_MSG_DATA);
}
/*************************************************************************
Sends a message to the client. */
static
ulint
sess_srv_msg_send_low(
/*==================*/
/* out: 0 if success, else error number */
sess_t* sess, /* in: session object */
byte* buf, /* in: message buffer */
ulint len, /* in: message length */
ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should
be temporarily released in the call; otherwise
SESS_NOT_RELEASE_KERNEL */
{
ulint ret;
ut_ad((rel_ker == SESS_NOT_RELEASE_KERNEL)
|| (rel_ker == SESS_RELEASE_KERNEL));
ut_ad(mutex_own(&kernel_mutex));
ut_ad(len <= com_endpoint_get_max_size(sess->endpoint));
ut_ad(len >= SESS_SRV_MSG_DATA);
if (sess->client_waits == FALSE) {
sess_error_low(sess, SESS_ERR_EXTRANEOUS_SRV_MSG, NULL);
return(1);
}
/* The client will now receive an error message: if the session is
in the error state, we can reset it to the normal state */
if (sess->state == SESS_ERROR) {
sess->state = SESS_ACTIVE;
}
/* We reset the client_waits flag to FALSE, regardless of whether the
message gets delivered to the client or not. This convention makes
things simpler. */
sess->client_waits = FALSE;
if (rel_ker == SESS_RELEASE_KERNEL) {
mutex_exit(&kernel_mutex);
}
ret = com_sendto(sess->endpoint, buf, len, sess->addr_buf,
sess->addr_len);
if (rel_ker == SESS_RELEASE_KERNEL) {
mutex_enter(&kernel_mutex);
}
if (ret != 0) {
sess_error_low(sess, SESS_ERR_REPLY_FAILED, NULL);
}
return(ret);
}
/*************************************************************************
Sends a message to the client. If the session is in the error state, sends
the error message instead of buf. */
static
ulint
sess_srv_msg_send(
/*==============*/
/* out: 0 if success, else error number */
sess_t* sess, /* in: session object */
byte* buf, /* in: message buffer */
ulint len, /* in: message length */
ulint rel_ker)/* in: SESS_RELEASE_KERNEL if the kernel mutex should
be temporarily released in the call; otherwise
SESS_NOT_RELEASE_KERNEL */
{
ulint ret;
ut_ad(mutex_own(&kernel_mutex));
if (sess->state == SESS_ERROR) {
sess_srv_msg_send_error(sess);
return(2);
}
ret = sess_srv_msg_send_low(sess, buf, len, rel_ker);
return(ret);
}
/*************************************************************************
Sends a simple message to client. */
void
sess_srv_msg_send_simple(
/*=====================*/
sess_t* sess, /* in: session object */
ulint type, /* in: message type */
ulint rel_kernel) /* in: SESS_RELEASE_KERNEL or
SESS_NOT_RELEASE_KERNEL */
{
byte buf[SESS_SRV_MSG_DATA];
ut_ad(mutex_own(&kernel_mutex));
sess_srv_msg_init(sess, buf, type);
sess_srv_msg_send(sess, buf, SESS_SRV_MSG_DATA, rel_kernel);
}
/*************************************************************************
Communicates an error message to the client. If sess->client_waits is not
TRUE, puts the session to error state and does not try to send the error
message. */
static
void
sess_srv_msg_send_error(
/*====================*/
sess_t* sess) /* in: session object */
{
ulint err_no;
byte* err_str;
ulint err_len;
ulint max_len;
byte buf[SESS_ERR_BUF_SIZE];
ulint ret;
ut_ad(sess->client_waits);
ut_ad(mutex_own(&kernel_mutex));
ut_ad(sess->state == SESS_ERROR);
ut_ad(!UT_LIST_GET_FIRST((sess->trx)->signals));
if (!sess->client_waits) {
/* Cannot send the error message now: leave the session to
the error state and send it later */
return;
}
err_no = sess->err_no;
err_str = (byte*)sess->err_str;
err_len = sess->err_len;
max_len = ut_min(SESS_ERR_BUF_SIZE,
com_endpoint_get_max_size(sess->endpoint));
sess_srv_msg_init(sess, buf, SESS_SRV_ERROR);
if (err_len + SESS_SRV_MSG_DATA > max_len) {
err_len = max_len - SESS_SRV_MSG_DATA;
}
ut_memcpy(buf + SESS_SRV_MSG_DATA, err_str, err_len);
ret = sess_srv_msg_send_low(sess, buf, SESS_SRV_MSG_DATA + err_len,
SESS_NOT_RELEASE_KERNEL);
}
/*************************************************************************
Copies error info to a session. Sends to the transaction a signal which will
rollback the latest incomplete SQL statement and then send the error message
to the client. NOTE: This function will take care of the freeing of the error
string, thus the caller must supply a copy of the error string. */
static
void
sess_error_low(
/*===========*/
sess_t* sess, /* in: session object */
ulint err_no, /* in: error number */
char* err_str)/* in, own: error string or NULL;
NOTE: the function will take care of freeing of the
string! */
{
ut_ad(mutex_own(&kernel_mutex));
UT_DULINT_INC(sess->error_count);
printf("Error string::: %s\n", err_str);
if (sess->state == SESS_ERROR) {
/* Ignore the error because the session is already in the
error state */
if (err_str) {
mem_free(err_str);
}
return;
}
sess->err_no = err_no;
if (sess->err_str) {
mem_free(sess->err_str);
}
sess->err_str = err_str;
sess->err_len = ut_strlen(err_str);
sess->state = SESS_ERROR;
if (sess->big_msg) {
mem_free(sess->big_msg);
}
/* Send a signal which will roll back the latest incomplete SQL
statement: the error message will be sent to the client by the error
handling mechanism after the rollback is completed. */
trx_sig_send(sess->trx, TRX_SIG_ERROR_OCCURRED, TRX_SIG_SELF, FALSE,
NULL, NULL, NULL);
}
/***************************************************************************
When a command has been completed, this function sends the message about it
to the client. */
void
sess_command_completed_message(
/*===========================*/
sess_t* sess, /* in: session */
byte* msg, /* in: message buffer */
ulint len) /* in: message data length */
{
mutex_enter(&kernel_mutex);
sess_srv_msg_send(sess, msg, SESS_SRV_MSG_DATA + len,
SESS_RELEASE_KERNEL);
mutex_exit(&kernel_mutex);
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment