/* Copyright (C) 2000 MySQL AB & MySQL Finland AB & TCX DataKonsult AB

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */


/*****************************************************************************
**
** This file implements classes defined in sql_class.h
** Especially the classes to handle a result from a select
**
*****************************************************************************/

#ifdef __GNUC__
#pragma implementation				// gcc: Class implementation
#endif

#include "mysql_priv.h"
#include "sql_acl.h"
#include <m_ctype.h>
#include <sys/stat.h>
#include <thr_alarm.h>
#ifdef	__WIN__
#include <io.h>
#endif
#include <mysys_err.h>
#include <assert.h>

/*****************************************************************************
** Instansiate templates
*****************************************************************************/

#ifdef __GNUC__
/* Used templates */
template class List<Key>;
template class List_iterator<Key>;
template class List<key_part_spec>;
template class List_iterator<key_part_spec>;
template class List<Alter_drop>;
template class List_iterator<Alter_drop>;
template class List<Alter_column>;
template class List_iterator<Alter_column>;
template class List<Set_option>;
template class List_iterator<Set_option>;
#endif

/****************************************************************************
** User variables
****************************************************************************/

static byte* get_var_key(user_var_entry *entry, uint *length,
			 my_bool not_used __attribute__((unused)))
{
  *length=(uint) entry->name.length;
  return (byte*) entry->name.str;
}

static void free_var(user_var_entry *entry)
{
  char *pos= (char*) entry+ALIGN_SIZE(sizeof(*entry));
  if (entry->value && entry->value != pos)
    my_free(entry->value, MYF(0));
  my_free((char*) entry,MYF(0));
}


/****************************************************************************
** Thread specific functions
****************************************************************************/

THD::THD():user_time(0),fatal_error(0),last_insert_id_used(0),
	   insert_id_used(0),in_lock_tables(0),
	   global_read_lock(0),bootstrap(0)
{
  host=user=priv_user=db=query=ip=0;
  host_or_ip="unknown ip";
  locked=killed=count_cuted_fields=some_tables_deleted=no_errors=password=
    query_start_used=safe_to_cache_query=0;
  db_length=query_length=col_access=0;
  query_error=0;
  next_insert_id=last_insert_id=0;
  open_tables=temporary_tables=handler_tables=0;
  handler_items=0;
  tmp_table=0;
  lock=locked_tables=0;
  used_tables=0;
  cuted_fields=sent_row_count=0L;
  start_time=(time_t) 0;
  current_linfo =  0;
  slave_thread = 0;
  slave_proxy_id = 0;
  file_id = 0;
  cond_count=0;
  convert_set=0;
  mysys_var=0;
#ifndef DBUG_OFF
  dbug_sentry=THD_SENTRY_MAGIC;
#endif  
  net.vio=0;
  ull=0;
  system_thread=cleanup_done=0;
  transaction.changed_tables = 0;
#ifdef	__WIN__
  real_id = 0;
#endif
#ifdef SIGNAL_WITH_VIO_CLOSE
  active_vio = 0;
  pthread_mutex_init(&active_vio_lock, MY_MUTEX_INIT_FAST);
#endif  

  /* Variables with default values */
  proc_info="login";
  where="field list";
  server_id = ::server_id;
  slave_net = 0;
  log_pos = 0;
  server_status= SERVER_STATUS_AUTOCOMMIT;
  update_lock_default= low_priority_updates ? TL_WRITE_LOW_PRIORITY : TL_WRITE;
  options= thd_startup_options;
#ifdef HAVE_QUERY_CACHE
  query_cache_type= (byte) query_cache_startup_type;
#else
  query_cache_type= 0; //Safety
#endif
  sql_mode=(uint) opt_sql_mode;
  inactive_timeout=net_wait_timeout;
  open_options=ha_open_options;
  tx_isolation=session_tx_isolation=default_tx_isolation;
  command=COM_CONNECT;
  set_query_id=1;
  default_select_limit= HA_POS_ERROR;
  max_join_size=  ((::max_join_size != ~ (ulong) 0L) ? ::max_join_size :
		   HA_POS_ERROR);
  db_access=NO_ACCESS;

  /* Initialize sub structures */
  bzero((char*) &mem_root,sizeof(mem_root));
  bzero((char*) &transaction.mem_root,sizeof(transaction.mem_root));
  user_connect=(UC *)0;
  hash_init(&user_vars, system_charset_info, USER_VARS_HASH_SIZE, 0, 0,
	    (hash_get_key) get_var_key,
	    (void (*)(void*)) free_var,0);
#ifdef USING_TRANSACTIONS
  bzero((char*) &transaction,sizeof(transaction));
  if (opt_using_transactions)
  {
    if (open_cached_file(&transaction.trans_log,
			 mysql_tmpdir, LOG_PREFIX, binlog_cache_size,
			 MYF(MY_WME)))
      killed=1;
    transaction.trans_log.end_of_file= max_binlog_cache_size;
  }
#endif
}

/* Do operations that may take a long time */

void THD::cleanup(void)
{
  DBUG_ENTER("THD::cleanup");
  ha_rollback(this);
  if (locked_tables)
  {
    lock=locked_tables; locked_tables=0;
    close_thread_tables(this);
  }
  if (handler_tables)
  {
    open_tables=handler_tables; handler_tables=0;
    close_thread_tables(this);
  }
  close_temporary_tables(this);
#ifdef USING_TRANSACTIONS
  if (opt_using_transactions)
  {
    close_cached_file(&transaction.trans_log);
    ha_close_connection(this);
  }
#endif
  cleanup_done=1;
  DBUG_VOID_RETURN;
}

THD::~THD()
{
  THD_CHECK_SENTRY(this);
  DBUG_ENTER("~THD()");
  /* Close connection */
  if (net.vio)
  {
    vio_delete(net.vio);
    net_end(&net); 
  }
  if (!cleanup_done)
    cleanup();
  if (global_read_lock)
    unlock_global_read_lock(this);
  if (ull)
  {
    pthread_mutex_lock(&LOCK_user_locks);
    item_user_lock_release(ull);
    pthread_mutex_unlock(&LOCK_user_locks);
  }
  hash_free(&user_vars);

  DBUG_PRINT("info", ("freeing host"));

  if (host != localhost)			// If not pointer to constant
    safeFree(host);
  if (user != delayed_user)
    safeFree(user);
  safeFree(db);
  safeFree(ip);
  free_root(&mem_root,MYF(0));
  free_root(&transaction.mem_root,MYF(0));
  mysys_var=0;					// Safety (shouldn't be needed)
#ifdef SIGNAL_WITH_VIO_CLOSE
  pthread_mutex_destroy(&active_vio_lock);
#endif
#ifndef DBUG_OFF
  dbug_sentry = THD_SENTRY_GONE;
#endif  
  DBUG_VOID_RETURN;
}

void THD::awake(bool prepare_to_die)
{
  THD_CHECK_SENTRY(this);
  if (prepare_to_die)
    killed = 1;
  thr_alarm_kill(real_id);
#ifdef SIGNAL_WITH_VIO_CLOSE
  close_active_vio();
#endif    
  if (mysys_var)
    {
      pthread_mutex_lock(&mysys_var->mutex);
      if (!system_thread)		// Don't abort locks
	mysys_var->abort=1;
      // this broadcast could be up in the air if the victim thread
      // exits the cond in the time between read and broadcast, but that is
      // ok since all we want to do is to make the victim thread get out
      // of waiting on  current_cond
      if (mysys_var->current_cond)
      {
	pthread_mutex_lock(mysys_var->current_mutex);
	pthread_cond_broadcast(mysys_var->current_cond);
	pthread_mutex_unlock(mysys_var->current_mutex);
      }
      pthread_mutex_unlock(&mysys_var->mutex);
    }
}

// remember the location of thread info, the structure needed for
// sql_alloc() and the structure for the net buffer

bool THD::store_globals()
{
  return (my_pthread_setspecific_ptr(THR_THD,  this) ||
	  my_pthread_setspecific_ptr(THR_MALLOC, &mem_root) ||
	  my_pthread_setspecific_ptr(THR_NET,  &net));
}

/* routings to adding tables to list of changed in transaction tables */

inline static void list_include(CHANGED_TABLE_LIST** prev,
				CHANGED_TABLE_LIST* curr,
				CHANGED_TABLE_LIST* new_table)
{
  if (new_table)
  {
    *prev = new_table;
    (*prev)->next = curr;
  }
}

/* add table to list of changed in transaction tables */
void THD::add_changed_table(TABLE *table)
{
  DBUG_ENTER("THD::add_changed_table (table)");

  DBUG_ASSERT((options & (OPTION_NOT_AUTO_COMMIT | OPTION_BEGIN)) &&
		table->file->has_transactions());

  CHANGED_TABLE_LIST** prev = &transaction.changed_tables;
  CHANGED_TABLE_LIST* curr = transaction.changed_tables;

  for(; curr; prev = &(curr->next), curr = curr->next)
  {
    int cmp =  (long)curr->key_length - (long)table->key_length;
    if (cmp < 0)
    {
      list_include(prev, curr, changed_table_dup(table));
      DBUG_PRINT("info", 
		 ("key_length %u %u", table->key_length, (*prev)->key_length));
      DBUG_VOID_RETURN;
    }
    else if (cmp == 0)
    {
      cmp = memcmp(curr->key ,table->table_cache_key, curr->key_length);
      if (cmp < 0)
      {
	list_include(prev, curr, changed_table_dup(table));
	DBUG_PRINT("info", 
		   ("key_length %u %u", table->key_length, (*prev)->key_length));
	DBUG_VOID_RETURN;
      }
      else if (cmp == 0)
      {
	DBUG_PRINT("info", ("already in list"));
	DBUG_VOID_RETURN;
      }
    }
  }
  *prev = changed_table_dup(table);
  DBUG_PRINT("info", ("key_length %u %u", table->key_length, (*prev)->key_length));
  DBUG_VOID_RETURN;
}

CHANGED_TABLE_LIST* THD::changed_table_dup(TABLE *table)
{
  CHANGED_TABLE_LIST* new_table = 
    (CHANGED_TABLE_LIST*) trans_alloc(ALIGN_SIZE(sizeof(CHANGED_TABLE_LIST))+
				      table->key_length + 1);
  if (!new_table)
  {
    my_error(EE_OUTOFMEMORY, MYF(ME_BELL),
	     ALIGN_SIZE(sizeof(TABLE_LIST)) + table->key_length + 1);
    killed= 1;
    return 0;
  }

  new_table->key = (char *) (((byte*)new_table)+
			     ALIGN_SIZE(sizeof(CHANGED_TABLE_LIST)));
  new_table->next = 0;
  new_table->key_length = table->key_length;
  uint32 db_len = ((new_table->table_name =
		    ::strmake(new_table->key, table->table_cache_key, 
			      table->key_length) + 1) - new_table->key);
  ::memcpy(new_table->key + db_len, table->table_cache_key + db_len,
	   table->key_length - db_len);
  return new_table;
}


/*****************************************************************************
** Functions to provide a interface to select results
*****************************************************************************/

select_result::select_result()
{
  thd=current_thd;
}

static String default_line_term("\n"),default_escaped("\\"),
	      default_field_term("\t");

sql_exchange::sql_exchange(char *name,bool flag)
  :file_name(name), opt_enclosed(0), dumpfile(flag), skip_lines(0)
{
  field_term= &default_field_term;
  enclosed=   line_start= &empty_string;
  line_term=  &default_line_term;
  escaped=    &default_escaped;
}

bool select_send::send_fields(List<Item> &list,uint flag)
{
  return ::send_fields(thd,list,flag);
}


/* Send data to client. Returns 0 if ok */

bool select_send::send_data(List<Item> &items)
{
  List_iterator_fast<Item> li(items);
  String *packet= &thd->packet;
  DBUG_ENTER("send_data");

  if (unit->offset_limit_cnt)
  {						// using limit offset,count
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }
  packet->length(0);				// Reset packet
  Item *item;
  while ((item=li++))
  {
    if (item->send(thd, packet))
    {
      packet->free();				// Free used
      my_error(ER_OUT_OF_RESOURCES,MYF(0));
      DBUG_RETURN(1);
    }
  }
  thd->sent_row_count++;
  bool error=my_net_write(&thd->net,(char*) packet->ptr(),packet->length());
  DBUG_RETURN(error);
}

bool select_send::send_eof()
{
  /* Unlock tables before sending packet to gain some speed */
  if (thd->lock)
  {
    mysql_unlock_tables(thd, thd->lock); thd->lock=0;
  }
  ::send_eof(&thd->net);
  return 0;
}


/***************************************************************************
** Export of select to textfile
***************************************************************************/


select_export::~select_export()
{
  if (file >= 0)
  {					// This only happens in case of error
    (void) end_io_cache(&cache);
    (void) my_close(file,MYF(0));
    file= -1;
  }
  thd->sent_row_count=row_count;
}

int
select_export::prepare(List<Item> &list, SELECT_LEX_UNIT *u)
{
  char path[FN_REFLEN];
  uint option=4;
  bool blob_flag=0;
  unit= u;
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
  option|=1;					// Force use of db directory
#endif
  if ((uint) strlen(exchange->file_name) + NAME_LEN >= FN_REFLEN)
    strmake(path,exchange->file_name,FN_REFLEN-1);
  (void) fn_format(path,exchange->file_name, thd->db ? thd->db : "", "",
		   option);
  if (!access(path,F_OK))
  {
    my_error(ER_FILE_EXISTS_ERROR,MYF(0),exchange->file_name);
    return 1;
  }
  /* Create the file world readable */
  if ((file=my_create(path, 0666, O_WRONLY, MYF(MY_WME))) < 0)
    return 1;
#ifdef HAVE_FCHMOD
  (void) fchmod(file,0666);			// Because of umask()
#else
  (void) chmod(path,0666);
#endif
  if (init_io_cache(&cache,file,0L,WRITE_CACHE,0L,1,MYF(MY_WME)))
  {
    my_close(file,MYF(0));
    file= -1;
    return 1;
  }
  /* Check if there is any blobs in data */
  {
    List_iterator_fast<Item> li(list);
    Item *item;
    while ((item=li++))
    {
      if (item->max_length >= MAX_BLOB_WIDTH)
      {
	blob_flag=1;
	break;
      }
    }
  }
  field_term_length=exchange->field_term->length();
  if (!exchange->line_term->length())
    exchange->line_term=exchange->field_term;	// Use this if it exists
  field_sep_char= (exchange->enclosed->length() ? (*exchange->enclosed)[0] :
		   field_term_length ? (*exchange->field_term)[0] : INT_MAX);
  escape_char=	(exchange->escaped->length() ? (*exchange->escaped)[0] : -1);
  line_sep_char= (exchange->line_term->length() ?
		  (*exchange->line_term)[0] : INT_MAX);
  if (!field_term_length)
    exchange->opt_enclosed=0;
  if (!exchange->enclosed->length())
    exchange->opt_enclosed=1;			// A little quicker loop
  fixed_row_size= (!field_term_length && !exchange->enclosed->length() &&
		   !blob_flag);
  return 0;
}


bool select_export::send_data(List<Item> &items)
{

  DBUG_ENTER("send_data");
  char buff[MAX_FIELD_WIDTH],null_buff[2],space[MAX_FIELD_WIDTH];
  bool space_inited=0;
  String tmp(buff,sizeof(buff)),*res;
  tmp.length(0);

  if (unit->offset_limit_cnt)
  {						// using limit offset,count
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }
  row_count++;
  Item *item;
  char *buff_ptr=buff;
  uint used_length=0,items_left=items.elements;
  List_iterator_fast<Item> li(items);

  if (my_b_write(&cache,(byte*) exchange->line_start->ptr(),
		 exchange->line_start->length()))
    goto err;
  while ((item=li++))
  {
    Item_result result_type=item->result_type();
    res=item->str_result(&tmp);
    if (res && (!exchange->opt_enclosed || result_type == STRING_RESULT))
    {
      if (my_b_write(&cache,(byte*) exchange->enclosed->ptr(),
		     exchange->enclosed->length()))
	goto err;
    }
    if (!res)
    {						// NULL
      if (!fixed_row_size)
      {
	if (escape_char != -1)			// Use \N syntax
	{
	  null_buff[0]=escape_char;
	  null_buff[1]='N';
	  if (my_b_write(&cache,(byte*) null_buff,2))
	    goto err;
	}
	else if (my_b_write(&cache,(byte*) "NULL",4))
	  goto err;
      }
      else
      {
	used_length=0;				// Fill with space
      }
    }
    else
    {
      if (fixed_row_size)
	used_length=min(res->length(),item->max_length);
      else
	used_length=res->length();
      if (result_type == STRING_RESULT && escape_char != -1)
      {
	char *pos,*start,*end;

	for (start=pos=(char*) res->ptr(),end=pos+used_length ;
	     pos != end ;
	     pos++)
	{
#ifdef USE_MB
	  if (use_mb(default_charset_info))
	  {
	    int l;
	    if ((l=my_ismbchar(default_charset_info, pos, end)))
	    {
	      pos += l-1;
	      continue;
	    }
	  }
#endif
	  if ((int) *pos == escape_char || (int) *pos == field_sep_char ||
	      (int) *pos == line_sep_char || !*pos)
	  {
	    char tmp_buff[2];
	    tmp_buff[0]= escape_char;
	    tmp_buff[1]= *pos ? *pos : '0';
	    if (my_b_write(&cache,(byte*) start,(uint) (pos-start)) ||
		my_b_write(&cache,(byte*) tmp_buff,2))
	      goto err;
	    start=pos+1;
	  }
	}
	if (my_b_write(&cache,(byte*) start,(uint) (pos-start)))
	  goto err;
      }
      else if (my_b_write(&cache,(byte*) res->ptr(),used_length))
	goto err;
    }
    if (fixed_row_size)
    {						// Fill with space
      if (item->max_length > used_length)
      {
	/* QQ:  Fix by adding a my_b_fill() function */
	if (!space_inited)
	{
	  space_inited=1;
	  bfill(space,sizeof(space),' ');
	}
	uint length=item->max_length-used_length;
	for ( ; length > sizeof(space) ; length-=sizeof(space))
	{
	  if (my_b_write(&cache,(byte*) space,sizeof(space)))
	    goto err;
	}
	if (my_b_write(&cache,(byte*) space,length))
	  goto err;
      }
    }
    buff_ptr=buff;				// Place separators here
    if (res && (!exchange->opt_enclosed || result_type == STRING_RESULT))
    {
      memcpy(buff_ptr,exchange->enclosed->ptr(),exchange->enclosed->length());
      buff_ptr+=exchange->enclosed->length();
    }
    if (--items_left)
    {
      memcpy(buff_ptr,exchange->field_term->ptr(),field_term_length);
      buff_ptr+=field_term_length;
    }
    if (my_b_write(&cache,(byte*) buff,(uint) (buff_ptr-buff)))
      goto err;
  }
  if (my_b_write(&cache,(byte*) exchange->line_term->ptr(),
		 exchange->line_term->length()))
    goto err;
  DBUG_RETURN(0);
err:
  DBUG_RETURN(1);
}


void select_export::send_error(uint errcode,const char *err)
{
  ::send_error(&thd->net,errcode,err);
  (void) end_io_cache(&cache);
  (void) my_close(file,MYF(0));
  file= -1;
}


bool select_export::send_eof()
{
  int error=test(end_io_cache(&cache));
  if (my_close(file,MYF(MY_WME)))
    error=1;
  if (error)
    ::send_error(&thd->net);
  else
    ::send_ok(&thd->net,row_count);
  file= -1;
  return error;
}


/***************************************************************************
** Dump  of select to a binary file
***************************************************************************/


select_dump::~select_dump()
{
  if (file >= 0)
  {					// This only happens in case of error
    (void) end_io_cache(&cache);
    (void) my_close(file,MYF(0));
    file= -1;
  }
}

int
select_dump::prepare(List<Item> &list __attribute__((unused)),
		     SELECT_LEX_UNIT *u)
{
  uint option=4;
  unit= u;
#ifdef DONT_ALLOW_FULL_LOAD_DATA_PATHS
  option|=1;					// Force use of db directory
#endif
  (void) fn_format(path,exchange->file_name, thd->db ? thd->db : "", "",
		   option);
  if (!access(path,F_OK))
  {
    my_error(ER_FILE_EXISTS_ERROR,MYF(0),exchange->file_name);
    return 1;
  }
  /* Create the file world readable */
  if ((file=my_create(path, 0666, O_WRONLY, MYF(MY_WME))) < 0)
    return 1;
#ifdef HAVE_FCHMOD
  (void) fchmod(file,0666);			// Because of umask()
#else
  (void) chmod(path,0666);
#endif
  if (init_io_cache(&cache,file,0L,WRITE_CACHE,0L,1,MYF(MY_WME)))
  {
    my_close(file,MYF(0));
    my_delete(path,MYF(0));
    file= -1;
    return 1;
  }
  return 0;
}


bool select_dump::send_data(List<Item> &items)
{
  List_iterator_fast<Item> li(items);
  char buff[MAX_FIELD_WIDTH];
  String tmp(buff,sizeof(buff)),*res;
  tmp.length(0);
  Item *item;
  DBUG_ENTER("send_data");

  if (unit->offset_limit_cnt)
  {						// using limit offset,count
    unit->offset_limit_cnt--;
    DBUG_RETURN(0);
  }
  if (row_count++ > 1) 
  {
    my_error(ER_TOO_MANY_ROWS,MYF(0));
    goto err;
  }
  while ((item=li++))
  {
    res=item->str_result(&tmp);
    if (!res)					// If NULL
    {
      if (my_b_write(&cache,(byte*) "",1))
	goto err;
    }
    else if (my_b_write(&cache,(byte*) res->ptr(),res->length()))
    {
      my_error(ER_ERROR_ON_WRITE,MYF(0), path, my_errno);
      goto err;
    }
  }
  DBUG_RETURN(0);
err:
  DBUG_RETURN(1);
}


void select_dump::send_error(uint errcode,const char *err)
{
  ::send_error(&thd->net,errcode,err);
  (void) end_io_cache(&cache);
  (void) my_close(file,MYF(0));
  (void) my_delete(path,MYF(0));		// Delete file on error
  file= -1;
}


bool select_dump::send_eof()
{
  int error=test(end_io_cache(&cache));
  if (my_close(file,MYF(MY_WME)))
    error=1;
  if (error)
    ::send_error(&thd->net);
  else
    ::send_ok(&thd->net,row_count);
  file= -1;
  return error;
}