Commit 54bfd8b1 authored by unknown

Fixed locking issues around flushes.


storage/archive/ha_archive.cc:
  Added mutex around flush calls to make reads consistent (and not conflicting)
storage/archive/ha_archive.h:
  Fixed issues for fast count(*) call
parent b05c86bc
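
The pattern the ha_archive.cc hunks below apply in write_row(), info(), and check() is the same each time: take share->mutex before touching the shared write stream, flush with azflush() only while the dirty flag is set, clear the flag, and only then read or publish the row count, so a reader never races a writer's flush. The following is a minimal, self-contained sketch of that pattern, not the engine's real code; ArchiveShare, flush_stream(), append_row(), and read_row_count() are illustrative stand-ins for the actual share structure and azflush() calls.

#include <pthread.h>
#include <cstdio>

/* Stand-in for the per-table share; only the members the pattern needs. */
struct ArchiveShare
{
  pthread_mutex_t mutex;
  bool dirty;                        /* set by writers, cleared after a flush */
  unsigned long long rows_recorded;  /* maintained at write time */
};

/* Placeholder for azflush(&share->archive_write, Z_SYNC_FLUSH). */
static void flush_stream(ArchiveShare *share)
{
  std::printf("flushing, %llu rows recorded\n", share->rows_recorded);
}

/* Mirrors the shape of ha_archive::info(): flush under the mutex while dirty,
   then read the count, so the published number is consistent. */
static unsigned long long read_row_count(ArchiveShare *share)
{
  pthread_mutex_lock(&share->mutex);
  if (share->dirty)
  {
    flush_stream(share);
    share->dirty= false;
  }
  unsigned long long rows= share->rows_recorded;
  pthread_mutex_unlock(&share->mutex);
  return rows;
}

/* Mirrors the writer side: the count and the dirty flag change under the
   same lock the readers take. */
static void append_row(ArchiveShare *share)
{
  pthread_mutex_lock(&share->mutex);
  share->rows_recorded++;
  share->dirty= true;
  pthread_mutex_unlock(&share->mutex);
}

int main()
{
  ArchiveShare share;
  pthread_mutex_init(&share.mutex, NULL);
  share.dirty= false;
  share.rows_recorded= 0;

  append_row(&share);
  append_row(&share);
  std::printf("count(*) = %llu\n", read_row_count(&share));

  pthread_mutex_destroy(&share.mutex);
  return 0;
}
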
storage/archive/ha_archive.cc
@@ -436,6 +436,9 @@ int ha_archive::init_archive_writer()
 }
 
+/*
+  No locks are required because it is associated with just one handler instance
+*/
 int ha_archive::init_archive_reader()
 {
   DBUG_ENTER("ha_archive::init_archive_reader");
@@ -794,15 +797,16 @@ int ha_archive::write_row(uchar *buf)
   if (share->crashed)
     DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
 
-  if (!share->archive_write_open)
-    if (init_archive_writer())
-      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
-
   ha_statistic_increment(&SSV::ha_write_count);
   if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
     table->timestamp_field->set_time();
   pthread_mutex_lock(&share->mutex);
+
+  if (!share->archive_write_open)
+    if (init_archive_writer())
+      DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
+
   if (table->next_number_field && record == table->record[0])
   {
     KEY *mkey= &table->s->key_info[0]; // We only support one key right now
@@ -992,24 +996,6 @@ int ha_archive::rnd_init(bool scan)
   {
     DBUG_PRINT("info", ("archive will retrieve %llu rows",
                         (unsigned long long) scan_rows));
-    stats.records= 0;
-
-    /*
-      If dirty, we lock, and then reset/flush the data.
-      I found that just calling azflush() doesn't always work.
-    */
-    pthread_mutex_lock(&share->mutex);
-    scan_rows= share->rows_recorded;
-    if (share->dirty == TRUE)
-    {
-      if (share->dirty == TRUE)
-      {
-        DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
-        azflush(&(share->archive_write), Z_SYNC_FLUSH);
-        share->dirty= FALSE;
-      }
-    }
-    pthread_mutex_unlock(&share->mutex);
 
     if (read_data_header(&archive))
       DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -1223,9 +1209,7 @@ int ha_archive::rnd_next(uchar *buf)
   current_position= aztell(&archive);
   rc= get_row(&archive, buf);
 
   table->status=rc ? STATUS_NOT_FOUND: 0;
-  if (rc != HA_ERR_END_OF_FILE)
-    stats.records++;
 
   DBUG_RETURN(rc);
 }
@@ -1461,12 +1445,33 @@ void ha_archive::update_create_info(HA_CREATE_INFO *create_info)
 int ha_archive::info(uint flag)
 {
   DBUG_ENTER("ha_archive::info");
+
+  /*
+    If dirty, we lock, and then reset/flush the data.
+    I found that just calling azflush() doesn't always work.
+  */
+  pthread_mutex_lock(&share->mutex);
+  if (share->dirty == TRUE)
+  {
+    if (share->dirty == TRUE)
+    {
+      DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
+      azflush(&(share->archive_write), Z_SYNC_FLUSH);
+      share->dirty= FALSE;
+    }
+  }
+
   /*
     This should be an accurate number now, though bulk and delayed inserts can
     cause the number to be inaccurate.
   */
   stats.records= share->rows_recorded;
+  pthread_mutex_unlock(&share->mutex);
+  scan_rows= stats.records;
   stats.deleted= 0;
+
+  DBUG_PRINT("ha_archive", ("Stats rows is %d\n", (int)stats.records));
+
   /* Costs quite a bit more to get all information */
   if (flag & HA_STATUS_TIME)
   {
@@ -1486,7 +1491,9 @@ int ha_archive::info(uint flag)
   if (flag & HA_STATUS_AUTO)
   {
     init_archive_reader();
+    pthread_mutex_lock(&share->mutex);
     azflush(&archive, Z_SYNC_FLUSH);
+    pthread_mutex_unlock(&share->mutex);
     stats.auto_increment_value= archive.auto_increment;
   }
@@ -1554,7 +1561,9 @@ int ha_archive::check(THD* thd, HA_CHECK_OPT* check_opt)
   old_proc_info= thd_proc_info(thd, "Checking table");
   /* Flush any waiting data */
+  pthread_mutex_lock(&share->mutex);
   azflush(&(share->archive_write), Z_SYNC_FLUSH);
+  pthread_mutex_unlock(&share->mutex);
 
   /*
     Now we will rewind the archive file so that we are positioned at the
...

storage/archive/ha_archive.h
@@ -88,6 +88,8 @@ public:
   {
     return (HA_NO_TRANSACTIONS | HA_REC_NOT_IN_SEQ | HA_CAN_BIT_FIELD |
             HA_BINLOG_ROW_CAPABLE | HA_BINLOG_STMT_CAPABLE |
+            HA_STATS_RECORDS_IS_EXACT |
+            HA_HAS_RECORDS |
             HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
   }
   ulong index_flags(uint idx, uint part, bool all_parts) const
@@ -101,6 +103,7 @@ public:
   uint max_supported_keys() const { return 1; }
   uint max_supported_key_length() const { return sizeof(ulonglong); }
   uint max_supported_key_part_length() const { return sizeof(ulonglong); }
+  ha_rows records() { return share->rows_recorded; }
   int index_init(uint keynr, bool sorted);
   virtual int index_read(uchar * buf, const uchar * key,
                          uint key_len, enum ha_rkey_function find_flag);
...
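
The ha_archive.h hunks are what the commit message calls the fast count(*) fix: with HA_HAS_RECORDS (and HA_STATS_RECORDS_IS_EXACT) in the table flags, the server may ask the handler for the row count through records() instead of scanning, and Archive answers from share->rows_recorded. Below is a rough sketch of that idea outside the real handler API; the Handler and CountingHandler classes and the select_count() driver are illustrative stand-ins, not MySQL's actual interfaces.

#include <cstdio>

typedef unsigned long long ha_rows;

/* Illustrative stand-in for the handler interface: engines that can report
   an exact row count cheaply override records(). */
class Handler
{
public:
  virtual ~Handler() {}
  virtual bool has_exact_records() const { return false; }
  virtual ha_rows records() { return 0; }
  virtual ha_rows scan_and_count() = 0;   /* fallback: full table scan */
};

/* Mirrors the Archive case: the count is maintained at write time, so
   answering COUNT(*) needs no scan. */
class CountingHandler : public Handler
{
  ha_rows rows_recorded;
public:
  CountingHandler() : rows_recorded(0) {}
  void write_row() { rows_recorded++; }
  bool has_exact_records() const { return true; }
  ha_rows records() { return rows_recorded; }
  ha_rows scan_and_count() { return rows_recorded; }  /* never the cheap path here */
};

/* Sketch of how a SELECT COUNT(*) without a WHERE clause can pick the cheap path. */
static ha_rows select_count(Handler &h)
{
  return h.has_exact_records() ? h.records() : h.scan_and_count();
}

int main()
{
  CountingHandler t;
  for (int i= 0; i < 5; i++)
    t.write_row();
  std::printf("COUNT(*) = %llu\n", select_count(t));
  return 0;
}

This path only stays correct because write_row() keeps the counter up to date under the same mutex the flush path takes, which is exactly what the ha_archive.cc hunks above arrange.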