Commit 1e0152b4 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Add table synchronization for select and derived handlers.

parent 4252df83
...@@ -473,6 +473,37 @@ xpand_mark_table_for_discovery(TABLE *table) ...@@ -473,6 +473,37 @@ xpand_mark_table_for_discovery(TABLE *table)
xs->xpand_table_oid= 0; xs->xpand_table_oid= 0;
} }
void
xpand_mark_tables_for_discovery(LEX *lex)
{
for (TABLE_LIST *tbl= lex->query_tables; tbl; tbl= tbl->next_global)
if (tbl->table && tbl->table->file->ht == xpand_hton)
xpand_mark_table_for_discovery(tbl->table);
}
ulonglong *
xpand_extract_table_oids(THD *thd, LEX *lex)
{
int cnt = 1;
for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
if (tbl->table && tbl->table->file->ht == xpand_hton)
cnt++;
ulonglong *oids = (ulonglong*)thd_alloc(thd, cnt * sizeof(ulonglong));
ulonglong *ptr = oids;
for (TABLE_LIST *tbl = lex->query_tables; tbl; tbl= tbl->next_global)
{
if (tbl->table && tbl->table->file->ht == xpand_hton)
{
ha_xpand *hndlr = static_cast<ha_xpand *>(tbl->table->file);
*ptr++ = hndlr->get_table_oid();
}
}
*ptr = 0;
return oids;
}
int ha_xpand::open(const char *name, int mode, uint test_if_locked) int ha_xpand::open(const char *name, int mode, uint test_if_locked)
{ {
THD *thd= ha_thd(); THD *thd= ha_thd();
...@@ -1190,6 +1221,11 @@ int ha_xpand::info_push(uint info_type, void *info) ...@@ -1190,6 +1221,11 @@ int ha_xpand::info_push(uint info_type, void *info)
return 0; return 0;
} }
ulonglong ha_xpand::get_table_oid()
{
return xpand_table_oid;
}
/**************************************************************************** /****************************************************************************
** Row encoding functions ** Row encoding functions
****************************************************************************/ ****************************************************************************/
......
...@@ -29,6 +29,8 @@ void remove_current_table_from_rpl_table_list(rpl_group_info *rgi); ...@@ -29,6 +29,8 @@ void remove_current_table_from_rpl_table_list(rpl_group_info *rgi);
int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data, int unpack_row_to_buf(rpl_group_info *rgi, TABLE *table, uchar *data,
uchar const *const row_data, MY_BITMAP const *cols, uchar const *const row_data, MY_BITMAP const *cols,
uchar const *const row_end); uchar const *const row_end);
void xpand_mark_tables_for_discovery(LEX *lex);
ulonglong *xpand_extract_table_oids(THD *thd, LEX *lex);
class Xpand_share : public Handler_share { class Xpand_share : public Handler_share {
...@@ -131,6 +133,7 @@ class ha_xpand : public handler ...@@ -131,6 +133,7 @@ class ha_xpand : public handler
void cond_pop(); void cond_pop();
int info_push(uint info_type, void *info); int info_push(uint info_type, void *info);
ulonglong get_table_oid();
private: private:
void build_key_packed_row(uint index, const uchar *buf, void build_key_packed_row(uint index, const uchar *buf,
uchar *packed_key, size_t *packed_key_len); uchar *packed_key, size_t *packed_key_len);
......
...@@ -101,6 +101,7 @@ int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype, ...@@ -101,6 +101,7 @@ int get_field_types(THD *thd, TABLE *table__, SELECT_LEX *sl, uchar *fieldtype,
select_handler* select_handler*
create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex) create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex)
{ {
ulonglong *oids = NULL;
ha_xpand_select_handler *sh = NULL; ha_xpand_select_handler *sh = NULL;
if (!select_handler_setting(thd)) { if (!select_handler_setting(thd)) {
return sh; return sh;
...@@ -156,20 +157,23 @@ create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex) ...@@ -156,20 +157,23 @@ create_xpand_select_handler(THD* thd, SELECT_LEX* select_lex)
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN)) if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next(); trx->auto_commit_next();
oids = xpand_extract_table_oids(thd, select_lex->parent_lex);
if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
num_null_bytes, field_metadata, num_null_bytes, field_metadata,
field_metadata_size, field_metadata_size,
row_buffer_setting(thd), &scan))) { row_buffer_setting(thd), oids, &scan))) {
goto err; goto err;
} }
sh = new ha_xpand_select_handler(thd, select_lex, scan); sh = new ha_xpand_select_handler(thd, select_lex, scan);
err: err:
// deallocate buffers
if (meta_memory) if (meta_memory)
my_free(meta_memory); my_free(meta_memory);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(select_lex->parent_lex);
return sh; return sh;
} }
...@@ -376,6 +380,7 @@ int ha_xpand_derived_handler::init_scan() ...@@ -376,6 +380,7 @@ int ha_xpand_derived_handler::init_scan()
int error_code = 0; int error_code = 0;
int field_metadata_size = 0; int field_metadata_size = 0;
xpand_connection *trx = NULL; xpand_connection *trx = NULL;
ulonglong *oids = NULL;
// We presume this number is equal to types.elements in get_field_types // We presume this number is equal to types.elements in get_field_types
uint items_number= select->get_item_list()->elements; uint items_number= select->get_item_list()->elements;
...@@ -402,10 +407,11 @@ int ha_xpand_derived_handler::init_scan() ...@@ -402,10 +407,11 @@ int ha_xpand_derived_handler::init_scan()
if (!trx) if (!trx)
goto err; goto err;
oids = xpand_extract_table_oids(thd__, select->parent_lex);
if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits, if ((error_code = trx->scan_query(query, fieldtype, items_number, null_bits,
num_null_bytes, field_metadata, num_null_bytes, field_metadata,
field_metadata_size, field_metadata_size,
row_buffer_setting(thd), &scan))) { row_buffer_setting(thd), oids, &scan))) {
goto err; goto err;
} }
...@@ -420,10 +426,12 @@ int ha_xpand_derived_handler::init_scan() ...@@ -420,10 +426,12 @@ int ha_xpand_derived_handler::init_scan()
add_current_table_to_rpl_table_list(&rgi, thd__, table__); add_current_table_to_rpl_table_list(&rgi, thd__, table__);
err: err:
// deallocate buffers
if (meta_memory) if (meta_memory)
my_free(meta_memory); my_free(meta_memory);
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(select->parent_lex);
return error_code; return error_code;
} }
......
...@@ -281,7 +281,7 @@ int xpand_connection::send_command() ...@@ -281,7 +281,7 @@ int xpand_connection::send_command()
committed or rolled back. committed or rolled back.
*/ */
trans_state = XPAND_TRANS_STARTED; trans_state = XPAND_TRANS_STARTED;
com_error = simple_command(&xpand_net, com_error = simple_command(&xpand_net,
(enum_server_command)XPAND_SERVER_REQUEST, (enum_server_command)XPAND_SERVER_REQUEST,
command_buffer, command_length, TRUE); command_buffer, command_length, TRUE);
...@@ -777,6 +777,7 @@ int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -777,6 +777,7 @@ int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
uchar *null_bits, uint null_bits_size, uchar *null_bits, uint null_bits_size,
uchar *field_metadata, uchar *field_metadata,
uint field_metadata_size, ushort row_req, uint field_metadata_size, ushort row_req,
ulonglong *oids,
xpand_connection_cursor **scan) xpand_connection_cursor **scan)
{ {
int error_code; int error_code;
...@@ -785,6 +786,12 @@ int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields, ...@@ -785,6 +786,12 @@ int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
if ((error_code = begin_command(XPAND_SCAN_QUERY))) if ((error_code = begin_command(XPAND_SCAN_QUERY)))
return error_code; return error_code;
do {
if ((error_code = add_command_operand_ulonglong(*oids)))
return error_code;
}
while (*oids++);
if ((error_code = add_command_operand_ushort(row_req))) if ((error_code = add_command_operand_ushort(row_req)))
return error_code; return error_code;
......
...@@ -89,7 +89,7 @@ class xpand_connection ...@@ -89,7 +89,7 @@ class xpand_connection
xpand_connection_cursor **scan); xpand_connection_cursor **scan);
int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits, int scan_query(String &stmt, uchar *fieldtype, uint fields, uchar *null_bits,
uint null_bits_size, uchar *field_metadata, uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ushort row_req, uint field_metadata_size, ushort row_req, ulonglong *oids,
xpand_connection_cursor **scan); xpand_connection_cursor **scan);
int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *affected_rows); int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *affected_rows);
int scan_from_key(ulonglong xpand_table_oid, uint index, int scan_from_key(ulonglong xpand_table_oid, uint index,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment