Commit 37a5b125 authored by Will DeVries's avatar Will DeVries Committed by Sergei Petrunia

Add table synchronization for direct update.

parent d4ae797b
......@@ -626,13 +626,17 @@ int ha_xpand::write_row(const uchar *buf)
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
error_code= trx->update_query(update_stmt, table->s->db, &update_rows);
ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
error_code= trx->update_query(update_stmt, table->s->db, oids,
&update_rows);
if (upsert_flag & XPAND_BULK_UPSERT)
upsert_flag |= XPAND_UPSERT_SENT;
else
upsert_flag &= ~XPAND_HAS_UPSERT;
}
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(thd->lex);
return error_code;
}
......@@ -721,8 +725,12 @@ int ha_xpand::direct_update_rows(ha_rows *update_rows, ha_rows *found_rows)
if (!thd_test_options(thd, OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trx->auto_commit_next();
error_code = trx->update_query(update_stmt, table->s->db, update_rows);
ulonglong *oids = xpand_extract_table_oids(thd, thd->lex);
error_code = trx->update_query(update_stmt, table->s->db, oids, update_rows);
*found_rows = *update_rows;
if (error_code == HA_ERR_TABLE_DEF_CHANGED)
xpand_mark_tables_for_discovery(thd->lex);
DBUG_RETURN(error_code);
}
......
......@@ -831,7 +831,7 @@ int xpand_connection::scan_query(String &stmt, uchar *fieldtype, uint fields,
* dbname &current database name
**/
int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname,
ulonglong *affected_rows)
ulonglong *oids, ulonglong *affected_rows)
{
int error_code;
command_length = 0;
......@@ -839,6 +839,12 @@ int xpand_connection::update_query(String &stmt, LEX_CSTRING &dbname,
if ((error_code = begin_command(XPAND_UPDATE_QUERY)))
return error_code;
do {
if ((error_code = add_command_operand_ulonglong(*oids)))
return error_code;
}
while (*oids++);
if ((error_code = add_command_operand_str((uchar*)dbname.str, dbname.length)))
return error_code;
......
......@@ -113,7 +113,8 @@ class xpand_connection
uint null_bits_size, uchar *field_metadata,
uint field_metadata_size, ushort row_req, ulonglong *oids,
xpand_connection_cursor **scan);
int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *affected_rows);
int update_query(String &stmt, LEX_CSTRING &dbname, ulonglong *oids,
ulonglong *affected_rows);
int scan_from_key(ulonglong xpand_table_oid, uint index,
xpand_lock_mode_t lock_mode,
enum scan_type scan_dir, int no_key_cols, bool sorted_scan,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment