Commit 15735b24 authored by brian@avenger.(none)

Adding support for archive to handle delayed inserts.

parent 71f31d1c
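
In practical terms, the commit lets INSERT DELAYED statements target ARCHIVE tables: store_lock() starts recognizing TL_WRITE_DELAYED, and table_flags() now advertises HA_CAN_INSERT_DELAYED (see the hunks below). A minimal, hedged illustration of the usage this enables (table and column names are invented for the example):

    CREATE TABLE t1 (a INT) ENGINE=ARCHIVE;
    INSERT DELAYED INTO t1 VALUES (1), (2), (3);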
@@ -305,6 +305,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
   share->use_count= 0;
   share->table_name_length= length;
   share->table_name= tmp_name;
+  share->delayed= FALSE;
   fn_format(share->data_file_name,table_name,"",ARZ,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
   fn_format(meta_file_name,table_name,"",ARM,MY_REPLACE_EXT|MY_UNPACK_FILENAME);
   strmov(share->table_name,table_name);
@@ -536,7 +537,11 @@ int ha_archive::write_row(byte * buf)
   pthread_mutex_lock(&share->mutex);
   written= gzwrite(share->archive_write, buf, table->reclength);
   DBUG_PRINT("ha_archive::get_row", ("Wrote %d bytes expected %d", written, table->reclength));
+  if (!delayed_insert)
     share->dirty= TRUE;
+  else
+    share->delayed= TRUE;
   if (written != table->reclength)
     goto error;
   /*
@@ -594,6 +599,7 @@ int ha_archive::rnd_init(bool scan)
     {
       gzflush(share->archive_write, Z_SYNC_FLUSH);
       share->dirty= FALSE;
+      share->delayed= FALSE;
     }
     pthread_mutex_unlock(&share->mutex);
   }
@@ -628,9 +634,12 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf)
   if (read == 0)
     DBUG_RETURN(HA_ERR_END_OF_FILE);
-  /* If the record is the wrong size, the file is probably damaged */
+  /* If the record is the wrong size, the file is probably damaged, unless
+     we are dealing with a delayed insert. In that case we can assume the file is ok,
+     but our row count doesn't match our data since the file has not been flushed.
+  */
   if ((ulong) read != table->reclength)
-    DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
+    DBUG_RETURN(share->delayed ? HA_ERR_END_OF_FILE : HA_ERR_CRASHED_ON_USAGE);
   /* Calculate blob length, we use this for our buffer */
   for (field=table->blob_field; *field ; field++)
@@ -648,7 +657,7 @@ int ha_archive::get_row(gzFile file_to_read, byte *buf)
     {
       read= gzread(file_to_read, last, size);
       if ((size_t) read != size)
-        DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
+        DBUG_RETURN(share->delayed ? HA_ERR_END_OF_FILE : HA_ERR_CRASHED_ON_USAGE);
       (*field)->set_ptr(size, last);
       last += size;
     }
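
The comment in the hunk above carries the reasoning: rows queued by a delayed insert may still sit in the unflushed tail of the compressed data file, so a short trailing read is reported as end-of-file rather than as a crashed table. A sketch of the observable effect, continuing the hypothetical t1 table from the earlier example (the exact count depends on when the delayed-insert rows get flushed):

    INSERT DELAYED INTO t1 VALUES (4), (5);
    SELECT COUNT(*) FROM t1;  -- may briefly report fewer rows than have been queued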
@@ -839,6 +848,11 @@ THR_LOCK_DATA **ha_archive::store_lock(THD *thd,
                                        THR_LOCK_DATA **to,
                                        enum thr_lock_type lock_type)
 {
+  if (lock_type == TL_WRITE_DELAYED)
+    delayed_insert= TRUE;
+  else
+    delayed_insert= FALSE;
+
   if (lock_type != TL_IGNORE && lock.type == TL_UNLOCK)
   {
     /*
@@ -36,6 +36,7 @@ typedef struct st_archive_share {
   gzFile archive_write;             /* Archive file we are working with */
   bool dirty;                       /* Flag for if a flush should occur */
   ulonglong rows_recorded;          /* Number of rows in tables */
+  bool delayed;                     /* If a delayed insert has happened since open */
 } ARCHIVE_SHARE;

 /*
@@ -53,9 +54,10 @@ class ha_archive: public handler
   byte byte_buffer[IO_SIZE];        /* Initial buffer for our string */
   String buffer;                    /* Buffer used for blob storage */
   ulonglong scan_rows;              /* Number of rows left in scan */
+  bool delayed_insert;              /* If the insert is delayed */

 public:
-  ha_archive(TABLE *table): handler(table)
+  ha_archive(TABLE *table): handler(table), delayed_insert(0)
   {
     /* Set our original buffer from pre-allocated memory */
     buffer.set(byte_buffer, IO_SIZE, system_charset_info);
@@ -72,7 +74,7 @@ public:
   ulong table_flags() const
   {
     return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | HA_NO_AUTO_INCREMENT |
-            HA_FILE_BASED);
+            HA_FILE_BASED | HA_CAN_INSERT_DELAYED);
   }
   ulong index_flags(uint idx, uint part, bool all_parts) const
   {