Commit 1d5a5ca8 authored by unknown

This changeset adds auto-increment support to the archive engine, allowing auto-increment columns with both unique and non-unique indexes.
Test cases will come in the next push (just doing this one so I can get a couple of reviews in). This is not the final patch. 


sql/field.h:
  Added a val_int() method, analogous to the existing val_str() overload, that lets you point the field at another buffer and evaluate it there.
sql/ha_archive.cc:
  Adds auto_increment support to archive. Cleans up a number of other small items.
sql/ha_archive.h:
  Additional declarations to support auto-increment.
parent 4614fc25
@@ -253,7 +253,15 @@ class Field
ptr-=row_offset;
return tmp;
}
inline longlong val_int(char *new_ptr)
{
char *old_ptr= ptr;
longlong return_value;
ptr= new_ptr;
return_value= val_int();
ptr= old_ptr;
return return_value;
}
inline String *val_str(String *str, char *new_ptr)
{
char *old_ptr= ptr;
......
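A reading aid, not part of the patch: a minimal sketch of the pattern the new val_int() overload enables, namely evaluating a field against a row image other than table->record[0] without permanently moving the field's ptr. The variable names mirror the real call sites in write_row() and optimize() below.

  // Assume read_buf holds a row fetched with get_row(), as in write_row() below.
  Field *field= table->next_number_field;   // the auto-increment column
  longlong value= field->val_int((char*) (read_buf + field->offset()));
  // ptr is saved and restored inside the overload, so the field still
  // addresses table->record[0] afterwards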
@@ -104,6 +104,7 @@
rows - This is an unsigned long long which is the number of rows in the data
file.
check point - Reserved for future use
auto increment - The highest auto-increment value stored in the table
dirty - Status of the file, whether or not its values are the latest. This
flag is what causes a repair to occur
@@ -125,9 +126,11 @@ static HASH archive_open_tables;
#define ARN ".ARN" // Files used during an optimize call
#define ARM ".ARM" // Meta file
/*
uchar + uchar + ulonglong + ulonglong + uchar
uchar + uchar + ulonglong + ulonglong + ulonglong + uchar
*/
#define META_BUFFER_SIZE 19 // Size of the data used in the meta file
#define META_BUFFER_SIZE (sizeof(uchar) + sizeof(uchar) + sizeof(ulonglong) \
+ sizeof(ulonglong) + sizeof(ulonglong) + sizeof(uchar))
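With 1-byte uchar and 8-byte ulonglong this works out to 1 + 1 + 8 + 8 + 8 + 1 = 27 bytes, versus the old fixed 19. As a reading aid (not part of the patch), the offsets as read_meta_file() and write_meta_file() walk them:

  offset  0: uchar     check header (ARCHIVE_CHECK_HEADER)
  offset  1: uchar     version (ARCHIVE_VERSION)
  offset  2: ulonglong rows
  offset 10: ulonglong check point (reserved)
  offset 18: ulonglong auto increment (new in version 2)
  offset 26: uchar     dirty flag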
/*
uchar + uchar
*/
@@ -300,9 +303,11 @@ int ha_archive::write_data_header(azio_stream *file_to_write)
This method reads the header of a meta file and returns whether or not it was successful.
*rows will contain the current number of rows in the data file upon success.
*/
int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
int ha_archive::read_meta_file(File meta_file, ha_rows *rows,
ulonglong *auto_increment)
{
uchar meta_buffer[META_BUFFER_SIZE];
uchar *ptr= meta_buffer;
ulonglong check_point;
DBUG_ENTER("ha_archive::read_meta_file");
@@ -314,17 +319,24 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
/*
Parse out the meta data, we ignore version at the moment
*/
*rows= (ha_rows)uint8korr(meta_buffer + 2);
check_point= uint8korr(meta_buffer + 10);
ptr+= sizeof(uchar)*2; // Move past header
*rows= (ha_rows)uint8korr(ptr);
ptr+= sizeof(ulonglong); // Move past rows
check_point= uint8korr(ptr);
ptr+= sizeof(ulonglong); // Move past check_point
*auto_increment= uint8korr(ptr);
ptr+= sizeof(ulonglong); // Move past auto_increment
DBUG_PRINT("ha_archive::read_meta_file", ("Check %d", (uint)meta_buffer[0]));
DBUG_PRINT("ha_archive::read_meta_file", ("Version %d", (uint)meta_buffer[1]));
DBUG_PRINT("ha_archive::read_meta_file", ("Rows %lld", *rows));
DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %lld", check_point));
DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)meta_buffer[18]));
DBUG_PRINT("ha_archive::read_meta_file", ("Rows %llu", *rows));
DBUG_PRINT("ha_archive::read_meta_file", ("Checkpoint %llu", check_point));
DBUG_PRINT("ha_archive::read_meta_file", ("Auto-Increment %llu", *auto_increment));
DBUG_PRINT("ha_archive::read_meta_file", ("Dirty %d", (int)(*ptr)));
if ((meta_buffer[0] != (uchar)ARCHIVE_CHECK_HEADER) ||
((bool)meta_buffer[18] == TRUE))
((bool)(*ptr) == TRUE))
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
my_sync(meta_file, MYF(MY_WME));
@@ -337,22 +349,34 @@ int ha_archive::read_meta_file(File meta_file, ha_rows *rows)
The dirty flag says whether the meta file reflects the actual state of the data file:
we mark it dirty upon ::open() and clean upon ::close().
*/
int ha_archive::write_meta_file(File meta_file, ha_rows rows, bool dirty)
int ha_archive::write_meta_file(File meta_file, ha_rows rows,
ulonglong auto_increment, bool dirty)
{
uchar meta_buffer[META_BUFFER_SIZE];
uchar *ptr= meta_buffer;
ulonglong check_point= 0; //Reserved for the future
DBUG_ENTER("ha_archive::write_meta_file");
meta_buffer[0]= (uchar)ARCHIVE_CHECK_HEADER;
meta_buffer[1]= (uchar)ARCHIVE_VERSION;
int8store(meta_buffer + 2, (ulonglong)rows);
int8store(meta_buffer + 10, check_point);
*(meta_buffer + 18)= (uchar)dirty;
DBUG_PRINT("ha_archive::write_meta_file", ("Check %d", (uint)ARCHIVE_CHECK_HEADER));
DBUG_PRINT("ha_archive::write_meta_file", ("Version %d", (uint)ARCHIVE_VERSION));
*ptr= (uchar)ARCHIVE_CHECK_HEADER;
ptr += sizeof(uchar);
*ptr= (uchar)ARCHIVE_VERSION;
ptr += sizeof(uchar);
int8store(ptr, (ulonglong)rows);
ptr += sizeof(ulonglong);
int8store(ptr, check_point);
ptr += sizeof(ulonglong);
int8store(ptr, auto_increment);
ptr += sizeof(ulonglong);
*ptr= (uchar)dirty;
DBUG_PRINT("ha_archive::write_meta_file", ("Check %d",
(uint)ARCHIVE_CHECK_HEADER));
DBUG_PRINT("ha_archive::write_meta_file", ("Version %d",
(uint)ARCHIVE_VERSION));
DBUG_PRINT("ha_archive::write_meta_file", ("Rows %llu", (ulonglong)rows));
DBUG_PRINT("ha_archive::write_meta_file", ("Checkpoint %llu", check_point));
DBUG_PRINT("ha_archive::write_meta_file", ("Auto Increment %llu",
auto_increment));
DBUG_PRINT("ha_archive::write_meta_file", ("Dirty %d", (uint)dirty));
VOID(my_seek(meta_file, 0, MY_SEEK_SET, MYF(0)));
@@ -414,17 +438,19 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, TABLE *table)
opposite. If the meta file will not open we assume it is crashed and
leave it up to the user to fix.
*/
if (read_meta_file(share->meta_file, &share->rows_recorded))
if (read_meta_file(share->meta_file, &share->rows_recorded,
&share->auto_increment_value))
share->crashed= TRUE;
else
(void)write_meta_file(share->meta_file, share->rows_recorded, TRUE);
(void)write_meta_file(share->meta_file, share->rows_recorded,
share->auto_increment_value, TRUE);
/*
It is expensive to open and close the data files, and since you can't have
a gzip file that is both readable and writable, we keep a single writer open
that is shared among all open tables.
*/
if (!(azopen(&(share->archive_write), share->data_file_name, O_WRONLY|O_APPEND|O_BINARY)))
if (!(azopen(&(share->archive_write), share->data_file_name,
O_WRONLY|O_APPEND|O_BINARY)))
{
DBUG_PRINT("info", ("Could not open archive write file"));
share->crashed= TRUE;
@@ -452,7 +478,8 @@ int ha_archive::free_share(ARCHIVE_SHARE *share)
hash_delete(&archive_open_tables, (byte*) share);
thr_lock_delete(&share->lock);
VOID(pthread_mutex_destroy(&share->mutex));
(void)write_meta_file(share->meta_file, share->rows_recorded, FALSE);
(void)write_meta_file(share->meta_file, share->rows_recorded,
share->auto_increment_value, FALSE);
if (azclose(&(share->archive_write)))
rc= 1;
if (my_close(share->meta_file, MYF(0)))
@@ -561,7 +588,27 @@ int ha_archive::create(const char *name, TABLE *table_arg,
error= my_errno;
goto error;
}
write_meta_file(create_file, 0, FALSE);
for (uint key= 0; key < table_arg->s->keys; key++)
{
KEY *pos= table_arg->key_info+key;
KEY_PART_INFO *key_part= pos->key_part;
KEY_PART_INFO *key_part_end= key_part + pos->key_parts;
for (; key_part != key_part_end; key_part++)
{
Field *field= key_part->field;
DBUG_PRINT("info", ("Looking at field index%s", field->field_name));
if (!(field->flags & AUTO_INCREMENT_FLAG))
{
error= -1;
goto error;
}
}
}
write_meta_file(create_file, 0, 0, FALSE);
my_close(create_file,MYF(0));
/*
@@ -614,7 +661,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
DBUG_ENTER("ha_archive::real_write_row");
written= azwrite(writer, buf, table->s->reclength);
DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d", written, table->s->reclength));
DBUG_PRINT("ha_archive::real_write_row", ("Wrote %d bytes expected %d",
written, table->s->reclength));
if (!delayed_insert || !bulk_insert)
share->dirty= TRUE;
@@ -655,6 +703,8 @@ int ha_archive::real_write_row(byte *buf, azio_stream *writer)
int ha_archive::write_row(byte *buf)
{
int rc;
byte *read_buf= NULL;
ulonglong temp_auto;
DBUG_ENTER("ha_archive::write_row");
if (share->crashed)
@@ -664,13 +714,164 @@ int ha_archive::write_row(byte *buf)
if (table->timestamp_field_type & TIMESTAMP_AUTO_SET_ON_INSERT)
table->timestamp_field->set_time();
pthread_mutex_lock(&share->mutex);
if (table->next_number_field)
{
KEY *mkey= &table->s->key_info[0]; // We only support one key right now
update_auto_increment();
temp_auto= table->next_number_field->val_int();
DBUG_PRINT("info", ("archive would see %d and %d",
temp_auto, share->auto_increment_value));
/*
Bad news: this causes a search for the unique value, which is very
expensive since we have to do a table scan, locking out all other
writers for the duration. This could perhaps be optimized in the
future.
*/
if (temp_auto == share->auto_increment_value &&
mkey->flags & HA_NOSAME)
{
rc= HA_ERR_FOUND_DUPP_KEY;
goto error;
}
if (temp_auto < share->auto_increment_value &&
mkey->flags & HA_NOSAME)
{
/*
First we create a buffer that we can use for reading rows, and can pass
to get_row().
*/
if (!(read_buf= (byte*) my_malloc(table->s->reclength, MYF(MY_WME))))
{
rc= HA_ERR_OUT_OF_MEM;
goto error;
}
/*
All of the buffer must be written out or we won't see all of the
data
*/
azflush(&(share->archive_write), Z_SYNC_FLUSH);
/*
Set the position of the local read thread to the beginning.
*/
if (read_data_header(&archive))
{
rc= HA_ERR_CRASHED_ON_USAGE;
goto error;
}
/*
Now we read every row and compare its stored auto-increment value
against temp_auto.
*/
Field *mfield= table->next_number_field;
while (!(get_row(&archive, read_buf)))
{
if ((longlong)temp_auto ==
mfield->val_int((char*)(read_buf + mfield->offset())))
{
rc= HA_ERR_FOUND_DUPP_KEY;
goto error;
}
}
}
else
{
auto_increment_value= share->auto_increment_value= temp_auto;
}
}
/*
Notice that the global auto_increment has been increased.
In case of a failed row write, we will never try to reuse the value.
*/
share->rows_recorded++;
rc= real_write_row(buf, &(share->archive_write));
error:
pthread_mutex_unlock(&share->mutex);
if (read_buf)
my_free(read_buf, MYF(0));
DBUG_RETURN(rc);
}
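A reading aid summarizing the unique-key paths above, assuming the single supported key has HA_NOSAME set:

  temp_auto >  share->auto_increment_value  -> new maximum: record it, no scan
  temp_auto == share->auto_increment_value  -> treated as a duplicate of the
                                               current maximum: fail with
                                               HA_ERR_FOUND_DUPP_KEY, no scan
  temp_auto <  share->auto_increment_value  -> may collide with any older row,
                                               so flush the shared writer and
                                               scan the whole table, comparing
                                               each row via the new
                                               Field::val_int(char*) overload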
ulonglong ha_archive::get_auto_increment()
{
return auto_increment_value= ++share->auto_increment_value;
}
/* Initialized at each key walk (called multiple times, unlike rnd_init()) */
int ha_archive::index_init(uint keynr, bool sorted)
{
DBUG_ENTER("ha_archive::index_init");
active_index= keynr;
DBUG_RETURN(0);
}
/*
We have no real index structures, but since we tell the optimizer that we
have unique indexes, an index-search request is answered with a table scan
*/
int ha_archive::index_read(byte *buf, const byte *key,
uint key_len, enum ha_rkey_function find_flag)
{
int rc;
DBUG_ENTER("ha_archive::index_read");
rc= index_read_idx(buf, active_index, key, key_len, find_flag);
DBUG_RETURN(rc);
}
int ha_archive::index_read_idx(byte *buf, uint index, const byte *key,
uint key_len, enum ha_rkey_function find_flag)
{
int rc= 0;
bool found= 0;
KEY *mkey= &table->s->key_info[index];
uint k_offset= mkey->key_part->offset;
DBUG_ENTER("ha_archive::index_read_idx");
/*
All of the buffer must be written out or we won't see all of the
data
*/
pthread_mutex_lock(&share->mutex);
azflush(&(share->archive_write), Z_SYNC_FLUSH);
pthread_mutex_unlock(&share->mutex);
/*
Set the position of the local read thread to the beginning.
*/
if (read_data_header(&archive))
{
rc= HA_ERR_CRASHED_ON_USAGE;
goto error;
}
while (!(get_row(&archive, buf)))
{
if (!memcmp(key, buf+k_offset, key_len))
{
found= 1;
break;
}
}
if (found)
DBUG_RETURN(0);
error:
DBUG_RETURN(rc ? rc : HA_ERR_END_OF_FILE);
}
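A note on the comparison above (an assumption about the intended key layout, not stated in the patch): with the only key shape archive now accepts, a single auto-increment column, key_len is at most sizeof(ulonglong) and k_offset is that column's offset inside the record, so the memcmp() compares the searched key image byte-for-byte against the stored column of every scanned row.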
/*
All calls that need to scan the table start with this method. If we are told
that it is a table scan we rewind the file to the beginning, otherwise
@@ -726,7 +927,8 @@ int ha_archive::get_row(azio_stream *file_to_read, byte *buf)
DBUG_ENTER("ha_archive::get_row");
read= azread(file_to_read, buf, table->s->reclength);
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read, table->s->reclength));
DBUG_PRINT("ha_archive::get_row", ("Read %d bytes expected %d", read,
table->s->reclength));
if (read == Z_STREAM_ERROR)
DBUG_RETURN(HA_ERR_CRASHED_ON_USAGE);
@@ -912,9 +1114,18 @@ int ha_archive::optimize(THD* thd, HA_CHECK_OPT* check_opt)
if (!rc)
{
share->rows_recorded= 0;
auto_increment_value= share->auto_increment_value= 0;
while (!(rc= get_row(&archive, buf)))
{
real_write_row(buf, &writer);
if (table->found_next_number_field)
{
Field *field= table->found_next_number_field;
if (share->auto_increment_value <
field->val_int((char*)(buf + field->offset())))
auto_increment_value= share->auto_increment_value=
field->val_int((char*)(buf + field->offset()));
}
share->rows_recorded++;
}
}
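A reading aid (hypothetical helper, not part of the patch): the rebuild above is just a running maximum over the auto-increment column of every row that survives the optimize pass.

  static inline ulonglong running_max(ulonglong current_max, Field *field,
                                      byte *buf)
  {
    ulonglong row_value= (ulonglong) field->val_int((char*) (buf + field->offset()));
    return row_value > current_max ? row_value : current_max;
  }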
@@ -1028,6 +1239,9 @@ void ha_archive::info(uint flag)
delete_length= 0;
index_file_length=0;
if (flag & HA_STATUS_AUTO)
auto_increment_value= share->auto_increment_value;
DBUG_VOID_RETURN;
}
......
@@ -18,6 +18,7 @@
#pragma interface /* gcc class implementation */
#endif
#include <values.h>
#include <zlib.h>
#include "../storage/archive/azlib.h"
@@ -38,13 +39,14 @@ typedef struct st_archive_share {
bool dirty; /* Flag for if a flush should occur */
bool crashed; /* Meta file is crashed */
ha_rows rows_recorded; /* Number of rows in tables */
ulonglong auto_increment_value;
} ARCHIVE_SHARE;
/*
Version for file format.
1 - Initial Version
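2 - Adds the auto-increment value to the meta file (this changeset)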
*/
#define ARCHIVE_VERSION 1
#define ARCHIVE_VERSION 2
class ha_archive: public handler
{
@@ -68,13 +70,22 @@ class ha_archive: public handler
const char **bas_ext() const;
ulong table_flags() const
{
return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT | HA_NO_AUTO_INCREMENT |
return (HA_REC_NOT_IN_SEQ | HA_NOT_EXACT_COUNT |
HA_FILE_BASED | HA_CAN_INSERT_DELAYED | HA_CAN_GEOMETRY);
}
ulong index_flags(uint idx, uint part, bool all_parts) const
{
return 0;
return HA_ONLY_WHOLE_INDEX;
}
ulonglong get_auto_increment();
uint max_supported_keys() const { return 1; }
uint max_supported_key_length() const { return sizeof(ulonglong); }
uint max_supported_key_part_length() const { return sizeof(ulonglong); }
int index_init(uint keynr, bool sorted);
virtual int index_read(byte * buf, const byte * key,
uint key_len, enum ha_rkey_function find_flag);
virtual int index_read_idx(byte * buf, uint index, const byte * key,
uint key_len, enum ha_rkey_function find_flag);
int open(const char *name, int mode, uint test_if_locked);
int close(void);
int write_row(byte * buf);
@@ -84,8 +95,9 @@ class ha_archive: public handler
int rnd_next(byte *buf);
int rnd_pos(byte * buf, byte *pos);
int get_row(azio_stream *file_to_read, byte *buf);
int read_meta_file(File meta_file, ha_rows *rows);
int write_meta_file(File meta_file, ha_rows rows, bool dirty);
int read_meta_file(File meta_file, ha_rows *rows, ulonglong *auto_increment);
int write_meta_file(File meta_file, ha_rows rows,
ulonglong auto_increment, bool dirty);
ARCHIVE_SHARE *get_share(const char *table_name, TABLE *table);
int free_share(ARCHIVE_SHARE *share);
bool auto_repair() const { return 1; } // For the moment we just do this
......