Commit 8c322769 authored by brian@zim.(none)'s avatar brian@zim.(none)

Merge baker@bk-internal.mysql.com:/home/bk/mysql-5.1-arch

into  zim.(none):/home/bk/mysql-5.1-arch
parents 503d7b20 bbe97fa4
...@@ -217,14 +217,13 @@ int main(int argc, char *argv[]) ...@@ -217,14 +217,13 @@ int main(int argc, char *argv[])
azclose(&writer_handle); azclose(&writer_handle);
azclose(&reader_handle); azclose(&reader_handle);
exit(0);
unlink(TEST_FILENAME); unlink(TEST_FILENAME);
/* Start size tests */ /* Start size tests */
printf("About to run 2/4/8 gig tests now, you may want to hit CTRL-C\n"); printf("About to run 2/4/8 gig tests now, you may want to hit CTRL-C\n");
size_test(TWOGIG, 2097152L); size_test(TWOGIG, 2088992L);
size_test(FOURGIG, 4194304L); size_test(FOURGIG, 4177984L);
size_test(EIGHTGIG, 8388608L); size_test(EIGHTGIG, 8355968L);
return 0; return 0;
} }
...@@ -234,6 +233,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for) ...@@ -234,6 +233,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for)
azio_stream writer_handle, reader_handle; azio_stream writer_handle, reader_handle;
unsigned long long write_length; unsigned long long write_length;
unsigned long long read_length= 0; unsigned long long read_length= 0;
unsigned long long count;
unsigned int ret; unsigned int ret;
char buffer[BUFFER_LEN]; char buffer[BUFFER_LEN];
int error; int error;
...@@ -244,8 +244,10 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for) ...@@ -244,8 +244,10 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for)
return 0; return 0;
} }
for (write_length= 0; write_length < length ; write_length+= ret) for (count= 0, write_length= 0; write_length < length ;
write_length+= ret)
{ {
count++;
ret= azwrite(&writer_handle, test_string, BUFFER_LEN); ret= azwrite(&writer_handle, test_string, BUFFER_LEN);
if (ret != BUFFER_LEN) if (ret != BUFFER_LEN)
{ {
...@@ -257,7 +259,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for) ...@@ -257,7 +259,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for)
azflush(&writer_handle, Z_SYNC_FLUSH); azflush(&writer_handle, Z_SYNC_FLUSH);
} }
} }
assert(write_length == length); assert(write_length != count * BUFFER_LEN); /* Number of rows time BUFFER_LEN */
azflush(&writer_handle, Z_SYNC_FLUSH); azflush(&writer_handle, Z_SYNC_FLUSH);
printf("Reading back data\n"); printf("Reading back data\n");
...@@ -279,7 +281,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for) ...@@ -279,7 +281,7 @@ int size_test(unsigned long long length, unsigned long long rows_to_test_for)
} }
} }
assert(read_length == length); assert(read_length == write_length);
assert(writer_handle.rows == rows_to_test_for); assert(writer_handle.rows == rows_to_test_for);
azclose(&writer_handle); azclose(&writer_handle);
azclose(&reader_handle); azclose(&reader_handle);
......
...@@ -55,8 +55,8 @@ int az_open (azio_stream *s, const char *path, int Flags, File fd) ...@@ -55,8 +55,8 @@ int az_open (azio_stream *s, const char *path, int Flags, File fd)
s->stream.zalloc = (alloc_func)0; s->stream.zalloc = (alloc_func)0;
s->stream.zfree = (free_func)0; s->stream.zfree = (free_func)0;
s->stream.opaque = (voidpf)0; s->stream.opaque = (voidpf)0;
memset(s->inbuf, 0, AZ_BUFSIZE); memset(s->inbuf, 0, AZ_BUFSIZE_READ);
memset(s->outbuf, 0, AZ_BUFSIZE); memset(s->outbuf, 0, AZ_BUFSIZE_WRITE);
s->stream.next_in = s->inbuf; s->stream.next_in = s->inbuf;
s->stream.next_out = s->outbuf; s->stream.next_out = s->outbuf;
s->stream.avail_in = s->stream.avail_out = 0; s->stream.avail_in = s->stream.avail_out = 0;
...@@ -109,7 +109,7 @@ int az_open (azio_stream *s, const char *path, int Flags, File fd) ...@@ -109,7 +109,7 @@ int az_open (azio_stream *s, const char *path, int Flags, File fd)
return Z_NULL; return Z_NULL;
} }
} }
s->stream.avail_out = AZ_BUFSIZE; s->stream.avail_out = AZ_BUFSIZE_WRITE;
errno = 0; errno = 0;
s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd; s->file = fd < 0 ? my_open(path, Flags, MYF(0)) : fd;
...@@ -159,7 +159,7 @@ void write_header(azio_stream *s) ...@@ -159,7 +159,7 @@ void write_header(azio_stream *s)
char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE]; char buffer[AZHEADER_SIZE + AZMETA_BUFFER_SIZE];
char *ptr= buffer; char *ptr= buffer;
s->block_size= AZ_BUFSIZE; s->block_size= AZ_BUFSIZE_WRITE;
s->version = (unsigned char)az_magic[1]; s->version = (unsigned char)az_magic[1];
s->minor_version = (unsigned char)az_magic[2]; s->minor_version = (unsigned char)az_magic[2];
...@@ -224,7 +224,7 @@ int get_byte(s) ...@@ -224,7 +224,7 @@ int get_byte(s)
if (s->stream.avail_in == 0) if (s->stream.avail_in == 0)
{ {
errno = 0; errno = 0;
s->stream.avail_in = my_read(s->file, (byte *)s->inbuf, AZ_BUFSIZE, MYF(0)); s->stream.avail_in = my_read(s->file, (byte *)s->inbuf, AZ_BUFSIZE_READ, MYF(0));
if (s->stream.avail_in == 0) if (s->stream.avail_in == 0)
{ {
s->z_eof = 1; s->z_eof = 1;
...@@ -260,7 +260,7 @@ void check_header(azio_stream *s) ...@@ -260,7 +260,7 @@ void check_header(azio_stream *s)
if (len < 2) { if (len < 2) {
if (len) s->inbuf[0] = s->stream.next_in[0]; if (len) s->inbuf[0] = s->stream.next_in[0];
errno = 0; errno = 0;
len = (uInt)my_read(s->file, (byte *)s->inbuf + len, AZ_BUFSIZE >> len, MYF(0)); len = (uInt)my_read(s->file, (byte *)s->inbuf + len, AZ_BUFSIZE_READ >> len, MYF(0));
if (len == 0) s->z_err = Z_ERRNO; if (len == 0) s->z_err = Z_ERRNO;
s->stream.avail_in += len; s->stream.avail_in += len;
s->stream.next_in = s->inbuf; s->stream.next_in = s->inbuf;
...@@ -455,7 +455,7 @@ unsigned int ZEXPORT azread ( azio_stream *s, voidp buf, unsigned int len, int * ...@@ -455,7 +455,7 @@ unsigned int ZEXPORT azread ( azio_stream *s, voidp buf, unsigned int len, int *
if (s->stream.avail_in == 0 && !s->z_eof) { if (s->stream.avail_in == 0 && !s->z_eof) {
errno = 0; errno = 0;
s->stream.avail_in = (uInt)my_read(s->file, (byte *)s->inbuf, AZ_BUFSIZE, MYF(0)); s->stream.avail_in = (uInt)my_read(s->file, (byte *)s->inbuf, AZ_BUFSIZE_READ, MYF(0));
if (s->stream.avail_in == 0) if (s->stream.avail_in == 0)
{ {
s->z_eof = 1; s->z_eof = 1;
...@@ -522,12 +522,13 @@ unsigned int azwrite (azio_stream *s, voidpc buf, unsigned int len) ...@@ -522,12 +522,13 @@ unsigned int azwrite (azio_stream *s, voidpc buf, unsigned int len)
{ {
s->stream.next_out = s->outbuf; s->stream.next_out = s->outbuf;
if (my_write(s->file, (byte *)s->outbuf, AZ_BUFSIZE, MYF(0)) != AZ_BUFSIZE) if (my_write(s->file, (byte *)s->outbuf, AZ_BUFSIZE_WRITE,
MYF(0)) != AZ_BUFSIZE_WRITE)
{ {
s->z_err = Z_ERRNO; s->z_err = Z_ERRNO;
break; break;
} }
s->stream.avail_out = AZ_BUFSIZE; s->stream.avail_out = AZ_BUFSIZE_WRITE;
} }
s->in += s->stream.avail_in; s->in += s->stream.avail_in;
s->out += s->stream.avail_out; s->out += s->stream.avail_out;
...@@ -563,7 +564,7 @@ int do_flush (azio_stream *s, int flush) ...@@ -563,7 +564,7 @@ int do_flush (azio_stream *s, int flush)
for (;;) for (;;)
{ {
len = AZ_BUFSIZE - s->stream.avail_out; len = AZ_BUFSIZE_WRITE - s->stream.avail_out;
if (len != 0) if (len != 0)
{ {
...@@ -574,7 +575,7 @@ int do_flush (azio_stream *s, int flush) ...@@ -574,7 +575,7 @@ int do_flush (azio_stream *s, int flush)
return Z_ERRNO; return Z_ERRNO;
} }
s->stream.next_out = s->outbuf; s->stream.next_out = s->outbuf;
s->stream.avail_out = AZ_BUFSIZE; s->stream.avail_out = AZ_BUFSIZE_WRITE;
} }
if (done) break; if (done) break;
s->out += s->stream.avail_out; s->out += s->stream.avail_out;
...@@ -675,8 +676,8 @@ my_off_t azseek (s, offset, whence) ...@@ -675,8 +676,8 @@ my_off_t azseek (s, offset, whence)
/* There was a zmemzero here if inbuf was null -Brian */ /* There was a zmemzero here if inbuf was null -Brian */
while (offset > 0) while (offset > 0)
{ {
uInt size = AZ_BUFSIZE; uInt size = AZ_BUFSIZE_WRITE;
if (offset < AZ_BUFSIZE) size = (uInt)offset; if (offset < AZ_BUFSIZE_WRITE) size = (uInt)offset;
size = azwrite(s, s->inbuf, size); size = azwrite(s, s->inbuf, size);
if (size == 0) return -1L; if (size == 0) return -1L;
...@@ -719,8 +720,8 @@ my_off_t azseek (s, offset, whence) ...@@ -719,8 +720,8 @@ my_off_t azseek (s, offset, whence)
} }
while (offset > 0) { while (offset > 0) {
int error; int error;
unsigned int size = AZ_BUFSIZE; unsigned int size = AZ_BUFSIZE_READ;
if (offset < AZ_BUFSIZE) size = (int)offset; if (offset < AZ_BUFSIZE_READ) size = (int)offset;
size = azread(s, s->outbuf, size, &error); size = azread(s, s->outbuf, size, &error);
if (error <= 0) return -1L; if (error <= 0) return -1L;
......
...@@ -196,7 +196,8 @@ extern "C" { ...@@ -196,7 +196,8 @@ extern "C" {
/* The deflate compression method (the only one supported in this version) */ /* The deflate compression method (the only one supported in this version) */
#define Z_NULL 0 /* for initializing zalloc, zfree, opaque */ #define Z_NULL 0 /* for initializing zalloc, zfree, opaque */
#define AZ_BUFSIZE 16384 #define AZ_BUFSIZE_READ 32768
#define AZ_BUFSIZE_WRITE 16384
typedef struct azio_stream { typedef struct azio_stream {
...@@ -204,8 +205,8 @@ typedef struct azio_stream { ...@@ -204,8 +205,8 @@ typedef struct azio_stream {
int z_err; /* error code for last stream operation */ int z_err; /* error code for last stream operation */
int z_eof; /* set if end of input file */ int z_eof; /* set if end of input file */
File file; /* .gz file */ File file; /* .gz file */
Byte inbuf[AZ_BUFSIZE]; /* input buffer */ Byte inbuf[AZ_BUFSIZE_READ]; /* input buffer */
Byte outbuf[AZ_BUFSIZE]; /* output buffer */ Byte outbuf[AZ_BUFSIZE_WRITE]; /* output buffer */
uLong crc; /* crc32 of uncompressed data */ uLong crc; /* crc32 of uncompressed data */
char *msg; /* error message */ char *msg; /* error message */
int transparent; /* 1 if input file is not a .gz file */ int transparent; /* 1 if input file is not a .gz file */
......
...@@ -210,7 +210,8 @@ ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg) ...@@ -210,7 +210,8 @@ ha_archive::ha_archive(handlerton *hton, TABLE_SHARE *table_arg)
buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info); buffer.set((char *)byte_buffer, IO_SIZE, system_charset_info);
/* The size of the offset value we will use for position() */ /* The size of the offset value we will use for position() */
ref_length = sizeof(my_off_t); ref_length= sizeof(my_off_t);
archive_reader_open= FALSE;
} }
int archive_discover(handlerton *hton, THD* thd, const char *db, int archive_discover(handlerton *hton, THD* thd, const char *db,
...@@ -434,6 +435,29 @@ int ha_archive::init_archive_writer() ...@@ -434,6 +435,29 @@ int ha_archive::init_archive_writer()
} }
int ha_archive::init_archive_reader()
{
DBUG_ENTER("ha_archive::init_archive_reader");
/*
It is expensive to open and close the data files and since you can't have
a gzip file that can be both read and written we keep a writer open
that is shared amoung all open tables.
*/
if (!archive_reader_open)
{
if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
{
DBUG_PRINT("ha_archive", ("Could not open archive read file"));
share->crashed= TRUE;
DBUG_RETURN(1);
}
archive_reader_open= TRUE;
}
DBUG_RETURN(0);
}
/* /*
We just implement one additional file extension. We just implement one additional file extension.
*/ */
...@@ -477,7 +501,6 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -477,7 +501,6 @@ int ha_archive::open(const char *name, int mode, uint open_options)
DBUG_ASSERT(share); DBUG_ASSERT(share);
record_buffer= create_record_buffer(table->s->reclength + record_buffer= create_record_buffer(table->s->reclength +
ARCHIVE_ROW_HEADER_SIZE); ARCHIVE_ROW_HEADER_SIZE);
...@@ -489,14 +512,6 @@ int ha_archive::open(const char *name, int mode, uint open_options) ...@@ -489,14 +512,6 @@ int ha_archive::open(const char *name, int mode, uint open_options)
thr_lock_data_init(&share->lock, &lock, NULL); thr_lock_data_init(&share->lock, &lock, NULL);
DBUG_PRINT("ha_archive", ("archive data_file_name %s", share->data_file_name));
if (!(azopen(&archive, share->data_file_name, O_RDONLY|O_BINARY)))
{
if (errno == EROFS || errno == EACCES)
DBUG_RETURN(my_errno= errno);
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
}
DBUG_PRINT("ha_archive", ("archive table was crashed %s", DBUG_PRINT("ha_archive", ("archive table was crashed %s",
rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no")); rc == HA_ERR_CRASHED_ON_USAGE ? "yes" : "no"));
if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR) if (rc == HA_ERR_CRASHED_ON_USAGE && open_options & HA_OPEN_FOR_REPAIR)
...@@ -533,8 +548,11 @@ int ha_archive::close(void) ...@@ -533,8 +548,11 @@ int ha_archive::close(void)
destroy_record_buffer(record_buffer); destroy_record_buffer(record_buffer);
/* First close stream */ /* First close stream */
if (archive_reader_open)
{
if (azclose(&archive)) if (azclose(&archive))
rc= 1; rc= 1;
}
/* then also close share */ /* then also close share */
rc|= free_share(); rc|= free_share();
...@@ -979,6 +997,8 @@ int ha_archive::rnd_init(bool scan) ...@@ -979,6 +997,8 @@ int ha_archive::rnd_init(bool scan)
if (share->crashed) if (share->crashed)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE); DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
init_archive_reader();
/* We rewind the file so that we can read from the beginning if scan */ /* We rewind the file so that we can read from the beginning if scan */
if (scan) if (scan)
{ {
......
...@@ -71,6 +71,7 @@ class ha_archive: public handler ...@@ -71,6 +71,7 @@ class ha_archive: public handler
uint current_key_len; uint current_key_len;
uint current_k_offset; uint current_k_offset;
archive_record_buffer *record_buffer; archive_record_buffer *record_buffer;
bool archive_reader_open;
archive_record_buffer *create_record_buffer(unsigned int length); archive_record_buffer *create_record_buffer(unsigned int length);
void destroy_record_buffer(archive_record_buffer *r); void destroy_record_buffer(archive_record_buffer *r);
...@@ -119,6 +120,7 @@ class ha_archive: public handler ...@@ -119,6 +120,7 @@ class ha_archive: public handler
ARCHIVE_SHARE *get_share(const char *table_name, int *rc); ARCHIVE_SHARE *get_share(const char *table_name, int *rc);
int free_share(); int free_share();
int init_archive_writer(); int init_archive_writer();
int init_archive_reader();
bool auto_repair() const { return 1; } // For the moment we just do this bool auto_repair() const { return 1; } // For the moment we just do this
int read_data_header(azio_stream *file_to_read); int read_data_header(azio_stream *file_to_read);
void position(const byte *record); void position(const byte *record);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment