Commit ae6c5d0d authored by Michael Widenius's avatar Michael Widenius

Fix for LP#571200 MySQL Bug#32426: FederatedX corrupt ORDER BY with TEXT

Patch taken from lp:~capttofu/maria/bug_571200 (originally for MariaDB 5.3, but adapted for 5.1)

parent 6bbb0b57
...@@ -213,7 +213,7 @@ id name ...@@ -213,7 +213,7 @@ id name
alter server s1 options (database 'db_bogus'); alter server s1 options (database 'db_bogus');
flush tables; flush tables;
select * from federated.t1; select * from federated.t1;
ERROR 42000: Got error: 1044 : Access denied for user 'test_fed'@'localhost' to database 'db_bogus' ERROR 42000: Received error: 1044 : Access denied for user 'test_fed'@'localhost' to database 'db_bogus'
drop server if exists 's1'; drop server if exists 's1';
ERROR 42000: Access denied; you need the SUPER privilege for this operation ERROR 42000: Access denied; you need the SUPER privilege for this operation
create server 's1' foreign data wrapper 'mysql' options create server 's1' foreign data wrapper 'mysql' options
......
/* /*
Copyright (c) 2007, Antony T Curtis Copyright (c) 2007, Antony T Curtis
All rights reserved. All rights reserved.
...@@ -51,6 +51,12 @@ typedef struct federatedx_savepoint ...@@ -51,6 +51,12 @@ typedef struct federatedx_savepoint
uint flags; uint flags;
} SAVEPT; } SAVEPT;
struct mysql_position
{
MYSQL_RES* result;
MYSQL_ROW_OFFSET offset;
};
class federatedx_io_mysql :public federatedx_io class federatedx_io_mysql :public federatedx_io
{ {
...@@ -76,16 +82,16 @@ class federatedx_io_mysql :public federatedx_io ...@@ -76,16 +82,16 @@ class federatedx_io_mysql :public federatedx_io
virtual int error_code(); virtual int error_code();
virtual const char *error_str(); virtual const char *error_str();
void reset(); void reset();
int commit(); int commit();
int rollback(); int rollback();
int savepoint_set(ulong sp); int savepoint_set(ulong sp);
ulong savepoint_release(ulong sp); ulong savepoint_release(ulong sp);
ulong savepoint_rollback(ulong sp); ulong savepoint_rollback(ulong sp);
void savepoint_restrict(ulong sp); void savepoint_restrict(ulong sp);
ulong last_savepoint() const; ulong last_savepoint() const;
ulong actual_savepoint() const; ulong actual_savepoint() const;
bool is_autocommit() const; bool is_autocommit() const;
...@@ -94,7 +100,7 @@ class federatedx_io_mysql :public federatedx_io ...@@ -94,7 +100,7 @@ class federatedx_io_mysql :public federatedx_io
uint table_name_length, uint flag); uint table_name_length, uint flag);
/* resultset operations */ /* resultset operations */
virtual void free_result(FEDERATEDX_IO_RESULT *io_result); virtual void free_result(FEDERATEDX_IO_RESULT *io_result);
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result); virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result);
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result); virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result);
...@@ -104,6 +110,12 @@ class federatedx_io_mysql :public federatedx_io ...@@ -104,6 +110,12 @@ class federatedx_io_mysql :public federatedx_io
unsigned int column); unsigned int column);
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const; unsigned int column) const;
virtual size_t get_ref_length() const;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref);
}; };
...@@ -466,14 +478,13 @@ const char *federatedx_io_mysql::error_str() ...@@ -466,14 +478,13 @@ const char *federatedx_io_mysql::error_str()
return mysql_error(&mysql); return mysql_error(&mysql);
} }
FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result() FEDERATEDX_IO_RESULT *federatedx_io_mysql::store_result()
{ {
FEDERATEDX_IO_RESULT *result; FEDERATEDX_IO_RESULT *result;
DBUG_ENTER("federatedx_io_mysql::store_result"); DBUG_ENTER("federatedx_io_mysql::store_result");
result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql); result= (FEDERATEDX_IO_RESULT *) mysql_store_result(&mysql);
DBUG_RETURN(result); DBUG_RETURN(result);
} }
...@@ -590,3 +601,45 @@ bool federatedx_io_mysql::table_metadata(ha_statistics *stats, ...@@ -590,3 +601,45 @@ bool federatedx_io_mysql::table_metadata(ha_statistics *stats,
free_result(result); free_result(result);
return 1; return 1;
} }
size_t federatedx_io_mysql::get_ref_length() const
{
return sizeof(mysql_position);
}
void federatedx_io_mysql::mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref)
{
MYSQL_ROWS *tmp= 0;
mysql_position& pos= *reinterpret_cast<mysql_position*>(ref);
pos.result= (MYSQL_RES *) io_result;
if (pos.result && pos.result->data)
{
for (tmp= pos.result->data->data;
tmp && (tmp->next != pos.result->data_cursor);
tmp= tmp->next)
{}
}
pos.offset= tmp;
}
int federatedx_io_mysql::seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)
{
const mysql_position& pos= *reinterpret_cast<const mysql_position*>(ref);
if (!pos.result || !pos.offset)
return HA_ERR_END_OF_FILE;
pos.result->current_row= 0;
pos.result->data_cursor= pos.offset;
*io_result= (FEDERATEDX_IO_RESULT*) pos.result;
return 0;
}
...@@ -96,6 +96,11 @@ class federatedx_io_null :public federatedx_io ...@@ -96,6 +96,11 @@ class federatedx_io_null :public federatedx_io
unsigned int column); unsigned int column);
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const; unsigned int column) const;
virtual size_t get_ref_length() const;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref);
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref);
}; };
...@@ -275,3 +280,20 @@ bool federatedx_io_null::table_metadata(ha_statistics *stats, ...@@ -275,3 +280,20 @@ bool federatedx_io_null::table_metadata(ha_statistics *stats,
return 0; return 0;
} }
size_t federatedx_io_null::get_ref_length() const
{
return sizeof(int);
}
void federatedx_io_null::mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref)
{
}
int federatedx_io_null::seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)
{
return 0;
}
...@@ -1717,14 +1717,14 @@ federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create) ...@@ -1717,14 +1717,14 @@ federatedx_txn *ha_federatedx::get_txn(THD *thd, bool no_create)
return *txnp; return *txnp;
} }
int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd) int ha_federatedx::disconnect(handlerton *hton, MYSQL_THD thd)
{ {
federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton); federatedx_txn *txn= (federatedx_txn *) thd_get_ha_data(thd, hton);
delete txn; delete txn;
return 0; return 0;
} }
/* /*
Used for opening tables. The name will be the name of the file. Used for opening tables. The name will be the name of the file.
...@@ -1756,14 +1756,15 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked) ...@@ -1756,14 +1756,15 @@ int ha_federatedx::open(const char *name, int mode, uint test_if_locked)
free_share(txn, share); free_share(txn, share);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
ref_length= io->get_ref_length();
txn->release(&io); txn->release(&io);
ref_length= (table->s->primary_key != MAX_KEY ?
table->key_info[table->s->primary_key].key_length :
table->s->reclength);
DBUG_PRINT("info", ("ref_length: %u", ref_length)); DBUG_PRINT("info", ("ref_length: %u", ref_length));
my_init_dynamic_array(&results, sizeof(FEDERATEDX_IO_RESULT*), 4, 4);
reset(); reset();
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -1788,8 +1789,7 @@ int ha_federatedx::close(void) ...@@ -1788,8 +1789,7 @@ int ha_federatedx::close(void)
DBUG_ENTER("ha_federatedx::close"); DBUG_ENTER("ha_federatedx::close");
/* free the result set */ /* free the result set */
if (stored_result) reset();
retval= free_result();
/* Disconnect from mysql */ /* Disconnect from mysql */
if (!thd || !(txn= get_txn(thd, true))) if (!thd || !(txn= get_txn(thd, true)))
...@@ -1799,7 +1799,7 @@ int ha_federatedx::close(void) ...@@ -1799,7 +1799,7 @@ int ha_federatedx::close(void)
tmp_txn.release(&io); tmp_txn.release(&io);
DBUG_ASSERT(io == NULL); DBUG_ASSERT(io == NULL);
if ((error= free_share(&tmp_txn, share))) if ((error= free_share(&tmp_txn, share)))
retval= error; retval= error;
} }
...@@ -2525,7 +2525,7 @@ int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key, ...@@ -2525,7 +2525,7 @@ int ha_federatedx::index_read_idx(uchar *buf, uint index, const uchar *key,
uint key_len, enum ha_rkey_function find_flag) uint key_len, enum ha_rkey_function find_flag)
{ {
int retval; int retval;
FEDERATEDX_IO_RESULT *io_result; FEDERATEDX_IO_RESULT *io_result= 0;
DBUG_ENTER("ha_federatedx::index_read_idx"); DBUG_ENTER("ha_federatedx::index_read_idx");
if ((retval= index_read_idx_with_result_set(buf, index, key, if ((retval= index_read_idx_with_result_set(buf, index, key,
...@@ -2601,7 +2601,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index, ...@@ -2601,7 +2601,7 @@ int ha_federatedx::index_read_idx_with_result_set(uchar *buf, uint index,
if (!(retval= read_next(buf, *result))) if (!(retval= read_next(buf, *result)))
DBUG_RETURN(retval); DBUG_RETURN(retval);
io->free_result(*result); insert_dynamic(&results, (uchar*) result);
*result= 0; *result= 0;
table->status= STATUS_NOT_FOUND; table->status= STATUS_NOT_FOUND;
DBUG_RETURN(retval); DBUG_RETURN(retval);
...@@ -2669,10 +2669,7 @@ int ha_federatedx::read_range_first(const key_range *start_key, ...@@ -2669,10 +2669,7 @@ int ha_federatedx::read_range_first(const key_range *start_key,
DBUG_RETURN(retval); DBUG_RETURN(retval);
if (stored_result) if (stored_result)
{ (void) free_result();
io->free_result(stored_result);
stored_result= 0;
}
if (io->query(sql_query.ptr(), sql_query.length())) if (io->query(sql_query.ptr(), sql_query.length()))
{ {
...@@ -2773,10 +2770,7 @@ int ha_federatedx::rnd_init(bool scan) ...@@ -2773,10 +2770,7 @@ int ha_federatedx::rnd_init(bool scan)
DBUG_RETURN(error); DBUG_RETURN(error);
if (stored_result) if (stored_result)
{ (void) free_result();
io->free_result(stored_result);
stored_result= 0;
}
if (io->query(share->select_query, if (io->query(share->select_query,
strlen(share->select_query))) strlen(share->select_query)))
...@@ -2803,17 +2797,35 @@ int ha_federatedx::rnd_end() ...@@ -2803,17 +2797,35 @@ int ha_federatedx::rnd_end()
int ha_federatedx::free_result() int ha_federatedx::free_result()
{ {
int error; int error;
federatedx_io *tmp_io= 0, **iop; DBUG_ENTER("ha_federatedx::free_result");
DBUG_ASSERT(stored_result); DBUG_ASSERT(stored_result);
if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io)))) for (uint i= 0; i < results.elements; ++i)
{
FEDERATEDX_IO_RESULT *result= 0;
get_dynamic(&results, (uchar*) &result, i);
if (result == stored_result)
goto end;
}
if (position_called)
{ {
DBUG_ASSERT(0); // Fail when testing insert_dynamic(&results, (uchar*) &stored_result);
return error;
} }
(*iop)->free_result(stored_result); else
{
federatedx_io *tmp_io= 0, **iop;
if (!*(iop= &io) && (error= txn->acquire(share, TRUE, (iop= &tmp_io))))
{
DBUG_ASSERT(0); // Fail when testing
insert_dynamic(&results, (uchar*) &stored_result);
goto end;
}
(*iop)->free_result(stored_result);
txn->release(&tmp_io);
}
end:
stored_result= 0; stored_result= 0;
txn->release(&tmp_io); position_called= FALSE;
return 0; DBUG_RETURN(0);
} }
int ha_federatedx::index_end(void) int ha_federatedx::index_end(void)
...@@ -2862,8 +2874,8 @@ int ha_federatedx::rnd_next(uchar *buf) ...@@ -2862,8 +2874,8 @@ int ha_federatedx::rnd_next(uchar *buf)
SYNOPSIS SYNOPSIS
field_in_record_is_null() field_in_record_is_null()
buf byte pointer to record buf byte pointer to record
result mysql result set result mysql result set
DESCRIPTION DESCRIPTION
This method is a wrapper method that reads one record from a result This method is a wrapper method that reads one record from a result
...@@ -2896,24 +2908,43 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result) ...@@ -2896,24 +2908,43 @@ int ha_federatedx::read_next(uchar *buf, FEDERATEDX_IO_RESULT *result)
} }
/* /**
store reference to current row so that we can later find it for @brief Store a reference to current row.
a re-read, update or delete.
@details During a query execution we may have different result sets (RS),
In case of federatedx, a reference is either a primary key or e.g. for different ranges. All the RS's used are stored in
the whole record. memory and placed in @c results dynamic array. At the end of
execution all stored RS's are freed at once in the
@c ha_federated::reset().
So, in case of federated, a reference to current row is a
stored result address and current data cursor position.
As we keep all RS in memory during a query execution,
we can get any record using the reference any time until
@c ha_federated::reset() is called.
TODO: we don't have to store all RS's rows but only those
we call @c ha_federated::position() for, so we can free memory
where we store other rows in the @c ha_federated::index_end().
@param[in] record record data (unused)
Called from filesort.cc, sql_select.cc, sql_delete.cc and sql_update.cc.
*/ */
void ha_federatedx::position(const uchar *record) void ha_federatedx::position(const uchar *record __attribute__ ((unused)))
{ {
DBUG_ENTER("ha_federatedx::position"); DBUG_ENTER("ha_federatedx::position");
if (table->s->primary_key != MAX_KEY)
key_copy(ref, (uchar *)record, table->key_info + table->s->primary_key, bzero(ref, ref_length);
ref_length);
else if (!stored_result)
memcpy(ref, record, ref_length); DBUG_VOID_RETURN;
if (txn->acquire(share, TRUE, &io))
DBUG_VOID_RETURN;
io->mark_position(stored_result, ref);
position_called= TRUE;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
...@@ -2929,23 +2960,23 @@ void ha_federatedx::position(const uchar *record) ...@@ -2929,23 +2960,23 @@ void ha_federatedx::position(const uchar *record)
int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
{ {
int result; int retval;
FEDERATEDX_IO_RESULT *result= stored_result;
DBUG_ENTER("ha_federatedx::rnd_pos"); DBUG_ENTER("ha_federatedx::rnd_pos");
ha_statistic_increment(&SSV::ha_read_rnd_count); ha_statistic_increment(&SSV::ha_read_rnd_count);
if (table->s->primary_key != MAX_KEY)
{ if ((retval= txn->acquire(share, TRUE, &io)))
/* We have a primary key, so use index_read_idx to find row */ goto error;
result= index_read_idx(buf, table->s->primary_key, pos,
ref_length, HA_READ_KEY_EXACT); if ((retval= io->seek_position(&result, pos)))
} goto error;
else
{ retval= read_next(buf, result);
/* otherwise, get the old record ref as obtained in ::position */ DBUG_RETURN(retval);
memcpy(buf, pos, ref_length);
result= 0; error:
} table->status= STATUS_NOT_FOUND;
table->status= result ? STATUS_NOT_FOUND : 0; DBUG_RETURN(retval);
DBUG_RETURN(result);
} }
...@@ -2996,15 +3027,20 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos) ...@@ -2996,15 +3027,20 @@ int ha_federatedx::rnd_pos(uchar *buf, uchar *pos)
int ha_federatedx::info(uint flag) int ha_federatedx::info(uint flag)
{ {
uint error_code; uint error_code;
THD *thd= current_thd;
federatedx_txn *tmp_txn;
federatedx_io *tmp_io= 0, **iop= 0; federatedx_io *tmp_io= 0, **iop= 0;
DBUG_ENTER("ha_federatedx::info"); DBUG_ENTER("ha_federatedx::info");
error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE; error_code= ER_QUERY_ON_FOREIGN_DATA_SOURCE;
// external_lock may not have been called so txn may not be set
tmp_txn= get_txn(thd);
/* we want not to show table status if not needed to do so */ /* we want not to show table status if not needed to do so */
if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO)) if (flag & (HA_STATUS_VARIABLE | HA_STATUS_CONST | HA_STATUS_AUTO))
{ {
if (!*(iop= &io) && (error_code= txn->acquire(share, TRUE, (iop= &tmp_io)))) if (!*(iop= &io) && (error_code= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
goto fail; goto fail;
} }
...@@ -3029,14 +3065,14 @@ int ha_federatedx::info(uint flag) ...@@ -3029,14 +3065,14 @@ int ha_federatedx::info(uint flag)
If ::info created it's own transaction, close it. This happens in case If ::info created it's own transaction, close it. This happens in case
of show table status; of show table status;
*/ */
txn->release(&tmp_io); tmp_txn->release(&tmp_io);
DBUG_RETURN(0); DBUG_RETURN(0);
error: error:
if (iop && *iop) if (iop && *iop)
{ {
my_printf_error((*iop)->error_code(), "Got error: %d : %s", MYF(0), my_printf_error((*iop)->error_code(), "Received error: %d : %s", MYF(0),
(*iop)->error_code(), (*iop)->error_str()); (*iop)->error_code(), (*iop)->error_str());
} }
else if (remote_error_number != -1 /* error already reported */) else if (remote_error_number != -1 /* error already reported */)
...@@ -3045,7 +3081,7 @@ int ha_federatedx::info(uint flag) ...@@ -3045,7 +3081,7 @@ int ha_federatedx::info(uint flag)
my_error(error_code, MYF(0), ER(error_code)); my_error(error_code, MYF(0), ER(error_code));
} }
fail: fail:
txn->release(&tmp_io); tmp_txn->release(&tmp_io);
DBUG_RETURN(error_code); DBUG_RETURN(error_code);
} }
...@@ -3105,12 +3141,44 @@ int ha_federatedx::extra(ha_extra_function operation) ...@@ -3105,12 +3141,44 @@ int ha_federatedx::extra(ha_extra_function operation)
int ha_federatedx::reset(void) int ha_federatedx::reset(void)
{ {
int error = 0;
insert_dup_update= FALSE; insert_dup_update= FALSE;
ignore_duplicates= FALSE; ignore_duplicates= FALSE;
replace_duplicates= FALSE; replace_duplicates= FALSE;
return 0; position_called= FALSE;
}
if (stored_result)
insert_dynamic(&results, (uchar*) &stored_result);
stored_result= 0;
if (results.elements)
{
federatedx_txn *tmp_txn;
federatedx_io *tmp_io= 0, **iop;
// external_lock may not have been called so txn may not be set
tmp_txn= get_txn(current_thd);
if (!*(iop= &io) && (error= tmp_txn->acquire(share, TRUE, (iop= &tmp_io))))
{
DBUG_ASSERT(0); // Fail when testing
return error;
}
for (uint i= 0; i < results.elements; ++i)
{
FEDERATEDX_IO_RESULT *result= 0;
get_dynamic(&results, (uchar*) &result, i);
(*iop)->free_result(result);
}
tmp_txn->release(&tmp_io);
reset_dynamic(&results);
}
return error;
}
/* /*
Used to delete all rows in a table. Both for cases of truncate and Used to delete all rows in a table. Both for cases of truncate and
...@@ -3237,7 +3305,7 @@ static int test_connection(MYSQL_THD thd, federatedx_io *io, ...@@ -3237,7 +3305,7 @@ static int test_connection(MYSQL_THD thd, federatedx_io *io,
str.length(0); str.length(0);
str.append(STRING_WITH_LEN("SELECT * FROM ")); str.append(STRING_WITH_LEN("SELECT * FROM "));
append_identifier(thd, &str, share->table_name, append_identifier(thd, &str, share->table_name,
share->table_name_length); share->table_name_length);
str.append(STRING_WITH_LEN(" WHERE 1=0")); str.append(STRING_WITH_LEN(" WHERE 1=0"));
...@@ -3288,14 +3356,14 @@ int ha_federatedx::create(const char *name, TABLE *table_arg, ...@@ -3288,14 +3356,14 @@ int ha_federatedx::create(const char *name, TABLE *table_arg,
pthread_mutex_lock(&federatedx_mutex); pthread_mutex_lock(&federatedx_mutex);
tmp_share.s= get_server(&tmp_share, NULL); tmp_share.s= get_server(&tmp_share, NULL);
pthread_mutex_unlock(&federatedx_mutex); pthread_mutex_unlock(&federatedx_mutex);
if (tmp_share.s) if (tmp_share.s)
{ {
tmp_txn= get_txn(thd); tmp_txn= get_txn(thd);
if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io))) if (!(retval= tmp_txn->acquire(&tmp_share, TRUE, &tmp_io)))
{ {
retval= test_connection(thd, tmp_io, &tmp_share); retval= test_connection(thd, tmp_io, &tmp_share);
tmp_txn->release(&tmp_io); tmp_txn->release(&tmp_io);
} }
free_server(tmp_txn, tmp_share.s); free_server(tmp_txn, tmp_share.s);
} }
......
/* /*
Copyright (c) 2008, Patrick Galbraith Copyright (c) 2008, Patrick Galbraith
All rights reserved. All rights reserved.
Redistribution and use in source and binary forms, with or without Redistribution and use in source and binary forms, with or without
...@@ -43,7 +43,7 @@ class federatedx_io; ...@@ -43,7 +43,7 @@ class federatedx_io;
typedef struct st_fedrated_server { typedef struct st_fedrated_server {
MEM_ROOT mem_root; MEM_ROOT mem_root;
uint use_count, io_count; uint use_count, io_count;
uchar *key; uchar *key;
uint key_length; uint key_length;
...@@ -74,10 +74,10 @@ typedef struct st_fedrated_server { ...@@ -74,10 +74,10 @@ typedef struct st_fedrated_server {
#include <mysql.h> #include <mysql.h>
/* /*
handler::print_error has a case statement for error numbers. handler::print_error has a case statement for error numbers.
This value is (10000) is far out of range and will envoke the This value is (10000) is far out of range and will envoke the
default: case. default: case.
(Current error range is 120-159 from include/my_base.h) (Current error range is 120-159 from include/my_base.h)
*/ */
#define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000 #define HA_FEDERATEDX_ERROR_WITH_REMOTE_SYSTEM 10000
...@@ -158,7 +158,7 @@ class federatedx_io ...@@ -158,7 +158,7 @@ class federatedx_io
const char * get_database() const { return server->database; } const char * get_database() const { return server->database; }
ushort get_port() const { return server->port; } ushort get_port() const { return server->port; }
const char * get_socket() const { return server->socket; } const char * get_socket() const { return server->socket; }
static bool handles_scheme(const char *scheme); static bool handles_scheme(const char *scheme);
static federatedx_io *construct(MEM_ROOT *server_root, static federatedx_io *construct(MEM_ROOT *server_root,
FEDERATEDX_SERVER *server); FEDERATEDX_SERVER *server);
...@@ -167,7 +167,7 @@ class federatedx_io ...@@ -167,7 +167,7 @@ class federatedx_io
{ return alloc_root(mem_root, size); } { return alloc_root(mem_root, size); }
static void operator delete(void *ptr, size_t size) static void operator delete(void *ptr, size_t size)
{ TRASH(ptr, size); } { TRASH(ptr, size); }
virtual int query(const char *buffer, uint length)=0; virtual int query(const char *buffer, uint length)=0;
virtual FEDERATEDX_IO_RESULT *store_result()=0; virtual FEDERATEDX_IO_RESULT *store_result()=0;
...@@ -178,25 +178,25 @@ class federatedx_io ...@@ -178,25 +178,25 @@ class federatedx_io
virtual int error_code()=0; virtual int error_code()=0;
virtual const char *error_str()=0; virtual const char *error_str()=0;
virtual void reset()=0; virtual void reset()=0;
virtual int commit()=0; virtual int commit()=0;
virtual int rollback()=0; virtual int rollback()=0;
virtual int savepoint_set(ulong sp)=0; virtual int savepoint_set(ulong sp)=0;
virtual ulong savepoint_release(ulong sp)=0; virtual ulong savepoint_release(ulong sp)=0;
virtual ulong savepoint_rollback(ulong sp)=0; virtual ulong savepoint_rollback(ulong sp)=0;
virtual void savepoint_restrict(ulong sp)=0; virtual void savepoint_restrict(ulong sp)=0;
virtual ulong last_savepoint() const=0; virtual ulong last_savepoint() const=0;
virtual ulong actual_savepoint() const=0; virtual ulong actual_savepoint() const=0;
virtual bool is_autocommit() const=0; virtual bool is_autocommit() const=0;
virtual bool table_metadata(ha_statistics *stats, const char *table_name, virtual bool table_metadata(ha_statistics *stats, const char *table_name,
uint table_name_length, uint flag) = 0; uint table_name_length, uint flag) = 0;
/* resultset operations */ /* resultset operations */
virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0; virtual void free_result(FEDERATEDX_IO_RESULT *io_result)=0;
virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0; virtual unsigned int get_num_fields(FEDERATEDX_IO_RESULT *io_result)=0;
virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0; virtual my_ulonglong get_num_rows(FEDERATEDX_IO_RESULT *io_result)=0;
...@@ -206,6 +206,13 @@ class federatedx_io ...@@ -206,6 +206,13 @@ class federatedx_io
unsigned int column)=0; unsigned int column)=0;
virtual bool is_column_null(const FEDERATEDX_IO_ROW *row, virtual bool is_column_null(const FEDERATEDX_IO_ROW *row,
unsigned int column) const=0; unsigned int column) const=0;
virtual size_t get_ref_length() const=0;
virtual void mark_position(FEDERATEDX_IO_RESULT *io_result,
void *ref)=0;
virtual int seek_position(FEDERATEDX_IO_RESULT **io_result,
const void *ref)=0;
}; };
...@@ -215,12 +222,12 @@ class federatedx_txn ...@@ -215,12 +222,12 @@ class federatedx_txn
ulong savepoint_level; ulong savepoint_level;
ulong savepoint_stmt; ulong savepoint_stmt;
ulong savepoint_next; ulong savepoint_next;
void release_scan(); void release_scan();
public: public:
federatedx_txn(); federatedx_txn();
~federatedx_txn(); ~federatedx_txn();
bool has_connections() const { return txn_list != NULL; } bool has_connections() const { return txn_list != NULL; }
bool in_transaction() const { return savepoint_next != 0; } bool in_transaction() const { return savepoint_next != 0; }
int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io); int acquire(FEDERATEDX_SHARE *share, bool readonly, federatedx_io **io);
...@@ -254,8 +261,12 @@ class ha_federatedx: public handler ...@@ -254,8 +261,12 @@ class ha_federatedx: public handler
federatedx_txn *txn; federatedx_txn *txn;
federatedx_io *io; federatedx_io *io;
FEDERATEDX_IO_RESULT *stored_result; FEDERATEDX_IO_RESULT *stored_result;
/**
Array of all stored results we get during a query execution.
*/
DYNAMIC_ARRAY results;
bool position_called;
uint fetch_num; // stores the fetch num uint fetch_num; // stores the fetch num
FEDERATEDX_IO_OFFSET current_position; // Current position used by ::position()
int remote_error_number; int remote_error_number;
char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE]; char remote_error_buf[FEDERATEDX_QUERY_BUFFER_SIZE];
bool ignore_duplicates, replace_duplicates; bool ignore_duplicates, replace_duplicates;
...@@ -269,7 +280,7 @@ class ha_federatedx: public handler ...@@ -269,7 +280,7 @@ class ha_federatedx: public handler
*/ */
uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row, uint convert_row_to_internal_format(uchar *buf, FEDERATEDX_IO_ROW *row,
FEDERATEDX_IO_RESULT *result); FEDERATEDX_IO_RESULT *result);
bool create_where_from_key(String *to, KEY *key_info, bool create_where_from_key(String *to, KEY *key_info,
const key_range *start_key, const key_range *start_key,
const key_range *end_key, const key_range *end_key,
bool records_in_range, bool eq_range); bool records_in_range, bool eq_range);
...@@ -348,18 +359,18 @@ class ha_federatedx: public handler ...@@ -348,18 +359,18 @@ class ha_federatedx: public handler
Talk to Kostja about this - how to get the Talk to Kostja about this - how to get the
number of rows * ... number of rows * ...
disk scan time on other side (block size, size of the row) + network time ... disk scan time on other side (block size, size of the row) + network time ...
The reason for "records * 1000" is that such a large number forces The reason for "records * 1000" is that such a large number forces
this to use indexes " this to use indexes "
*/ */
double scan_time() double scan_time()
{ {
DBUG_PRINT("info", ("records %lu", (ulong) stats.records)); DBUG_PRINT("info", ("records %lu", (ulong) stats.records));
return (double)(stats.records*1000); return (double)(stats.records*1000);
} }
/* /*
The next method will never be called if you do not implement indexes. The next method will never be called if you do not implement indexes.
*/ */
double read_time(uint index, uint ranges, ha_rows rows) double read_time(uint index, uint ranges, ha_rows rows)
{ {
/* /*
Per Brian, this number is bugus, but this method must be implemented, Per Brian, this number is bugus, but this method must be implemented,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment