Commit d752a97e authored by Marko Mäkelä's avatar Marko Mäkelä

Merge 10.1 to 10.2

parents 19ceaf29 a41d4297
......@@ -41,9 +41,11 @@ INSERT INTO articles (title,body) VALUES
('1001 MySQL Tricks','1. Never run mysqld as root. 2. ...'),
('MySQL vs. YourSQL','In the following database comparison ...'),
('MySQL Security','When configured properly, MySQL ...');
connect dml, localhost, root,,;
BEGIN;
INSERT INTO articles (title,body) VALUES
('MySQL Tutorial','DBMS stands for DataBase ...');
connection default;
# Make durable the AUTO_INCREMENT in the above incomplete transaction.
connect flush_redo_log,localhost,root,,;
SET GLOBAL innodb_flush_log_at_trx_commit=1;
......@@ -52,6 +54,7 @@ DELETE FROM articles LIMIT 1;
ROLLBACK;
disconnect flush_redo_log;
connection default;
disconnect dml;
INSERT INTO articles (title,body) VALUES
('MySQL Tutorial','DBMS stands for DataBase ...');
SELECT * FROM articles
......@@ -79,9 +82,39 @@ INSERT INTO articles VALUES
(4, 11, '1001 MySQL Tricks','1. Never run mysqld as root. 2. ...'),
(5, 6, 'MySQL vs. YourSQL','In the following database comparison ...'),
(7, 4, 'MySQL Security','When configured properly, MySQL ...');
connect dml, localhost, root,,;
BEGIN;
INSERT INTO articles VALUES
(100, 200, 'MySQL Tutorial','DBMS stands for DataBase ...');
connect dml2, localhost, root,,;
#
# MDEV-19073 FTS row mismatch after crash recovery
#
CREATE TABLE mdev19073(id SERIAL, title VARCHAR(200), body TEXT,
FULLTEXT(title,body)) ENGINE=InnoDB;
INSERT INTO mdev19073 (title, body) VALUES
('MySQL Tutorial', 'DBMS stands for Database...');
CREATE FULLTEXT INDEX idx ON mdev19073(title, body);
CREATE TABLE mdev19073_2 LIKE mdev19073;
INSERT INTO mdev19073_2 (title, body) VALUES
('MySQL Tutorial', 'DBMS stands for Database...');
INSERT INTO mdev19073 (title, body) VALUES
('MariaDB Tutorial', 'DB means Database ...');
INSERT INTO mdev19073_2 (title, body) VALUES
('MariaDB Tutorial', 'DB means Database ...');
SELECT * FROM mdev19073 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
id title body
1 MySQL Tutorial DBMS stands for Database...
2 MariaDB Tutorial DB means Database ...
SELECT * FROM mdev19073_2 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
id title body
1 MySQL Tutorial DBMS stands for Database...
2 MariaDB Tutorial DB means Database ...
connection default;
disconnect dml;
disconnect dml2;
INSERT INTO articles VALUES (8, 12, 'MySQL Tutorial','DBMS stands for DataBase ...');
SELECT * FROM articles WHERE MATCH (title, body)
AGAINST ('Tutorial' IN NATURAL LANGUAGE MODE);
......@@ -90,3 +123,14 @@ id FTS_DOC_ID title body
1 10 MySQL Tutorial DBMS stands for DataBase ...
8 12 MySQL Tutorial DBMS stands for DataBase ...
DROP TABLE articles;
SELECT * FROM mdev19073 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
id title body
1 MySQL Tutorial DBMS stands for Database...
2 MariaDB Tutorial DB means Database ...
SELECT * FROM mdev19073_2 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
id title body
1 MySQL Tutorial DBMS stands for Database...
2 MariaDB Tutorial DB means Database ...
DROP TABLE mdev19073, mdev19073_2;
......@@ -6,6 +6,7 @@
--source include/have_innodb.inc
# The embedded server tests do not support restarting.
--source include/not_embedded.inc
--source include/maybe_debug.inc
FLUSH TABLES;
# Following are test for crash recovery on FTS index, the first scenario
......@@ -73,10 +74,12 @@ INSERT INTO articles (title,body) VALUES
('MySQL vs. YourSQL','In the following database comparison ...'),
('MySQL Security','When configured properly, MySQL ...');
connect(dml, localhost, root,,);
BEGIN;
INSERT INTO articles (title,body) VALUES
('MySQL Tutorial','DBMS stands for DataBase ...');
connection default;
--echo # Make durable the AUTO_INCREMENT in the above incomplete transaction.
--connect (flush_redo_log,localhost,root,,)
......@@ -89,6 +92,8 @@ ROLLBACK;
--source include/restart_mysqld.inc
disconnect dml;
# This insert will re-initialize the Doc ID counter, it should not crash
INSERT INTO articles (title,body) VALUES
('MySQL Tutorial','DBMS stands for DataBase ...');
......@@ -121,6 +126,7 @@ INSERT INTO articles VALUES
(5, 6, 'MySQL vs. YourSQL','In the following database comparison ...'),
(7, 4, 'MySQL Security','When configured properly, MySQL ...');
connect(dml, localhost, root,,);
BEGIN;
# Below we do not depend on the durability of the AUTO_INCREMENT sequence,
......@@ -128,7 +134,49 @@ BEGIN;
INSERT INTO articles VALUES
(100, 200, 'MySQL Tutorial','DBMS stands for DataBase ...');
connect(dml2, localhost, root,,);
--echo #
--echo # MDEV-19073 FTS row mismatch after crash recovery
--echo #
CREATE TABLE mdev19073(id SERIAL, title VARCHAR(200), body TEXT,
FULLTEXT(title,body)) ENGINE=InnoDB;
INSERT INTO mdev19073 (title, body) VALUES
('MySQL Tutorial', 'DBMS stands for Database...');
CREATE FULLTEXT INDEX idx ON mdev19073(title, body);
CREATE TABLE mdev19073_2 LIKE mdev19073;
if ($have_debug)
{
--disable_query_log
SET @saved_dbug = @@debug_dbug;
SET DEBUG_DBUG = '+d,fts_instrument_sync_debug';
--enable_query_log
}
INSERT INTO mdev19073_2 (title, body) VALUES
('MySQL Tutorial', 'DBMS stands for Database...');
if ($have_debug)
{
--disable_query_log
SET DEBUG_DBUG = @saved_dbug;
--enable_query_log
}
INSERT INTO mdev19073 (title, body) VALUES
('MariaDB Tutorial', 'DB means Database ...');
INSERT INTO mdev19073_2 (title, body) VALUES
('MariaDB Tutorial', 'DB means Database ...');
# Should return 2 rows
SELECT * FROM mdev19073 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
SELECT * FROM mdev19073_2 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
connection default;
--source include/restart_mysqld.inc
disconnect dml;
disconnect dml2;
# This would re-initialize the FTS index and do the re-tokenization
# of above records
......@@ -138,3 +186,10 @@ SELECT * FROM articles WHERE MATCH (title, body)
AGAINST ('Tutorial' IN NATURAL LANGUAGE MODE);
DROP TABLE articles;
# Should return 2 rows
SELECT * FROM mdev19073 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
SELECT * FROM mdev19073_2 WHERE MATCH (title, body)
AGAINST ('Database' IN NATURAL LANGUAGE MODE);
DROP TABLE mdev19073, mdev19073_2;
......@@ -43,8 +43,7 @@ Created 4/24/1996 Heikki Tuuri
#include "rem0cmp.h"
#include "srv0start.h"
#include "srv0srv.h"
#include <stack>
#include <set>
#include "fts0opt.h"
/** Following are the InnoDB system tables. The positions in
this array are referenced by enum dict_system_table_id. */
......@@ -3121,8 +3120,12 @@ dict_load_table_one(
FTS */
fts_optimize_remove_table(table);
fts_free(table);
} else {
} else if (fts_optimize_wq) {
fts_optimize_add_table(table);
} else {
/* fts_optimize_thread is not started yet.
So make the table as non-evictable from cache. */
dict_table_move_from_lru_to_non_lru(table);
}
}
......
......@@ -2712,6 +2712,10 @@ fts_cmp_set_sync_doc_id(
}
if (read_only) {
/* InnoDB stores actual synced_doc_id value + 1 in
FTS_CONFIG table. Reduce the value by 1 while reading
after startup. */
if (*doc_id) *doc_id -= 1;
goto func_exit;
}
......@@ -5388,11 +5392,11 @@ fts_t::fts_t(
const dict_table_t* table,
mem_heap_t* heap)
:
in_queue(0), added_synced(0), dict_locked(0),
added_synced(0), dict_locked(0),
bg_threads(0),
add_wq(NULL),
cache(NULL),
doc_col(ULINT_UNDEFINED),
doc_col(ULINT_UNDEFINED), in_queue(false),
fts_heap(heap)
{
ut_a(table->fts == NULL);
......
......@@ -35,9 +35,10 @@ Completed 2011/7/10 Sunny and Jimmy Yang
#include "srv0start.h"
#include "ut0list.h"
#include "zlib.h"
#include "fts0opt.h"
/** The FTS optimize thread's work queue. */
static ib_wqueue_t* fts_optimize_wq;
ib_wqueue_t* fts_optimize_wq;
/** The FTS vector to store fts_slot_t */
static ib_vector_t* fts_slots;
......@@ -168,8 +169,8 @@ struct fts_encode_t {
/** We use this information to determine when to start the optimize
cycle for a table. */
struct fts_slot_t {
/** table identifier, or 0 if the slot is empty */
table_id_t table_id;
/** table, or NULL if the slot is unused */
dict_table_t* table;
/** whether this slot is being processed */
bool running;
......@@ -2391,14 +2392,7 @@ fts_optimize_table_bk(
return(DB_SUCCESS);
}
dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
if (!table) {
slot->last_run = now;
return DB_SUCCESS;
}
dict_table_t* table = slot->table;
dberr_t error;
if (fil_table_accessible(table)
......@@ -2418,8 +2412,6 @@ fts_optimize_table_bk(
error = DB_SUCCESS;
}
dict_table_close(table, FALSE, FALSE);
return(error);
}
/*********************************************************************//**
......@@ -2564,11 +2556,13 @@ void fts_optimize_add_table(dict_table_t* table)
msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);
mutex_exit(&fts_optimize_wq->mutex);
}
/**********************************************************************//**
......@@ -2595,12 +2589,10 @@ fts_optimize_remove_table(
return;
}
fts_t* fts = table->fts;
mutex_enter(&fts->bg_threads_mutex);
bool is_in_optimize_queue = fts->in_queue;
mutex_exit(&fts->bg_threads_mutex);
mutex_enter(&fts_optimize_wq->mutex);
if (!is_in_optimize_queue) {
if (!table->fts->in_queue) {
mutex_exit(&fts_optimize_wq->mutex);
return;
}
......@@ -2616,15 +2608,17 @@ fts_optimize_remove_table(
remove->event = event;
msg->ptr = remove;
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_exit(&fts_optimize_wq->mutex);
os_event_wait(event);
os_event_destroy(event);
mutex_enter(&fts->bg_threads_mutex);
fts->in_queue = false;
mutex_exit(&fts->bg_threads_mutex);
ut_d(mutex_enter(&fts_optimize_wq->mutex));
ut_ad(!table->fts->in_queue);
ut_d(mutex_exit(&fts_optimize_wq->mutex));
}
/** Send sync fts cache for the table.
......@@ -2633,9 +2627,6 @@ void
fts_optimize_request_sync_table(
dict_table_t* table)
{
fts_msg_t* msg;
table_id_t* table_id;
/* if the optimize system not yet initialized, return */
if (!fts_optimize_wq) {
return;
......@@ -2648,39 +2639,36 @@ fts_optimize_request_sync_table(
return;
}
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
fts_msg_t* msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
table_id = static_cast<table_id_t*>(
mem_heap_alloc(msg->heap, sizeof(table_id_t)));
*table_id = table->id;
msg->ptr = table_id;
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);
mutex_exit(&fts_optimize_wq->mutex);
}
/** Add a table to fts_slots if it doesn't already exist. */
static bool fts_optimize_new_table(dict_table_t* table)
{
ut_ad(table);
ulint i;
fts_slot_t* slot;
fts_slot_t* empty = NULL;
const table_id_t table_id = table->id;
ut_ad(table_id);
/* Search for duplicates, also find a free slot if one exists. */
for (i = 0; i < ib_vector_size(fts_slots); ++i) {
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
if (!slot->table_id) {
if (!slot->table) {
empty = slot;
} else if (slot->table_id == table_id) {
} else if (slot->table == table) {
/* Already exists in our optimize queue. */
return(FALSE);
return false;
}
}
......@@ -2689,36 +2677,35 @@ static bool fts_optimize_new_table(dict_table_t* table)
memset(slot, 0x0, sizeof(*slot));
slot->table_id = table->id;
slot->running = false;
return(TRUE);
slot->table = table;
return true;
}
/** Remove a table from fts_slots if it exists.
@param[in,out] table table to be removed from fts_slots */
static bool fts_optimize_del_table(const dict_table_t* table)
{
const table_id_t table_id = table->id;
ut_ad(table_id);
ut_ad(table);
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
fts_slot_t* slot;
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
if (slot->table_id == table_id) {
if (slot->table == table) {
if (fts_enable_diag_print) {
ib::info() << "FTS Optimize Removing table "
<< table->name;
}
slot->table_id = 0;
return(TRUE);
mutex_enter(&fts_optimize_wq->mutex);
slot->table->fts->in_queue = false;
mutex_exit(&fts_optimize_wq->mutex);
slot->table = NULL;
return true;
}
}
return(FALSE);
return false;
}
/**********************************************************************//**
......@@ -2732,7 +2719,7 @@ static ulint fts_optimize_how_many()
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));
if (slot->table_id == 0) {
if (!slot->table) {
continue;
}
......@@ -2768,22 +2755,14 @@ static bool fts_is_sync_needed()
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));
if (slot->table_id == 0) {
continue;
}
dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
if (!table) {
if (!slot->table) {
continue;
}
if (table->fts && table->fts->cache) {
total_memory += table->fts->cache->total_size;
if (slot->table->fts && slot->table->fts->cache) {
total_memory += slot->table->fts->cache->total_size;
}
dict_table_close(table, FALSE, FALSE);
if (total_memory > fts_max_total_cache_size) {
return(true);
}
......@@ -2793,22 +2772,14 @@ static bool fts_is_sync_needed()
}
/** Sync fts cache of a table
@param[in] table_id table id */
static void fts_optimize_sync_table(table_id_t table_id)
@param[in,out] table table to be synced */
static void fts_optimize_sync_table(dict_table_t* table)
{
if (dict_table_t* table = dict_table_open_on_id(
table_id, FALSE, DICT_TABLE_OP_NORMAL)) {
if (fil_table_accessible(table)
&& table->fts && table->fts->cache) {
fts_sync_table(table, false);
}
DBUG_EXECUTE_IF(
"ib_optimize_wq_hang",
os_thread_sleep(6000000););
dict_table_close(table, FALSE, FALSE);
if (table->fts && table->fts->cache && fil_table_accessible(table)) {
fts_sync_table(table, false);
}
DBUG_EXECUTE_IF("ib_optimize_wq_hang", os_thread_sleep(6000000););
}
/**********************************************************************//**
......@@ -2847,7 +2818,7 @@ fts_optimize_thread(
ib_vector_get(fts_slots, current));
/* Handle the case of empty slots. */
if (slot->table_id) {
if (slot->table) {
slot->running = true;
fts_optimize_table_bk(slot);
}
......@@ -2906,7 +2877,7 @@ fts_optimize_thread(
os_thread_sleep(300000););
fts_optimize_sync_table(
*static_cast<table_id_t*>(msg->ptr));
static_cast<dict_table_t*>(msg->ptr));
break;
default:
......@@ -2925,8 +2896,8 @@ fts_optimize_thread(
fts_slot_t* slot = static_cast<fts_slot_t*>(
ib_vector_get(fts_slots, i));
if (table_id_t table_id = slot->table_id) {
fts_optimize_sync_table(table_id);
if (slot->table) {
fts_optimize_sync_table(slot->table);
}
}
}
......@@ -2954,7 +2925,6 @@ fts_optimize_init(void)
{
mem_heap_t* heap;
ib_alloc_t* heap_alloc;
dict_table_t* table;
ut_ad(!srv_read_only_mode);
......@@ -2970,31 +2940,25 @@ fts_optimize_init(void)
heap_alloc = ib_heap_allocator_create(heap);
fts_slots = ib_vector_create(heap_alloc, sizeof(fts_slot_t), 4);
/* Add fts tables to the fts_slots vector which were skipped during restart */
std::vector<dict_table_t*> table_vector;
std::vector<dict_table_t*>::iterator it;
/* Add fts tables to fts_slots which could be skipped
during dict_load_table_one() because fts_optimize_thread
wasn't even started. */
mutex_enter(&dict_sys->mutex);
for (table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
table != NULL;
table = UT_LIST_GET_NEXT(table_LRU, table)) {
if (table->fts &&
dict_table_has_fts_index(table)) {
if (fts_optimize_new_table(table)){
table_vector.push_back(table);
}
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
table != NULL;
table = UT_LIST_GET_NEXT(table_LRU, table)) {
if (!table->fts || !dict_table_has_fts_index(table)) {
continue;
}
}
/* It is better to call dict_table_prevent_eviction()
outside the above loop because it operates on
dict_sys->table_LRU list.*/
for (it=table_vector.begin();it!=table_vector.end();++it) {
dict_table_prevent_eviction(*it);
/* fts_optimize_thread is not started yet. So there is no
need to acquire fts_optimize_wq->mutex for adding the fts
table to the fts slots. */
ut_ad(!table->can_be_evicted);
fts_optimize_new_table(table);
table->fts->in_queue = true;
}
mutex_exit(&dict_sys->mutex);
table_vector.clear();
fts_opt_shutdown_event = os_event_create(0);
last_check_sync_time = time(NULL);
......
......@@ -320,9 +320,6 @@ class fts_t {
/** Mutex protecting bg_threads* and fts_add_wq. */
ib_mutex_t bg_threads_mutex;
/** Whether the table was added to fts_optimize_wq();
protected by bg_threads_mutex */
unsigned in_queue:1;
/** Whether the ADDED table record sync-ed after
crash recovery; protected by bg_threads_mutex */
unsigned added_synced:1;
......@@ -348,6 +345,10 @@ class fts_t {
/** Vector of FTS indexes, this is mainly for caching purposes. */
ib_vector_t* indexes;
/** Whether the table exists in fts_optimize_wq;
protected by fts_optimize_wq mutex */
bool in_queue;
/** Heap for fts_t allocation. */
mem_heap_t* fts_heap;
};
......
......@@ -25,6 +25,9 @@ Created 2011-02-15 Jimmy Yang
#ifndef INNODB_FTS0OPT_H
#define INNODB_FTS0OPT_H
/** The FTS optimize thread's work queue. */
extern ib_wqueue_t* fts_optimize_wq;
/********************************************************************
Callback function to fetch the rows in an FTS INDEX record. */
ibool
......
......@@ -38,7 +38,18 @@ processing.
// Forward declaration
struct ib_list_t;
struct ib_wqueue_t;
/** Work queue */
struct ib_wqueue_t
{
/** Mutex protecting everything */
ib_mutex_t mutex;
/** Work item list */
ib_list_t* items;
/** event we use to signal additions to list;
os_event_set() and os_event_reset() are protected by the mutex */
os_event_t event;
};
/****************************************************************//**
Create a new work queue.
......@@ -54,15 +65,14 @@ ib_wqueue_free(
/*===========*/
ib_wqueue_t* wq); /*!< in: work queue */
/****************************************************************//**
Add a work item to the queue. */
/** Add a work item to the queue.
@param[in,out] wq work queue
@param[in] item work item
@param[in,out] heap memory heap to use for allocating list node
@param[in] wq_locked work queue mutex locked */
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /*!< in: work queue */
void* item, /*!< in: work item */
mem_heap_t* heap); /*!< in: memory heap to use for
allocating the list node */
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap,
bool wq_locked = false);
/** Check if queue is empty.
@param wq wait queue
......@@ -101,5 +111,4 @@ ib_wqueue_len(
/*==========*/
ib_wqueue_t* wq); /*<! in: work queue */
#endif /* IB_WORK_QUEUE_H */
......@@ -28,15 +28,6 @@ A work queue
Created 4/26/2006 Osku Salerma
************************************************************************/
/* Work queue. */
struct ib_wqueue_t {
ib_mutex_t mutex; /*!< mutex protecting everything */
ib_list_t* items; /*!< work item list */
os_event_t event; /*!< event we use to signal additions to list;
os_event_set() and os_event_reset() are
protected by ib_wqueue_t::mutex */
};
/****************************************************************//**
Create a new work queue.
@return work queue */
......@@ -72,22 +63,24 @@ ib_wqueue_free(
ut_free(wq);
}
/****************************************************************//**
Add a work item to the queue. */
/** Add a work item to the queue.
@param[in,out] wq work queue
@param[in] item work item
@param[in,out] heap memory heap to use for allocating list node
@param[in] wq_locked work queue mutex locked */
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /*!< in: work queue */
void* item, /*!< in: work item */
mem_heap_t* heap) /*!< in: memory heap to use for allocating the
list node */
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap, bool wq_locked)
{
mutex_enter(&wq->mutex);
if (!wq_locked) {
mutex_enter(&wq->mutex);
}
ib_list_add_last(wq->items, item, heap);
os_event_set(wq->event);
mutex_exit(&wq->mutex);
if (!wq_locked) {
mutex_exit(&wq->mutex);
}
}
/****************************************************************//**
......
......@@ -46,6 +46,7 @@ Created 4/24/1996 Heikki Tuuri
#include "dict0priv.h"
#include "ha_prototypes.h" /* innobase_casedn_str() */
#include "fts0priv.h"
#include "fts0opt.h"
/** Following are the InnoDB system tables. The positions in
this array are referenced by enum dict_system_table_id. */
......@@ -2570,8 +2571,12 @@ dict_load_table(
FTS */
fts_optimize_remove_table(table);
fts_free(table);
} else {
} else if (fts_optimize_wq) {
fts_optimize_add_table(table);
} else {
/* fts_optimize_thread is not started yet.
So make the table as non-evictable from cache. */
dict_table_move_from_lru_to_non_lru(table);
}
}
......
......@@ -2739,6 +2739,10 @@ fts_cmp_set_sync_doc_id(
}
if (read_only) {
/* InnoDB stores actual synced_doc_id value + 1 in
FTS_CONFIG table. Reduce the value by 1 while reading
after startup. */
if (*doc_id) *doc_id -= 1;
goto func_exit;
}
......
......@@ -34,6 +34,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
#include "ut0wqueue.h"
#include "srv0start.h"
#include "zlib.h"
#include "fts0opt.h"
#ifndef UNIV_NONINL
#include "fts0types.ic"
......@@ -41,7 +42,7 @@ Completed 2011/7/10 Sunny and Jimmy Yang
#endif
/** The FTS optimize thread's work queue. */
static ib_wqueue_t* fts_optimize_wq;
ib_wqueue_t* fts_optimize_wq;
/** The FTS vector to store fts_slot_t */
static ib_vector_t* fts_slots;
......@@ -169,8 +170,8 @@ struct fts_encode_t {
/** We use this information to determine when to start the optimize
cycle for a table. */
struct fts_slot_t {
/** table identifier, or 0 if the slot is empty */
table_id_t table_id;
/** table, or NULL if the slot is unused */
dict_table_t* table;
/** whether this slot is being processed */
bool running;
......@@ -2456,14 +2457,7 @@ fts_optimize_table_bk(
return(DB_SUCCESS);
}
dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
if (!table) {
slot->last_run = now;
return DB_SUCCESS;
}
dict_table_t* table = slot->table;
dberr_t error;
if (fil_table_accessible(table)
......@@ -2483,8 +2477,6 @@ fts_optimize_table_bk(
error = DB_SUCCESS;
}
dict_table_close(table, FALSE, FALSE);
return(error);
}
/*********************************************************************//**
......@@ -2627,11 +2619,13 @@ UNIV_INTERN void fts_optimize_add_table(dict_table_t* table)
msg = fts_optimize_create_msg(FTS_MSG_ADD_TABLE, table);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);
mutex_exit(&fts_optimize_wq->mutex);
}
/**********************************************************************//**
......@@ -2648,7 +2642,7 @@ fts_optimize_remove_table(
fts_msg_del_t* remove;
/* if the optimize system not yet initialized, return */
if (!fts_optimize_is_init()) {
if (!fts_optimize_wq) {
return;
}
......@@ -2660,12 +2654,10 @@ fts_optimize_remove_table(
return;
}
fts_t* fts = table->fts;
mutex_enter(&fts->bg_threads_mutex);
bool is_in_optimize_queue = fts->in_queue;
mutex_exit(&fts->bg_threads_mutex);
mutex_enter(&fts_optimize_wq->mutex);
if (!is_in_optimize_queue) {
if (!table->fts->in_queue) {
mutex_exit(&fts_optimize_wq->mutex);
return;
}
......@@ -2681,15 +2673,17 @@ fts_optimize_remove_table(
remove->event = event;
msg->ptr = remove;
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_exit(&fts_optimize_wq->mutex);
os_event_wait(event);
os_event_free(event);
mutex_enter(&fts->bg_threads_mutex);
fts->in_queue = false;
mutex_exit(&fts->bg_threads_mutex);
ut_d(mutex_enter(&fts_optimize_wq->mutex));
ut_ad(!table->fts->in_queue);
ut_d(mutex_exit(&fts_optimize_wq->mutex));
}
/** Send sync fts cache for the table.
......@@ -2700,10 +2694,9 @@ fts_optimize_request_sync_table(
dict_table_t* table)
{
fts_msg_t* msg;
table_id_t* table_id;
/* if the optimize system not yet initialized, return */
if (!fts_optimize_is_init()) {
if (!fts_optimize_wq) {
return;
}
......@@ -2715,39 +2708,36 @@ fts_optimize_request_sync_table(
return;
}
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, NULL);
msg = fts_optimize_create_msg(FTS_MSG_SYNC_TABLE, table);
table_id = static_cast<table_id_t*>(
mem_heap_alloc(msg->heap, sizeof(table_id_t)));
*table_id = table->id;
msg->ptr = table_id;
mutex_enter(&fts_optimize_wq->mutex);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap);
ib_wqueue_add(fts_optimize_wq, msg, msg->heap, true);
mutex_enter(&table->fts->bg_threads_mutex);
table->fts->in_queue = true;
mutex_exit(&table->fts->bg_threads_mutex);
mutex_exit(&fts_optimize_wq->mutex);
}
/** Add a table to fts_slots if it doesn't already exist. */
static bool fts_optimize_new_table(dict_table_t* table)
{
ut_ad(table);
ulint i;
fts_slot_t* slot;
fts_slot_t* empty = NULL;
const table_id_t table_id = table->id;
ut_ad(table_id);
/* Search for duplicates, also find a free slot if one exists. */
for (i = 0; i < ib_vector_size(fts_slots); ++i) {
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
if (!slot->table_id) {
if (!slot->table) {
empty = slot;
} else if (slot->table_id == table_id) {
} else if (slot->table == table) {
/* Already exists in our optimize queue. */
return(FALSE);
return false;
}
}
......@@ -2756,37 +2746,36 @@ static bool fts_optimize_new_table(dict_table_t* table)
memset(slot, 0x0, sizeof(*slot));
slot->table_id = table->id;
slot->running = false;
return(TRUE);
slot->table = table;
return true;
}
/** Remove a table from fts_slots if it exists.
@param[in,out] table table to be removed from fts_slots */
static bool fts_optimize_del_table(const dict_table_t* table)
{
const table_id_t table_id = table->id;
ut_ad(table_id);
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
fts_slot_t* slot;
slot = static_cast<fts_slot_t*>(ib_vector_get(fts_slots, i));
if (slot->table_id == table_id) {
if (slot->table == table) {
if (fts_enable_diag_print) {
ib_logf(IB_LOG_LEVEL_INFO,
"FTS Optimize Removing table %s",
table->name);
}
slot->table_id = 0;
return(TRUE);
mutex_enter(&fts_optimize_wq->mutex);
slot->table->fts->in_queue = false;
mutex_exit(&fts_optimize_wq->mutex);
slot->table = NULL;
return true;
}
}
return(FALSE);
return false;
}
/**********************************************************************//**
......@@ -2800,7 +2789,7 @@ static ulint fts_optimize_how_many()
for (ulint i = 0; i < ib_vector_size(fts_slots); ++i) {
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));
if (slot->table_id == 0) {
if (!slot->table) {
continue;
}
......@@ -2836,22 +2825,14 @@ static bool fts_is_sync_needed()
const fts_slot_t* slot = static_cast<const fts_slot_t*>(
ib_vector_get_const(fts_slots, i));
if (slot->table_id == 0) {
continue;
}
dict_table_t* table = dict_table_open_on_id(
slot->table_id, FALSE, DICT_TABLE_OP_NORMAL);
if (!table) {
if (!slot->table) {
continue;
}
if (table->fts && table->fts->cache) {
total_memory += table->fts->cache->total_size;
if (slot->table->fts && slot->table->fts->cache) {
total_memory += slot->table->fts->cache->total_size;
}
dict_table_close(table, FALSE, FALSE);
if (total_memory > fts_max_total_cache_size) {
return(true);
}
......@@ -2861,22 +2842,16 @@ static bool fts_is_sync_needed()
}
/** Sync fts cache of a table
@param[in] table_id table id */
static void fts_optimize_sync_table(table_id_t table_id)
@param[in,out] table table to be synced */
static void fts_optimize_sync_table(dict_table_t* table)
{
if (dict_table_t* table = dict_table_open_on_id(
table_id, FALSE, DICT_TABLE_OP_NORMAL)) {
if (fil_table_accessible(table)
&& table->fts && table->fts->cache) {
fts_sync_table(table, true, false, false);
}
if (fil_table_accessible(table)
&& table->fts && table->fts->cache) {
fts_sync_table(table, true, false, false);
}
DBUG_EXECUTE_IF(
"ib_optimize_wq_hang",
DBUG_EXECUTE_IF("ib_optimize_wq_hang",
os_thread_sleep(6000000););
dict_table_close(table, FALSE, FALSE);
}
}
/**********************************************************************//**
......@@ -2918,7 +2893,7 @@ fts_optimize_thread(
ib_vector_get(fts_slots, current));
/* Handle the case of empty slots. */
if (slot->table_id) {
if (slot->table) {
slot->running = true;
fts_optimize_table_bk(slot);
}
......@@ -2978,7 +2953,7 @@ fts_optimize_thread(
os_thread_sleep(300000););
fts_optimize_sync_table(
*static_cast<table_id_t*>(msg->ptr));
static_cast<dict_table_t*>(msg->ptr));
break;
default:
......@@ -2997,8 +2972,8 @@ fts_optimize_thread(
fts_slot_t* slot = static_cast<fts_slot_t*>(
ib_vector_get(fts_slots, i));
if (table_id_t table_id = slot->table_id) {
fts_optimize_sync_table(table_id);
if (slot->table) {
fts_optimize_sync_table(slot->table);
}
}
}
......@@ -3028,24 +3003,35 @@ fts_optimize_init(void)
ut_ad(!srv_read_only_mode);
/* For now we only support one optimize thread. */
ut_a(!fts_optimize_is_init());
ut_a(!fts_optimize_wq);
fts_optimize_wq = ib_wqueue_create();
ut_a(fts_optimize_wq != NULL);
last_check_sync_time = time(NULL);
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
}
/* Add fts tables to fts slots which could be skipped
during dict_load_table() because fts_optimize_thread
wasn't even started. */
mutex_enter(&dict_sys->mutex);
/**********************************************************************//**
Check whether the work queue is initialized.
@return TRUE if optimze queue is initialized. */
UNIV_INTERN
ibool
fts_optimize_is_init(void)
/*======================*/
{
return(fts_optimize_wq != NULL);
for (dict_table_t* table = UT_LIST_GET_FIRST(dict_sys->table_LRU);
table != NULL;
table = UT_LIST_GET_NEXT(table_LRU, table)) {
if (!table->fts || !dict_table_has_fts_index(table)) {
continue;
}
/* fts_optimize_thread is not started yet. So there is no
need to acqquire fts_optimize_wq->mutex for adding the fts
table to the fts slots. */
ut_ad(!table->can_be_evicted);
fts_optimize_new_table(table);
table->fts->in_queue = true;
}
mutex_exit(&dict_sys->mutex);
os_thread_create(fts_optimize_thread, fts_optimize_wq, NULL);
}
/**********************************************************************//**
......
......@@ -285,9 +285,6 @@ struct fts_t {
fts_add_wq. */
ib_mutex_t bg_threads_mutex;
/* Whether the table was added to fts_optimize_wq();
protected by bg_threads mutex */
unsigned in_queue:1;
/* Whether the ADDED table record sync-ed afer
crash recovery; protected by bg_threads mutex */
unsigned added_synced:1;
......@@ -310,6 +307,11 @@ struct fts_t {
ib_vector_t* indexes; /*!< Vector of FTS indexes, this is
mainly for caching purposes. */
/* Whether the table was added to fts_optimize_wq();
protected by fts_optimize_wq mutex */
bool in_queue;
mem_heap_t* fts_heap; /*!< heap for fts_t allocation */
};
......@@ -631,14 +633,6 @@ void
fts_optimize_init(void);
/*====================*/
/**********************************************************************//**
Check whether the work queue is initialized.
@return TRUE if optimze queue is initialized. */
UNIV_INTERN
ibool
fts_optimize_is_init(void);
/*======================*/
/****************************************************************//**
Drops index ancillary tables for a FTS index
@return DB_SUCCESS or error code */
......
......@@ -25,6 +25,9 @@ Created 2011-02-15 Jimmy Yang
#ifndef INNODB_FTS0OPT_H
#define INNODB_FTS0OPT_H
/** The FTS optimize thread's work queue. */
extern ib_wqueue_t* fts_optimize_wq;
/********************************************************************
Callback function to fetch the rows in an FTS INDEX record. */
UNIV_INTERN
......
......@@ -56,16 +56,15 @@ ib_wqueue_free(
/*===========*/
ib_wqueue_t* wq); /*!< in: work queue */
/****************************************************************//**
Add a work item to the queue. */
/** Add a work item to the queue.
@param[in,out] wq work queue
@param[in] item work item
@param[in,out] heap memory heap to use for allocating list node
@param[in] wq_locked work queue mutex locked */
UNIV_INTERN
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /*!< in: work queue */
void* item, /*!< in: work item */
mem_heap_t* heap); /*!< in: memory heap to use for allocating the
list node */
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap,
bool wq_locked = false);
/********************************************************************
Check if queue is empty. */
......@@ -104,7 +103,6 @@ ib_wqueue_nowait(
/*=============*/
ib_wqueue_t* wq); /*<! in: work queue */
/********************************************************************
Get number of items on queue.
@return number of items on queue */
......@@ -113,13 +111,16 @@ ib_wqueue_len(
/*==========*/
ib_wqueue_t* wq); /*<! in: work queue */
/* Work queue. */
struct ib_wqueue_t {
ib_mutex_t mutex; /*!< mutex protecting everything */
ib_list_t* items; /*!< work item list */
os_event_t event; /*!< event we use to signal additions to list;
os_event_set() and os_event_reset() are
protected by ib_wqueue_t::mutex */
/** Work queue */
struct ib_wqueue_t
{
/** Mutex protecting everything */
ib_mutex_t mutex;
/** Work item list */
ib_list_t* items;
/** event we use to signal additions to list;
os_event_set() and os_event_reset() are protected by the mutex */
os_event_t event;
};
#endif
......@@ -60,23 +60,25 @@ ib_wqueue_free(
mem_free(wq);
}
/****************************************************************//**
Add a work item to the queue. */
/** Add a work item to the queue.
@param[in,out] wq work queue
@param[in] item work item
@param[in,out] heap memory heap to use for allocating list node
@param[in] wq_locked work queue mutex locked */
UNIV_INTERN
void
ib_wqueue_add(
/*==========*/
ib_wqueue_t* wq, /*!< in: work queue */
void* item, /*!< in: work item */
mem_heap_t* heap) /*!< in: memory heap to use for allocating the
list node */
ib_wqueue_add(ib_wqueue_t* wq, void* item, mem_heap_t* heap, bool wq_locked)
{
mutex_enter(&wq->mutex);
if (!wq_locked) {
mutex_enter(&wq->mutex);
}
ib_list_add_last(wq->items, item, heap);
os_event_set(wq->event);
mutex_exit(&wq->mutex);
if (!wq_locked) {
mutex_exit(&wq->mutex);
}
}
/****************************************************************//**
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment