Commit 9f726d99 authored by unknown's avatar unknown

Changes to support optimized bulk inserts. This will solve the problem of...

Changes to support optimized bulk inserts. This will solve the problem of selects occurring during a bulk insert, and causing the compression factor to drop. 


mysql-test/r/archive.result:
  Updated result set for changes amde to support delayed insert and bulk insert optimizations.
mysql-test/t/archive.test:
  Test cases to test bulk insert statements and delayed insert syntax.
sql/examples/ha_archive.cc:
  Added methods needed for bulk operations to be optimized. Bulk inserts now do not trigger the file as dirty until they are complete. A normal insert though can still cause them to be synced though since it is not effected by the flag. Removed share variable to determine if row count is right or not (it should not be treated as an upper end estimate).
sql/examples/ha_archive.h:
  Removed share variable for delayed flag since it is no longer needed. 
  Added flag in table instance to know if a bulk insert is happening.
  
  Added method declarations for bulk insert optimizations.
parent 30c54408
This diff is collapsed.
...@@ -1299,4 +1299,15 @@ INSERT INTO t2 VALUES (4,011403,37,'intercepted','audiology','tinily',''); ...@@ -1299,4 +1299,15 @@ INSERT INTO t2 VALUES (4,011403,37,'intercepted','audiology','tinily','');
SELECT * FROM t2; SELECT * FROM t2;
OPTIMIZE TABLE t2; OPTIMIZE TABLE t2;
SELECT * FROM t2; SELECT * FROM t2;
#
# Test bulk inserts
INSERT INTO t2 VALUES (1,000001,00,'Omaha','teethe','neat','') , (2,011401,37,'breaking','dreaded','Steinberg','W') , (3,011402,37,'Romans','scholastics','jarring','') , (4,011403,37,'intercepted','audiology','tinily','');
SELECT * FROM t2;
# Just test syntax, we will never know if the out put is right or wrong
INSERT DELAYED INTO t2 VALUES (4,011403,37,'intercepted','audiology','tinily','');
#
# Cleanup, test is over
#
drop table t1, t2; drop table t1, t2;
...@@ -305,7 +305,6 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table) ...@@ -305,7 +305,6 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
share->use_count= 0; share->use_count= 0;
share->table_name_length= length; share->table_name_length= length;
share->table_name= tmp_name; share->table_name= tmp_name;
share->delayed= FALSE;
fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME); fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME); fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
strmov(share->table_name,table_name); strmov(share->table_name,table_name);
...@@ -536,11 +535,9 @@ int ha_archive::write_row(byte * buf) ...@@ -536,11 +535,9 @@ int ha_archive::write_row(byte * buf)
table->timestamp_field->set_time(); table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
written= gzwrite(share->archive_write, buf, table->reclength); written= gzwrite(share->archive_write, buf, table->reclength);
DBUG_PRINT("ha_archive::get_row", ("Wrote %d bytes expected %d", written, table->reclength)); DBUG_PRINT("ha_archive::write_row", ("Wrote %d bytes expected %d", written, table->reclength));
if (!delayed_insert) if (!delayed_insert || !bulk_insert)
share->dirty= TRUE; share->dirty= TRUE;
else
share->delayed= TRUE;
if (written != table->reclength) if (written != table->reclength)
goto error; goto error;
...@@ -599,7 +596,6 @@ int ha_archive::rnd_init(bool scan) ...@@ -599,7 +596,6 @@ int ha_archive::rnd_init(bool scan)
{ {
gzflush(share->archive_write, Z_SYNC_FLUSH); gzflush(share->archive_write, Z_SYNC_FLUSH);
share->dirty= FALSE; share->dirty= FALSE;
share->delayed= FALSE;
} }
pthread_mutex_unlock(&share->mutex); pthread_mutex_unlock(&share->mutex);
} }
...@@ -634,12 +630,12 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf) ...@@ -634,12 +630,12 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf)
if (read == 0) if (read == 0)
DBUG_RETURN(HA_ERR_END_OF_FILE); DBUG_RETURN(HA_ERR_END_OF_FILE);
/* If the record is the wrong size, the file is probably damaged, unless /*
we are dealing with a delayed insert. In that case we can assume the file is ok, If the record is the wrong size, the file is probably damaged, unless
but our row count doesn't match our data since the file has not been flushed. we are dealing with a delayed insert or a bulk insert.
*/ */
if ((ulong) read != table->reclength) if ((ulong) read != table->reclength)
DBUG_RETURN(share->delayed ? HA_ERR_END_OF_FILE : HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_END_OF_FILE);
/* Calculate blob length, we use this for our buffer */ /* Calculate blob length, we use this for our buffer */
for (field=table->blob_field; *field ; field++) for (field=table->blob_field; *field ; field++)
...@@ -657,7 +653,7 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf) ...@@ -657,7 +653,7 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf)
{ {
read= gzread(file_to_read, last, size); read= gzread(file_to_read, last, size);
if ((size_t) read != size) if ((size_t) read != size)
DBUG_RETURN(share->delayed ? HA_ERR_END_OF_FILE : HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_END_OF_FILE);
(*field)->set_ptr(size, last); (*field)->set_ptr(size, last);
last += size; last += size;
} }
...@@ -886,14 +882,47 @@ THR_LOCK_DATA **ha_archive::store_lock(THD *thd, ...@@ -886,14 +882,47 @@ THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
return to; return to;
} }
/*
Hints for optimizer, see ha_tina for more information
*/
void ha_archive::info(uint flag) void ha_archive::info(uint flag)
{ {
DBUG_ENTER("ha_archive::info"); DBUG_ENTER("ha_archive::info");
/* This is a lie, but you don't want the optimizer to see zero or 1 */ /*
This should be an accurate number now, though bulk and delayed inserts can
cause the number to be inaccurate.
*/
records= share->rows_recorded; records= share->rows_recorded;
deleted= 0; deleted= 0;
DBUG_VOID_RETURN; DBUG_VOID_RETURN;
} }
/*
This method tells us that a bulk insert operation is about to occur. We set
a flag which will keep write_row from saying that its data is dirty. This in
turn will keep selects from causing a sync to occur.
Basically, yet another optimizations to keep compression working well.
*/
void ha_archive::start_bulk_insert(ha_rows rows)
{
DBUG_ENTER("ha_archive::info");
bulk_insert= TRUE;
DBUG_VOID_RETURN;
}
/*
Other side of start_bulk_insert, is end_bulk_insert. Here we turn off the bulk insert
flag, and set the share dirty so that the next select will call sync for us.
*/
int ha_archive::end_bulk_insert()
{
bulk_insert= FALSE;
share->dirty= TRUE;
DBUG_RETURN(0);
}
#endif /* HAVE_ARCHIVE_DB */ #endif /* HAVE_ARCHIVE_DB */
...@@ -36,7 +36,6 @@ typedef struct st_archive_share { ...@@ -36,7 +36,6 @@ typedef struct st_archive_share {
gzFile archive_write; /* Archive file we are working with */ gzFile archive_write; /* Archive file we are working with */
bool dirty; /* Flag for if a flush should occur */ bool dirty; /* Flag for if a flush should occur */
ulonglong rows_recorded; /* Number of rows in tables */ ulonglong rows_recorded; /* Number of rows in tables */
bool delayed; /* If a delayed insert has happened since opena */
} ARCHIVE_SHARE; } ARCHIVE_SHARE;
/* /*
...@@ -55,9 +54,10 @@ class ha_archive: public handler ...@@ -55,9 +54,10 @@ class ha_archive: public handler
String buffer; /* Buffer used for blob storage */ String buffer; /* Buffer used for blob storage */
ulonglong scan_rows; /* Number of rows left in scan */ ulonglong scan_rows; /* Number of rows left in scan */
bool delayed_insert; /* If the insert is delayed */ bool delayed_insert; /* If the insert is delayed */
bool bulk_insert; /* If we are performing a bulk insert */
public: public:
ha_archive(TABLE *table): handler(table), delayed_insert(0) ha_archive(TABLE *table): handler(table), delayed_insert(0), bulk_insert(0)
{ {
/* Set our original buffer from pre-allocated memory */ /* Set our original buffer from pre-allocated memory */
buffer.set(byte_buffer, IO_SIZE, system_charset_info); buffer.set(byte_buffer, IO_SIZE, system_charset_info);
...@@ -99,6 +99,8 @@ class ha_archive: public handler ...@@ -99,6 +99,8 @@ class ha_archive: public handler
int external_lock(THD *thd, int lock_type); int external_lock(THD *thd, int lock_type);
int create(const char *name, TABLE *form, HA_CREATE_INFO *create_info); int create(const char *name, TABLE *form, HA_CREATE_INFO *create_info);
int optimize(THD* thd, HA_CHECK_OPT* check_opt); int optimize(THD* thd, HA_CHECK_OPT* check_opt);
void start_bulk_insert(ha_rows rows);
int end_bulk_insert();
THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to, THR_LOCK_DATA **store_lock(THD *thd, THR_LOCK_DATA **to,
enum thr_lock_type lock_type); enum thr_lock_type lock_type);
}; };
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment