Commit 1ff23d0c authored by Monty's avatar Monty

Fixed bug in free locks that caused rows in cache to not be properly flushed

Fixed by doing adding external_lock(F_UNLCK) in free_locks.
I also moved unlock of cache_handler to be after changing lock type of
cached tables to ensure that no one can use cached table while this is
happening (as cache table is locked with write lock).
I also fixed a wrong mutex order bug in ha_cache::flush_insert_cache()

Other things:
- Addded share::cached_rows to track inserted rows. This is only used
  for asserts to check the number of rows in the cache.
- Fixed wrong mysql_file_chsize() in case of repair
parent 36590b33
......@@ -158,7 +158,10 @@ my_bool cache_start_trans(void* param)
{
ha_cache *cache= (ha_cache*) param;
if (!cache->insert_command)
{
cache->free_locks();
return 0;
}
return (*cache->share->org_lock.start_trans)(cache->cache_handler->file);
}
......@@ -207,7 +210,7 @@ static mysql_mutex_t LOCK_cache_share;
Find or create a share
*/
ha_cache_share *find_cache_share(const char *name)
ha_cache_share *find_cache_share(const char *name, ulonglong cached_rows)
{
ha_cache_share *pos, *share;
mysql_mutex_lock(&LOCK_cache_share);
......@@ -229,6 +232,7 @@ ha_cache_share *find_cache_share(const char *name)
}
share->name= (char*) (share+1);
share->open_count= 1;
share->cached_rows= cached_rows;
strmov((char*) share->name, name);
share->next= cache_share_list;
cache_share_list= share;
......@@ -267,6 +271,7 @@ ha_cache::ha_cache(handlerton *hton, TABLE_SHARE *table_arg, MEM_ROOT *mem_root)
cache_handler= new (mem_root) ha_maria(maria_hton, table_arg);
share= 0;
lock_counter= 0;
cache_locked= 0;
}
......@@ -325,7 +330,7 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
if ((error= cache_handler->open(cache_name, mode, open_flags)))
DBUG_RETURN(error);
if (!(share= find_cache_share(name)))
if (!(share= find_cache_share(name, cache_handler->file->state->records)))
{
cache_handler->close();
DBUG_RETURN(ER_OUTOFMEMORY);
......@@ -336,6 +341,11 @@ int ha_cache::open(const char *name, int mode, uint open_flags)
if (lock->get_status != &get_status_and_flush_cache)
{
mysql_mutex_lock(&cache_handler->file->s->intern_lock);
/* The following lock is here just to establish mutex locking order */
mysql_mutex_lock(&lock->mutex);
mysql_mutex_unlock(&lock->mutex);
if (lock->get_status != &get_status_and_flush_cache)
{
/* Remember original lock. Used by the THR_lock cache functions */
......@@ -433,7 +443,12 @@ int ha_cache::external_lock(THD *thd, int lock_type)
if (lock_type == F_UNLCK)
{
int error2;
error= cache_handler->external_lock(thd, lock_type);
error= 0;
if (cache_locked)
{
error= cache_handler->external_lock(thd, lock_type);
cache_locked= 0;
}
if ((error2= parent::external_lock(thd, lock_type)))
error= error2;
DBUG_RETURN(error);
......@@ -449,6 +464,7 @@ int ha_cache::external_lock(THD *thd, int lock_type)
error= cache_handler->external_lock(thd, F_UNLCK);
DBUG_RETURN(error);
}
cache_locked= 1;
DBUG_RETURN(0);
}
......@@ -495,6 +511,7 @@ int ha_cache::delete_all_rows(void)
error= cache_handler->delete_all_rows();
if ((error2= parent::delete_all_rows()))
error= error2;
share->cached_rows= 0;
DBUG_RETURN(error);
}
......@@ -513,7 +530,7 @@ bool ha_cache::is_crashed() const
In the case of 1) we don't want to run repair on both tables as
the repair can be a slow process. Instead we only run repair
on the crashed tables. If not tables are marked crashed, we
on the crashed tables. If no tables are marked crashed, we
run repair on both tables.
Repair on the cache table will delete the part of the cache that was
......@@ -533,12 +550,14 @@ int ha_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
{
/* Delete everything that was not already committed */
mysql_file_chsize(cache_handler->file->dfile.file,
cache_handler->file->s->state.state.key_file_length,
cache_handler->file->s->state.state.data_file_length,
0, MYF(MY_WME));
mysql_file_chsize(cache_handler->file->s->kfile.file,
cache_handler->file->s->state.state.data_file_length,
cache_handler->file->s->state.state.key_file_length,
0, MYF(MY_WME));
check_opt->flags|= T_AUTO_REPAIR;
error= cache_handler->repair(thd, check_opt);
share->cached_rows= cache_handler->file->state->records;
}
if (parent::is_crashed() || !something_crashed)
if ((error2= parent::repair(thd, check_opt)))
......@@ -554,7 +573,11 @@ int ha_cache::repair(THD *thd, HA_CHECK_OPT *check_opt)
int ha_cache::write_row(const uchar *buf)
{
if (insert_command)
{
DBUG_ASSERT(share->cached_rows == cache_handler->file->state->records);
share->cached_rows++;
return cache_handler->write_row(buf);
}
return parent::write_row(buf);
}
......@@ -664,15 +687,20 @@ bool ha_cache::rows_cached()
void ha_cache::free_locks()
{
/* We don't need to lock cache_handler anymore as it's already flushed */
thr_unlock(&cache_handler->file->lock, 0);
/* Restart transaction for columnstore table */
if (original_lock_type != F_WRLCK)
{
/* CVS engine need to have get_status() done before unlock */
if (extern_get_status)
extern_get_status(this, 0);
parent::external_lock(table->in_use, F_UNLCK);
parent::external_lock(table->in_use, original_lock_type);
}
/* We don't need to lock cache_handler anymore as it's already flushed */
cache_handler->external_lock(table->in_use, F_UNLCK);
thr_unlock(&cache_handler->file->lock, 0);
cache_locked= false;
}
......@@ -689,19 +717,25 @@ int ha_cache::flush_insert_cache()
ha_maria *from= cache_handler;
parent *to= this;
uchar *record= to->table->record[0];
ulonglong copied_rows= 0;
DBUG_ENTER("flush_insert_cache");
if (extern_get_status)
extern_get_status(to, 0);
DBUG_ASSERT(from->file->state->records == share->cached_rows);
to->start_bulk_insert(from->file->state->records, 0);
from->rnd_init(1);
while (!(error= from->rnd_next(record)))
{
copied_rows++;
if ((error= to->write_row(record)))
goto end;
}
if (error == HA_ERR_END_OF_FILE)
error= 0;
DBUG_ASSERT(copied_rows == share->cached_rows);
end:
from->rnd_end();
......@@ -722,13 +756,24 @@ int ha_cache::flush_insert_cache()
to->ht->rollback(to->ht, table->in_use, 1);
}
DBUG_ASSERT(error == 0);
if (!error)
{
/*
Everything when fine, delete all rows from the cache and allow others
to use it.
*/
from->delete_all_rows();
{
/*
We have to unlock the lock mutex as otherwise we get a conflict in
mutex order. This is fine as we have a write lock on the mutex,
so we will be able to get it again
*/
mysql_mutex_unlock(&from->file->s->lock.mutex);
from->delete_all_rows();
share->cached_rows= 0;
mysql_mutex_lock(&from->file->s->lock.mutex);
}
}
DBUG_RETURN(error);
}
......@@ -27,8 +27,10 @@ class ha_cache_share
const char *name;
uint open_count;
public:
ulonglong cached_rows;
THR_LOCK org_lock;
friend ha_cache_share *find_cache_share(const char *name);
friend ha_cache_share *find_cache_share(const char *name,
ulonglong cached_rows);
void close();
};
......@@ -37,7 +39,7 @@ class ha_cache :public ha_tina
{
typedef ha_tina parent;
int original_lock_type;
bool insert_command;
bool insert_command, cache_locked;
public:
uint lock_counter;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment