Commit 185eefea authored by unknown

Formalized the row buffer structure and implemented the new streaming format.


mysql-test/r/archive.result:
  Added cleanup for additional tables.
mysql-test/t/archive.test:
  Added cleanup for additional tables.
storage/archive/ha_archive.cc:
  Rows are now preceded by their length. Added the new record buffer structure and its methods.
storage/archive/ha_archive.h:
  New structure for the record buffer.
parent b8992fd7
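The heart of this change is the version-3 row format: each row is now stored as a 4-byte little-endian length header (ARCHIVE_ROW_HEADER_SIZE) followed by the packed row, i.e. the record's null-bit bytes and then every field serialized by Field::pack(), blobs included. A minimal, self-contained sketch of that framing, using hand-rolled helpers in place of MySQL's int4store()/sint4korr() macros:

#include <cstdint>
#include <cstring>
#include <cstdio>

static const size_t ROW_HEADER_SIZE= 4;   /* mirrors ARCHIVE_ROW_HEADER_SIZE */

/* Little-endian store/load, standing in for int4store()/sint4korr(). */
static void store4(unsigned char *b, uint32_t v)
{
  b[0]= (unsigned char)(v);       b[1]= (unsigned char)(v >> 8);
  b[2]= (unsigned char)(v >> 16); b[3]= (unsigned char)(v >> 24);
}
static uint32_t load4(const unsigned char *b)
{
  return (uint32_t)b[0] | ((uint32_t)b[1] << 8) |
         ((uint32_t)b[2] << 16) | ((uint32_t)b[3] << 24);
}

int main()
{
  /* A "packed row": null-bit byte(s) followed by packed field data. */
  const unsigned char row[]= { 0x00, 'a', 'b', 'c' };
  unsigned char stream[ROW_HEADER_SIZE + sizeof(row)];

  store4(stream, (uint32_t)sizeof(row));               /* length header */
  memcpy(stream + ROW_HEADER_SIZE, row, sizeof(row));  /* row payload   */

  /* A reader decodes the header first, so it knows how much to read. */
  printf("row length from header: %u\n", (unsigned) load4(stream));
  return 0;
}

Prefixing each row with its packed length is what lets version 3 keep variable-length rows and their blobs in one stream, where version 2 wrote fixed reclength records with blob data appended separately.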
drop table if exists t1,t2,t3;
drop table if exists t1,t2,t3,t4,t5;
CREATE TABLE t1 (
Period smallint(4) unsigned zerofill DEFAULT '0000' NOT NULL,
Varor_period smallint(4) unsigned DEFAULT '0' NOT NULL
@@ -6,7 +6,7 @@
-- source include/have_binlog_format_mixed_or_statement.inc
--disable_warnings
drop table if exists t1,t2,t3;
drop table if exists t1,t2,t3,t4,t5;
--enable_warnings
CREATE TABLE t1 (
@@ -146,6 +146,11 @@ static handler *archive_create_handler(handlerton *hton,
*/
#define ARCHIVE_MIN_ROWS_TO_USE_BULK_INSERT 2
/*
Size of the length header stored before each row
*/
#define ARCHIVE_ROW_HEADER_SIZE 4
static handler *archive_create_handler(handlerton *hton,
TABLE_SHARE *table,
MEM_ROOT *mem_root)
@@ -248,6 +253,8 @@ int ha_archive::read_data_header(azio_stream *file_to_read)
DBUG_PRINT("ha_archive::read_data_header", ("Check %u", data_buffer[0]));
DBUG_PRINT("ha_archive::read_data_header", ("Version %u", data_buffer[1]));
share->data_version= (uchar)data_buffer[1];
if ((data_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) &&
(data_buffer[1] != (uchar)ARCHIVE_VERSION))
@@ -283,6 +290,7 @@ int ha_archive::write_data_header(azio_stream *file_to_write)
*rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
uint *meta_version,
ulonglong *auto_increment,
ulonglong *forced_flushes,
char *real_path)
@@ -326,6 +334,8 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
DBUG_PRINT("ha_archive::read_meta_file", ("Real Path %s", real_path));
DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));
*meta_version= (uchar)meta_buffer[1];
if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) ||
((bool)(*ptr)== TRUE))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -446,7 +456,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
VOID(pthread_mutex_init(&share->mutex,MY_MUTEX_INIT_FAST));
if ((share->meta_file= my_open(meta_file_name, O_RDWR, MYF(0))) == -1)
share->crashed= TRUE;
DBUG_PRINT("info", ("archive opening (1) up write at %s",
DBUG_PRINT("ha_archive", ("archive opening (1) up write at %s",
share->data_file_name));
/*
@@ -454,6 +464,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
a write.
*/
if (read_meta_file(share->meta_file, &share->rows_recorded,
&share->meta_version,
&share->auto_increment_value,
&share->forced_flushes,
share->real_path))
@@ -468,7 +479,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name,
thr_lock_init(&share->lock);
}
share->use_count++;
DBUG_PRINT("info", ("archive table %.*s has %d open handles now",
DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles now",
share->table_name_length, share->table_name,
share->use_count));
if (share->crashed)
@@ -487,7 +498,7 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
{
int rc= 0;
DBUG_ENTER("ha_archive::free_share");
DBUG_PRINT("info", ("archive table %.*s has %d open handles on entrance",
DBUG_PRINT("ha_archive", ("archive table %.*s has %d open handles on entrance",
share->table_name_length, share->table_name,
share->use_count));
@@ -539,7 +550,7 @@ int ha_archive::init_archive_writer()
if (!(azopen(&(share->archive_write), share->data_file_name,
O_WRONLY|O_APPEND|O_BINARY)))
{
DBUG_PRINT("info", ("Could not open archive write file"));
DBUG_PRINT("ha_archive", ("Could not open archive write file"));
share->crashed= TRUE;
DBUG_RETURN(1);
}
@@ -575,7 +586,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
int rc= 0;
DBUG_ENTER("ha_archive::open");
DBUG_PRINT("info", ("archive table was opened for crash: %s",
DBUG_PRINT("ha_archive", ("archive table was opened for crash: %s",
(open_options & HA_OPEN_FOR_REPAIR) ? "yes" : "no"));
share= get_share(name, table, &rc);
@@ -589,9 +600,17 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN(rc);
}
thr_lock_data_init(&share->lock,&lock,NULL);
record_buffer= create_record_buffer(table->s->reclength);
if (!record_buffer)
{
free_share(share);
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
}
thr_lock_data_init(&share->lock, &lock, NULL);
DBUG_PRINT("info", ("archive data_file_name %s", share->data_file_name));
DBUG_PRINT("ha_archive", ("archive data_file_name %s", share->data_file_name));
if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
{
if (errno == EROFS || errno == EACCES)
@@ -599,7 +618,7 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
}
DBUG_PRINT("info", ("archive table was crashed %s",
DBUG_PRINT("ha_archive", ("archive table was crashed %s",
rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
{
@@ -632,6 +651,8 @@ int ha_archive::close(void)
int rc= 0;
DBUG_ENTER("ha_archive::close");
destroy_record_buffer(record_buffer);
/* First close stream */
if (azclose(&archive))
rc= 1;
@@ -676,7 +697,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if (!(field->flags & AUTO_INCREMENT_FLAG))
{
error= -1;
DBUG_PRINT("info", ("Index error in creating archive table"));
DBUG_PRINT("ha_archive", ("Index error in creating archive table"));
goto error;
}
}
@@ -701,7 +722,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
if (create_info->data_file_name)
{
char linkname[FN_REFLEN];
DBUG_PRINT("info", ("archive will create stream file %s",
DBUG_PRINT("ha_archive", ("archive will create stream file %s",
create_info->data_file_name));
fn_format(name_buff, create_info->data_file_name, "", ARZ,
@@ -762,37 +783,74 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
{
my_off_t written;
uint *ptr, *end;
int r_pack_length;
byte size_buffer[ARCHIVE_ROW_HEADER_SIZE]; // Header holding the packed row length
DBUG_ENTER("ha_archive::real_write_row");
written= azwrite(writer, buf, table->s->reclength);
// We pack the row for writing
r_pack_length= pack_row(buf);
DBUG_PRINT("ha_archive",("Pack row length %d", r_pack_length));
// Store the size of the row in front of the row data
bzero(size_buffer, ARCHIVE_ROW_HEADER_SIZE);
int4store(size_buffer, (int)r_pack_length);
DBUG_PRINT("ha_archive",("Pack %d %d %d %d", size_buffer[0], size_buffer[1], size_buffer[2], size_buffer[3]));
azwrite(writer, size_buffer, ARCHIVE_ROW_HEADER_SIZE);
written= azwrite(writer, record_buffer->buffer, r_pack_length);
DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d",
(uint32)written,
(uint32)table->s->reclength));
(uint32)r_pack_length));
if (!delayed_insert || !bulk_insert)
share->dirty= TRUE;
if (written != (my_off_t)table->s->reclength)
if (written != (my_off_t)r_pack_length)
DBUG_RETURN(errno ? errno : -1);
/*
We should probably mark the table as damaged if the record is written
but the blob fails.
*/
for (ptr= table->s->blob_field, end= ptr + table->s->blob_fields ;
DBUG_RETURN(0);
}
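Taken together, the new real_write_row() reduces to: pack the row into record_buffer, emit the 4-byte length header, emit the packed bytes, and treat any short write as an error. A hedged sketch of that sequence against a generic writer callback; azwrite() is the azio call the engine actually uses, and write_fn here is a stand-in:

#include <cstdint>
#include <cstddef>
#include <cstdio>

/* Stand-in for azwrite(): returns the number of bytes written. */
typedef size_t (*write_fn)(void *stream, const void *buf, size_t len);

/* Sketch of the version-3 write sequence from real_write_row(). */
static int write_row_v3(void *stream, write_fn wr,
                        const unsigned char *packed, uint32_t packed_len)
{
  unsigned char header[4];
  header[0]= (unsigned char)(packed_len);
  header[1]= (unsigned char)(packed_len >> 8);
  header[2]= (unsigned char)(packed_len >> 16);
  header[3]= (unsigned char)(packed_len >> 24);

  if (wr(stream, header, sizeof(header)) != sizeof(header))
    return -1;                              /* short write on the header  */
  if (wr(stream, packed, packed_len) != packed_len)
    return -1;                              /* short write on the payload */
  return 0;
}

/* Toy writer: counts bytes, pretending every write succeeds. */
static size_t count_write(void *stream, const void *, size_t len)
{
  *(size_t*) stream+= len;
  return len;
}

int main()
{
  const unsigned char packed[]= { 0x00, 'a', 'b', 'c' };
  size_t total= 0;
  int rc= write_row_v3(&total, count_write, packed, sizeof(packed));
  printf("rc=%d total bytes=%zu\n", rc, total);  /* rc=0, 8 bytes */
  return rc;
}

Note that blobs no longer get separate azwrite() calls: they travel inside the packed payload, which is why the old blob-writing loop is removed from this function.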
/* Calculate max length needed for row */
int ha_archive::max_row_length(const byte *buf)
{
ulonglong length= table->s->reclength + table->s->fields*2;
uint *ptr, *end;
for (ptr= table->s->blob_field, end=ptr + table->s->blob_fields ;
ptr != end ;
ptr++)
{
char *data_ptr;
uint32 size= ((Field_blob*) table->field[*ptr])->get_length();
Field_blob *blob= ((Field_blob*) table->field[*ptr]);
length+= blob->get_length((char*) buf + blob->offset())+2;
}
if (size)
{
((Field_blob*) table->field[*ptr])->get_ptr(&data_ptr);
written= azwrite(writer, data_ptr, (unsigned)size);
if (written != (my_off_t)size)
DBUG_RETURN(errno ? errno : -1);
}
return length;
}
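max_row_length() above computes a worst-case packed size: the fixed record length, up to two extra length bytes per field, and for each blob its current data length plus a two-byte length. A worked instance under assumed table dimensions (reclength, field count, and blob size are made up for illustration):

#include <cstdio>

int main()
{
  /* Assumed shape: reclength 120, 5 fields, one blob holding 1000 bytes. */
  unsigned long long reclength= 120, fields= 5, blob_data= 1000;
  unsigned long long max_len= reclength
                            + fields * 2      /* possible length bytes  */
                            + blob_data + 2;  /* blob data + its length */
  printf("worst-case packed row: %llu bytes\n", max_len);  /* 1132 */
  return 0;
}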
int ha_archive::pack_row(const byte *record)
{
byte *ptr;
DBUG_ENTER("ha_archive::pack_row");
if (table->s->blob_fields)
{
if (fix_rec_buff(max_row_length(record)))
DBUG_RETURN(HA_ERR_OUT_OF_MEM); /* purecov: inspected */
}
DBUG_RETURN(0);
/* Copy null bits */
memcpy(record_buffer->buffer, record, table->s->null_bytes);
ptr= record_buffer->buffer + table->s->null_bytes;
for (Field **field=table->field ; *field ; field++)
ptr=(byte*) (*field)->pack((char*) ptr,
(char*) record + (*field)->offset());
DBUG_RETURN((size_t) (ptr - record_buffer->buffer));
}
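pack_row() copies the record's null-bit prefix verbatim, then lets every Field serialize itself through Field::pack(), and returns the number of bytes the packed image used. The same shape with a toy stand-in for Field::pack() (one length byte plus the data):

#include <cstring>
#include <cstdio>
#include <cstddef>

/* Toy stand-in for Field::pack(): one length byte followed by the data. */
static unsigned char *pack_field(unsigned char *to,
                                 const unsigned char *from, size_t len)
{
  *to++= (unsigned char)len;
  memcpy(to, from, len);
  return to + len;
}

int main()
{
  const size_t null_bytes= 1;                    /* table->s->null_bytes */
  const unsigned char record[]= { 0x00, 'h', 'i' };
  unsigned char buffer[16];

  memcpy(buffer, record, null_bytes);            /* copy null bits */
  unsigned char *ptr= buffer + null_bytes;
  ptr= pack_field(ptr, record + null_bytes, 2);  /* pack one field */

  printf("packed length: %d\n", (int)(ptr - buffer));  /* prints 4 */
  return 0;
}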
@@ -809,7 +867,9 @@ int ha_archive::write_row(byte *buf)
{
int rc;
byte *read_buf= NULL;
byte *ptr;
ulonglong temp_auto;
DBUG_ENTER("ha_archive::write_row");
if (share->crashed)
@@ -866,12 +926,6 @@ int ha_archive::write_row(byte *buf)
goto error;
}
/*
Now we read and check all of the rows.
if (!memcmp(table->next_number_field->ptr, mfield->ptr, mfield->max_length()))
if ((longlong)temp_auto ==
mfield->val_int((char*)(read_buf + mfield->offset())))
*/
Field *mfield= table->next_number_field;
while (!(get_row(&archive, read_buf)))
@@ -899,37 +953,8 @@ int ha_archive::write_row(byte *buf)
if (init_archive_writer())
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/*
Varchar structures are constant in size but are not cleaned up from
request to request. The following sets all unused space to null to improve
compression.
*/
for (Field **field=table->field ; *field ; field++)
{
/*
Pack length will report 256 when you have 255 bytes
of data plus the single byte for length.
Probably could have added a method to report the number
of bytes the field uses for its length data.
*/
uint32 actual_length= (*field)->data_length() +
((*field)->pack_length() > 256 ? 2 : 1);
if ((*field)->real_type() == MYSQL_TYPE_VARCHAR)
{
char *ptr= (*field)->ptr + actual_length;
DBUG_ASSERT(actual_length <= (*field)->pack_length());
uint32 to_free= (*field)->pack_length() - actual_length;
if (to_free > 0)
bzero(ptr, to_free);
}
}
share->rows_recorded++;
rc= real_write_row(buf, &(share->archive_write));
error:
pthread_mutex_unlock(&share->mutex);
if (read_buf)
@@ -1054,7 +1079,7 @@ int ha_archive::rnd_init(bool scan)
if (scan)
{
scan_rows= share->rows_recorded;
DBUG_PRINT("info", ("archive will retrieve %llu rows",
DBUG_PRINT("ha_archive", ("archive will retrieve %llu rows",
(unsigned long long)scan_rows));
stats.records= 0;
@@ -1067,7 +1092,7 @@ int ha_archive::rnd_init(bool scan)
pthread_mutex_lock(&share->mutex);
if (share->dirty == TRUE)
{
DBUG_PRINT("info", ("archive flushing out rows for scan"));
DBUG_PRINT("ha_archive", ("archive flushing out rows for scan"));
azflush(&(share->archive_write), Z_SYNC_FLUSH);
share->forced_flushes++;
share->dirty= FALSE;
@@ -1088,16 +1113,90 @@ int ha_archive::rnd_init(bool scan)
positioned where you want it.
*/
int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
{
int rc;
DBUG_ENTER("ha_archive::get_row");
if (share->data_version == ARCHIVE_VERSION)
rc= get_row_version3(file_to_read, buf);
else
rc= get_row_version2(file_to_read, buf);
DBUG_PRINT("ha_archive", ("Return %d\n", rc));
DBUG_RETURN(rc);
}
/* Reallocate buffer if needed */
bool ha_archive::fix_rec_buff(int length)
{
if (! record_buffer->buffer || length > record_buffer->length)
{
byte *newptr;
if (!(newptr=(byte*) my_realloc((gptr) record_buffer->buffer, length,
MYF(MY_ALLOW_ZERO_PTR))))
return 1; /* purecov: inspected */
record_buffer->buffer= newptr;
record_buffer->length= length;
}
return 0;
}
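fix_rec_buff() is a grow-only buffer: it reallocates only when the requested length exceeds the current capacity, so record_buffer settles at the size of the largest row seen rather than paying an allocation per row. The same pattern in portable C++; realloc() on a possibly-NULL pointer behaves like my_realloc() with MY_ALLOW_ZERO_PTR:

#include <cstdlib>

struct record_buffer {
  unsigned char *buffer;   /* may start out NULL */
  size_t length;           /* current capacity   */
};

/* Grow-only resize mirroring fix_rec_buff(); returns true on failure. */
static bool fix_rec_buff(record_buffer *rb, size_t needed)
{
  if (rb->buffer == NULL || needed > rb->length)
  {
    unsigned char *newptr= (unsigned char*) realloc(rb->buffer, needed);
    if (newptr == NULL)
      return true;         /* caller maps this to HA_ERR_OUT_OF_MEM */
    rb->buffer= newptr;
    rb->length= needed;
  }
  return false;            /* buffer now holds at least `needed` bytes */
}

int main()
{
  record_buffer rb= { NULL, 0 };
  bool failed= fix_rec_buff(&rb, 128) || fix_rec_buff(&rb, 64); /* no shrink */
  free(rb.buffer);
  return failed ? 1 : 0;
}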
int ha_archive::unpack_row(azio_stream *file_to_read, char *record)
{
DBUG_ENTER("ha_archive::unpack_row");
int read; // Bytes read, azread() returns int
byte size_buffer[ARCHIVE_ROW_HEADER_SIZE];
int row_len;
/* First we grab the length stored */
read= azread(file_to_read, (byte *)size_buffer, ARCHIVE_ROW_HEADER_SIZE);
if (read == Z_STREAM_ERROR)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
/* If we read nothing we are at the end of the file */
if (read == 0 || read != ARCHIVE_ROW_HEADER_SIZE)
DBUG_RETURN(HA_ERR_END_OF_FILE);
row_len= sint4korr(size_buffer);
DBUG_PRINT("ha_archive",("Unpack row length %d -> %llu", row_len,
(unsigned long long)table->s->reclength));
fix_rec_buff(row_len);
if (azread(file_to_read, record_buffer->buffer, row_len) != row_len)
DBUG_RETURN(-1);
/* Copy null bits */
const char *ptr= (const char*) record_buffer->buffer;
memcpy(record, ptr, table->s->null_bytes);
ptr+= table->s->null_bytes;
for (Field **field=table->field ; *field ; field++)
ptr= (*field)->unpack(record + (*field)->offset(), ptr);
DBUG_RETURN(0);
}
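unpack_row() inverts the write path: read exactly ARCHIVE_ROW_HEADER_SIZE bytes, decode the length, grow the buffer, read that many payload bytes, then copy the null bits and let each Field::unpack() itself. Zero bytes on the header read signals a clean end of file. A sketch of that framing loop over an in-memory stream standing in for azread():

#include <cstdint>
#include <cstring>
#include <cstdio>

/* In-memory stand-in for an azio read stream. */
struct mem_stream { const unsigned char *data; size_t pos, size; };

static size_t mem_read(mem_stream *s, unsigned char *buf, size_t len)
{
  size_t avail= s->size - s->pos;
  if (len > avail)
    len= avail;
  memcpy(buf, s->data + s->pos, len);
  s->pos+= len;
  return len;                          /* azread() also returns a count */
}

/* 0 = got a row, 1 = clean end of file, -1 = torn header or payload. */
static int read_row(mem_stream *s, unsigned char *out, size_t cap,
                    uint32_t *row_len)
{
  unsigned char header[4];
  size_t got= mem_read(s, header, sizeof(header));
  if (got == 0)
    return 1;                                   /* HA_ERR_END_OF_FILE */
  if (got != sizeof(header))
    return -1;
  *row_len= (uint32_t)header[0]         | ((uint32_t)header[1] << 8) |
            ((uint32_t)header[2] << 16) | ((uint32_t)header[3] << 24);
  if (*row_len > cap || mem_read(s, out, *row_len) != *row_len)
    return -1;
  return 0;
}

int main()
{
  /* One framed row: little-endian length 3, then payload "abc". */
  const unsigned char bytes[]= { 3, 0, 0, 0, 'a', 'b', 'c' };
  mem_stream s= { bytes, 0, sizeof(bytes) };
  unsigned char row[16];
  uint32_t len;
  while (read_row(&s, row, sizeof(row), &len) == 0)
    printf("read a %u-byte row\n", (unsigned) len);
  return 0;
}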
int ha_archive::get_row_version3(azio_stream *file_to_read, byte *buf)
{
DBUG_ENTER("ha_archive::get_row_version3");
int returnable= unpack_row(file_to_read, buf);
DBUG_RETURN(returnable);
}
int ha_archive::get_row_version2(azio_stream *file_to_read, byte *buf)
{
int read; // Bytes read, azread() returns int
uint *ptr, *end;
char *last;
size_t total_blob_length= 0;
MY_BITMAP *read_set= table->read_set;
DBUG_ENTER("ha_archive::get_row");
DBUG_ENTER("ha_archive::get_row_version2");
read= azread(file_to_read, buf, table->s->reclength);
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %lu", read,
DBUG_PRINT("ha_archive::get_row_version2", ("Read %d bytes expected %lu", read,
(unsigned long)table->s->reclength));
if (read == Z_STREAM_ERROR)
@@ -1266,8 +1365,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (check_opt->flags == T_EXTEND)
{
DBUG_PRINT("info", ("archive extended rebuild"));
DBUG_PRINT("ha_archive", ("archive extended rebuild"));
byte *buf;
archive_record_buffer *write_buffer, *read_buffer, *original_buffer;
original_buffer= record_buffer;
/*
First we create a buffer that we can use for reading rows, and can pass
@@ -1279,6 +1381,15 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
goto error;
}
read_buffer= create_record_buffer(record_buffer->length);
write_buffer= create_record_buffer(record_buffer->length);
if (!write_buffer || !read_buffer)
{
rc= HA_ERR_OUT_OF_MEM;
goto error;
}
/*
Now we will rewind the archive file so that we are positioned at the
start of the file.
@@ -1300,8 +1411,11 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
{
share->rows_recorded= 0;
stats.auto_increment_value= share->auto_increment_value= 0;
record_buffer= read_buffer;
while (!(rc= get_row(&archive, buf)))
{
record_buffer= write_buffer;
real_write_row(buf, &writer);
if (table->found_next_number_field)
{
@@ -1313,18 +1427,24 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
auto_value;
}
share->rows_recorded++;
record_buffer= read_buffer;
}
}
DBUG_PRINT("info", ("recovered %llu archive rows",
DBUG_PRINT("ha_archive", ("recovered %llu archive rows",
(unsigned long long)share->rows_recorded));
record_buffer= original_buffer;
destroy_record_buffer(read_buffer);
destroy_record_buffer(write_buffer);
my_free((char*)buf, MYF(0));
if (rc && rc != HA_ERR_END_OF_FILE)
goto error;
}
else
{
DBUG_PRINT("info", ("archive quick rebuild"));
DBUG_PRINT("ha_archive", ("archive quick rebuild"));
/*
The quick method is to just read the data raw, and then compress it directly.
*/
@@ -1333,7 +1453,7 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (azrewind(&archive) == -1)
{
rc= HA_ERR_CRASHED_ON_USAGE;
DBUG_PRINT("info", ("archive HA_ERR_CRASHED_ON_USAGE"));
DBUG_PRINT("ha_archive", ("archive HA_ERR_CRASHED_ON_USAGE"));
goto error;
}
@@ -1359,12 +1479,12 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
know it failed.
We also need to reopen our read descriptor since it has changed.
*/
DBUG_PRINT("info", ("Reopening archive data file"));
DBUG_PRINT("ha_archive", ("Reopening archive data file"));
if (!azopen(&(share->archive_write), share->data_file_name,
O_WRONLY|O_APPEND|O_BINARY) ||
!azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY))
{
DBUG_PRINT("info", ("Could not open archive write file"));
DBUG_PRINT("ha_archive", ("Could not open archive write file"));
rc= HA_ERR_CRASHED_ON_USAGE;
}
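One subtlety in the optimize() rebuild: get_row() unpacks into whatever record_buffer points at, and real_write_row() packs out of it, so the code keeps separate read and write buffers and re-points record_buffer around each call, restoring the handler's original buffer at the end. A condensed view of that swap discipline; the row functions are stand-ins and the names follow the patch:

#include <cstddef>

struct archive_record_buffer { unsigned char *buffer; int length; };

/* Stand-ins: the real functions unpack into / pack from *record_buffer. */
static int  get_row(archive_record_buffer *) { return 1; /* end of file */ }
static void real_write_row(archive_record_buffer *) {}

static void rebuild(archive_record_buffer **record_buffer,
                    archive_record_buffer *read_buf,
                    archive_record_buffer *write_buf)
{
  archive_record_buffer *original= *record_buffer;

  *record_buffer= read_buf;            /* rows are unpacked into read_buf  */
  while (get_row(*record_buffer) == 0)
  {
    *record_buffer= write_buf;         /* rows are packed out of write_buf */
    real_write_row(*record_buffer);
    *record_buffer= read_buf;          /* back to reading */
  }
  *record_buffer= original;            /* restore the handler's buffer */
}

int main()
{
  archive_record_buffer a= {0,0}, b= {0,0}, c= {0,0};
  archive_record_buffer *record_buffer= &a;
  rebuild(&record_buffer, &b, &c);
  return record_buffer == &a ? 0 : 1;  /* original buffer restored */
}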
@@ -1577,6 +1697,36 @@ bool ha_archive::check_and_repair(THD *thd)
DBUG_RETURN(repair(thd, &check_opt));
}
archive_record_buffer *ha_archive::create_record_buffer(ulonglong length)
{
DBUG_ENTER("ha_archive::create_record_buffer");
archive_record_buffer *r;
if (!(r=
(archive_record_buffer*) my_malloc(sizeof(archive_record_buffer),
MYF(MY_WME))))
{
DBUG_RETURN(NULL); /* purecov: inspected */
}
r->length= (int)length;
if (!(r->buffer= (byte*) my_malloc(r->length,
MYF(MY_WME))))
{
my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(NULL); /* purecov: inspected */
}
DBUG_RETURN(r);
}
void ha_archive::destroy_record_buffer(archive_record_buffer *r)
{
DBUG_ENTER("ha_archive::destroy_record_buffer");
my_free((char*) r->buffer, MYF(MY_ALLOW_ZERO_PTR));
my_free((char*) r, MYF(MY_ALLOW_ZERO_PTR));
DBUG_VOID_RETURN;
}
struct st_mysql_storage_engine archive_storage_engine=
{ MYSQL_HANDLERTON_INTERFACE_VERSION };
@@ -1590,7 +1740,7 @@ mysql_declare_plugin(archive)
PLUGIN_LICENSE_GPL,
archive_db_init, /* Plugin Init */
archive_db_done, /* Plugin Deinit */
0x0100 /* 1.0 */,
0x0300 /* 3.0 */,
NULL, /* status variables */
NULL, /* system variables */
NULL /* config options */
@@ -27,6 +27,12 @@
ha_example.h.
*/
typedef struct st_archive_record_buffer {
byte *buffer;
int length;
} archive_record_buffer;
typedef struct st_archive_share {
char *table_name;
char data_file_name[FN_REFLEN];
@@ -43,18 +49,23 @@ typedef struct st_archive_share {
ulonglong forced_flushes;
ulonglong mean_rec_length;
char real_path[FN_REFLEN];
uint meta_version;
uint data_version;
} ARCHIVE_SHARE;
/*
Version for file format.
1 - Initial Version
1 - Initial Version (Never Released)
2 - Stream Compression, separate blobs, no packing
3 - One stream (row and blobs), with packing
*/
#define ARCHIVE_VERSION 2
#define ARCHIVE_VERSION 3
class ha_archive: public handler
{
THR_LOCK_DATA lock; /* MySQL lock */
ARCHIVE_SHARE *share; /* Shared lock info */
azio_stream archive; /* Archive file we are working with */
my_off_t current_position; /* The position of the row we just read */
byte byte_buffer[IO_SIZE]; /* Initial buffer for our string */
@@ -65,6 +76,10 @@ class ha_archive: public handler
const byte *current_key;
uint current_key_len;
uint current_k_offset;
archive_record_buffer *record_buffer;
archive_record_buffer *create_record_buffer(ulonglong length);
void destroy_record_buffer(archive_record_buffer *r);
public:
ha_archive(handlerton *hton, TABLE_SHARE *table_arg);
@@ -105,7 +120,10 @@ class ha_archive: public handler
int rnd_next(byte *buf);
int rnd_pos(byte * buf, byte *pos);
int get_row(azio_stream *file_to_read, byte *buf);
int get_row_version2(azio_stream *file_to_read, byte *buf);
int get_row_version3(azio_stream *file_to_read, byte *buf);
int read_meta_file(File meta_file, ha_rows *rows,
uint *meta_version,
ulonglong *auto_increment,
ulonglong *forced_flushes,
char *real_path);
@@ -137,5 +155,9 @@ class ha_archive: public handler
bool is_crashed() const;
int check(THD* thd, HA_CHECK_OPT* check_opt);
bool check_and_repair(THD *thd);
int max_row_length(const byte *buf);
bool fix_rec_buff(int length);
int unpack_row(azio_stream *file_to_read, char *record);
int pack_row(const byte *record);
};