Commit e06cb317 authored by Sergei Golubchik's avatar Sergei Golubchik

CREATE TABLE and frm-less discovering engines.

Now CREATE TABLE does not write the frm file on disk,
if the engine can discover it
parent b3e70c4a
......@@ -877,7 +877,7 @@ extern void *my_az_allocator(void *dummy, unsigned int items, unsigned int size)
extern void my_az_free(void *dummy, void *address);
extern int my_compress_buffer(uchar *dest, size_t *destLen,
const uchar *source, size_t sourceLen);
extern int packfrm(uchar *, size_t, uchar **, size_t *);
extern int packfrm(const uchar *, size_t, uchar **, size_t *);
extern int unpackfrm(uchar **, size_t *, const uchar *);
extern ha_checksum my_checksum(ha_checksum crc, const uchar *mem,
......
......@@ -12816,7 +12816,35 @@ DROP TABLE t1;
create table t1 (a int, b char(50)) engine=archive;
select * from t1;
a b
flush tables;
select * from t1;
a b
1 aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa
show warnings;
Level Code Message
drop table t1;
create temporary table t1 (a int) engine=archive;
insert t1 values (1),(2),(3);
select * from t1;
a
1
2
3
show create table t1;
Table Create Table
t1 CREATE TEMPORARY TABLE `t1` (
`a` int(11) DEFAULT NULL
) ENGINE=ARCHIVE DEFAULT CHARSET=latin1
alter table t1 add column b varchar(10);
select * from t1;
a b
1 NULL
2 NULL
3 NULL
show create table t1;
Table Create Table
t1 CREATE TEMPORARY TABLE `t1` (
`a` int(11) DEFAULT NULL,
`b` varchar(10) DEFAULT NULL
) ENGINE=ARCHIVE DEFAULT CHARSET=latin1
drop table t1;
......@@ -1734,9 +1734,24 @@ DROP TABLE t1;
--echo # BUG#917689 Using wrong archive table causes crash
--echo #
create table t1 (a int, b char(50)) engine=archive;
select * from t1; # open the table to create the frm
flush tables; # and close the table again
--remove_file $MYSQLD_DATADIR/test/t1.ARZ
copy_file std_data/t917689.ARZ $MYSQLD_DATADIR/test/t1.ARZ;
# --error 1194
#--error ER_CRASHED_ON_USAGE
select * from t1;
show warnings;
drop table t1;
#
# temporary archive
#
create temporary table t1 (a int) engine=archive;
insert t1 values (1),(2),(3);
select * from t1;
show create table t1;
alter table t1 add column b varchar(10);
select * from t1;
show create table t1;
drop table t1;
......@@ -21,6 +21,8 @@ a
# show tables
#
create table t2 (a int) engine=archive;
select * from t2;
a
flush tables;
show tables;
Tables_in_test
......@@ -107,4 +109,7 @@ drop table t1;
# archive_discover
#
create table `a/../`(a int) engine=archive;
select * from `a/../`;
a
flush tables;
drop table `a/../`;
......@@ -18,6 +18,7 @@ select * from t1;
--echo # show tables
--echo #
create table t2 (a int) engine=archive;
select * from t2;
remove_file $mysqld_datadir/test/t1.frm;
flush tables;
show tables;
......@@ -84,6 +85,7 @@ drop table t1;
--echo # archive_discover
--echo #
create table `a/../`(a int) engine=archive;
select * from `a/../`; flush tables;
remove_file $mysqld_datadir/test/a@002f@002e@002e@002f.frm;
drop table `a/../`;
......@@ -234,7 +234,7 @@ my_bool my_uncompress(uchar *packet, size_t len, size_t *complen)
>0 Failure
*/
int packfrm(uchar *data, size_t len,
int packfrm(const uchar *data, size_t len,
uchar **pack_data, size_t *pack_len)
{
int error;
......
......@@ -107,7 +107,7 @@ bool dd_recreate_table(THD *thd, const char *db, const char *table_name,
}
/* Attempt to reconstruct the table. */
error= ha_create_table(thd, path, db, table_name, &create_info);
error= ha_create_table(thd, path, db, table_name, &create_info, NULL);
DBUG_RETURN(error);
}
......
......@@ -45,7 +45,7 @@
3 Could not allocate data for read. Could not read file
*/
int readfrm(const char *name, uchar **frmdata, size_t *len)
int readfrm(const char *name, const uchar **frmdata, size_t *len)
{
int error;
char index_file[FN_REFLEN];
......@@ -134,7 +134,7 @@ int writefrm(const char *path, const char *db, const char *table,
{
error= mysql_file_write(file, frmdata, len, MYF(MY_WME | MY_NABP));
if (!error && need_sync)
if (!error && need_sync && opt_sync_frm)
error= mysql_file_sync(file, MYF(MY_WME)) ||
my_sync_dir_by_file(file_name, MYF(MY_WME));
......
......@@ -18,14 +18,14 @@
#include "my_global.h" /* uchar */
int readfrm(const char *name, uchar **data, size_t *length);
int writefrm(const char *path, const char *db, const char *table,
bool need_sync, const uchar *frmdata, size_t len);
int extension_based_table_discovery(MY_DIR *dirp, const char *ext,
handlerton::discovered_list *tl);
#ifdef MYSQL_SERVER
int readfrm(const char *name, const uchar **data, size_t *length);
int writefrm(const char *path, const char *db, const char *table,
bool need_sync, const uchar *frmdata, size_t len);
int ext_table_discovery_simple(MY_DIR *dirp,
handlerton::discovered_list *result);
#endif
......
......@@ -34,7 +34,7 @@
#include "sql_parse.h" // check_stack_overrun
#include "sql_acl.h" // SUPER_ACL
#include "sql_base.h" // free_io_cache
#include "discover.h" // writefrm
#include "discover.h" // extension_based_table_discovery, etc
#include "log_event.h" // *_rows_log_event
#include "create_options.h"
#include "rpl_filter.h"
......@@ -4131,7 +4131,7 @@ end:
*/
int ha_create_table(THD *thd, const char *path,
const char *db, const char *table_name,
HA_CREATE_INFO *create_info)
HA_CREATE_INFO *create_info, LEX_CUSTRING *frm)
{
int error= 1;
TABLE table;
......@@ -4141,9 +4141,28 @@ int ha_create_table(THD *thd, const char *path,
DBUG_ENTER("ha_create_table");
init_tmp_table_share(thd, &share, db, 0, table_name, path);
if (open_table_def(thd, &share) ||
open_table_from_share(thd, &share, "", 0, (uint) READ_ALL, 0, &table,
TRUE))
if (frm)
{
bool write_frm_now= !create_info->db_type->discover_table &&
!create_info->tmp_table();
share.frm_image= frm;
// open an frm image
if (share.init_from_binary_frm_image(thd, write_frm_now,
frm->str, frm->length))
goto err;
}
else
{
// open an frm file
if (open_table_def(thd, &share))
goto err;
}
if (open_table_from_share(thd, &share, "", 0, READ_ALL, 0, &table, true))
goto err;
update_create_info_from_table(create_info, &table);
......@@ -4151,7 +4170,9 @@ int ha_create_table(THD *thd, const char *path,
name= get_canonical_filename(table.file, share.path.str, name_buff);
error= table.file->ha_create(name, &table, create_info);
(void) closefrm(&table, 0);
if (error)
{
strxmov(name_buff, db, ".", table_name, NullS);
......
......@@ -1397,6 +1397,8 @@ struct HA_CREATE_INFO
ha_table_option_struct *option_struct; ///< structure with parsed table options
ha_field_option_struct **fields_option_struct; ///< array of field option structures
ha_index_option_struct **indexes_option_struct; ///< array of index option structures
bool tmp_table() { return options & HA_LEX_CREATE_TMP_TABLE; }
};
......@@ -3093,7 +3095,7 @@ void ha_checkpoint_state(bool disable);
void ha_commit_checkpoint_request(void *cookie, void (*pre_hook)(void *));
int ha_create_table(THD *thd, const char *path,
const char *db, const char *table_name,
HA_CREATE_INFO *create_info);
HA_CREATE_INFO *create_info, LEX_CUSTRING *frm);
int ha_delete_table(THD *thd, handlerton *db_type, const char *path,
const char *db, const char *alias, bool generate_warning);
......
......@@ -5963,7 +5963,7 @@ TABLE *open_table_uncached(THD *thd, const char *path, const char *db,
init_tmp_table_share(thd, share, saved_cache_key, key_length,
strend(saved_cache_key)+1, tmp_path);
if (open_table_def(thd, share) ||
if (open_table_def(thd, share, GTS_TABLE | GTS_FORCE_DISCOVERY) ||
open_table_from_share(thd, share, table_name,
(uint) (HA_OPEN_KEYFILE | HA_OPEN_RNDFILE |
HA_GET_INDEX),
......
......@@ -1724,12 +1724,12 @@ bool mysql_write_frm(ALTER_PARTITION_PARAM_TYPE *lpt, uint flags)
handlers that have the main version of the frm file stored in the
handler.
*/
uchar *data;
const uchar *data;
size_t length;
if (readfrm(shadow_path, &data, &length) ||
packfrm(data, length, &lpt->pack_frm_data, &lpt->pack_frm_len))
{
my_free(data);
my_free(const_cast<uchar*>(data));
my_free(lpt->pack_frm_data);
mem_alloc_error(length);
error= 1;
......@@ -5981,7 +5981,6 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
char new_alias_buff[FN_REFLEN], *table_name, *db, *new_alias, *alias;
char index_file[FN_REFLEN], data_file[FN_REFLEN];
char path[FN_REFLEN + 1];
char reg_path[FN_REFLEN+1];
ha_rows copied,deleted;
handlerton *old_db_type, *new_db_type, *save_old_db_type;
enum_alter_table_change_level need_copy_table= ALTER_TABLE_METADATA_ONLY;
......@@ -6051,7 +6050,7 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
}
/*
Assign variables table_name, new_name, db, new_db, path, reg_path
Assign variables table_name, new_name, db, new_db, path,
to simplify further comparisions: we want to see if it's a RENAME
later just by comparing the pointers, avoiding the need for strcmp.
*/
......@@ -6061,7 +6060,6 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
db=table_list->db;
if (!new_db || !my_strcasecmp(table_alias_charset, new_db, db))
new_db= db;
build_table_filename(reg_path, sizeof(reg_path) - 1, db, table_name, reg_ext, 0);
build_table_filename(path, sizeof(path) - 1, db, table_name, "", 0);
mysql_ha_rm_tables(thd, table_list);
......@@ -6644,13 +6642,16 @@ bool mysql_alter_table(THD *thd,char *new_db, char *new_name,
need_copy_table, need_lock_for_indexes));
}
if (need_copy_table == ALTER_TABLE_METADATA_ONLY)
{
char frm_name[FN_REFLEN+1];
strxmov(frm_name, path, reg_ext, NullS);
/*
better have a negative test here, instead of positive, like
alter_info->flags & ALTER_ADD_COLUMN|ALTER_ADD_INDEX|...
so that ALTER TABLE won't break when somebody will add new flag
frm_only can only be used if old frm exists.
discovering frm-less engines cannot enjoy this optimization.
*/
if (need_copy_table == ALTER_TABLE_METADATA_ONLY)
create_info->frm_only= 1;
create_info->frm_only= !my_access(frm_name, F_OK);
}
#ifdef WITH_PARTITION_STORAGE_ENGINE
if (table_for_fast_alter_partition)
......
......@@ -661,9 +661,6 @@ enum open_frm_error open_table_def(THD *thd, TABLE_SHARE *share, uint flags)
error_given= true; // init_from_binary_frm_image has already called my_error()
my_free(buf);
if (!share->error)
thd->status_var.opened_shares++;
goto err_not_open;
err:
......@@ -741,9 +738,7 @@ bool TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
old_root= *root_ptr;
*root_ptr= &share->mem_root;
if (write && writefrm(share->normalized_path.str,
share->db.str, share->table_name.str, MY_SYNC,
frm_image, frm_length))
if (write && write_frm_image(frm_image, frm_length))
goto err;
if (frm_length < FRM_HEADER_SIZE + FRM_FORMINFO_SIZE)
......@@ -1915,6 +1910,7 @@ bool TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
#endif
share->error= OPEN_FRM_OK;
thd->status_var.opened_shares++;
*root_ptr= old_root;
DBUG_RETURN(0);
......@@ -1943,6 +1939,33 @@ bool TABLE_SHARE::init_from_binary_frm_image(THD *thd, bool write,
DBUG_RETURN(1);
} /* open_binary_frm */
bool TABLE_SHARE::write_frm_image(const uchar *frm, size_t len)
{
return writefrm(normalized_path.str, db.str, table_name.str, 1, frm, len);
}
bool TABLE_SHARE::read_frm_image(const uchar **frm, size_t *len)
{
if (partition_info_str) // cannot discover a partition
{
DBUG_ASSERT(db_type()->discover_table == 0);
return 1;
}
if (frm_image)
{
*frm= frm_image->str;
*len= frm_image->length;
frm_image->str= 0; // pass the ownership to the caller
frm_image= 0;
return 0;
}
return readfrm(normalized_path.str, frm, len);
}
/*
@brief
Clear GET_FIXED_FIELDS_FLAG in all fields of a table
......
......@@ -988,8 +988,11 @@ struct TABLE_SHARE
uint actual_n_key_parts(THD *thd);
LEX_CUSTRING *frm_image; ///< only during CREATE TABLE (@sa ha_create_table)
bool init_from_binary_frm_image(THD *thd, bool write,
const uchar *frm_image, size_t frm_length);
bool write_frm_image(const uchar *frm_image, size_t frm_length);
bool read_frm_image(const uchar **frm_image, size_t *frm_length);
};
......
......@@ -80,9 +80,8 @@ bool mysql_create_frm(THD *thd, const char *file_name,
if (!frm.str)
DBUG_RETURN(1);
bool need_sync= opt_sync_frm &&
!(create_info->options & HA_LEX_CREATE_TMP_TABLE);
int error= writefrm(file_name, db, table, need_sync, frm.str, frm.length);
int error= writefrm(file_name, db, table, !create_info->tmp_table(),
frm.str, frm.length);
my_free(const_cast<uchar*>(frm.str));
DBUG_RETURN(error);
......@@ -382,30 +381,49 @@ int rea_create_table(THD *thd, const char *path,
{
DBUG_ENTER("rea_create_table");
if (mysql_create_frm(thd, path, db, table_name, create_info,
create_fields, keys, key_info, file))
LEX_CUSTRING frm= create_frm_image(thd, table_name, create_info,
create_fields, keys, key_info, file);
if (!frm.str)
DBUG_RETURN(1);
if (thd->variables.keep_files_on_create)
create_info->options|= HA_CREATE_KEEP_FILES;
if (!create_info->frm_only &&
(file->ha_create_partitioning_metadata(path, NULL, CHF_CREATE_FLAG,
if (create_info->frm_only)
{
if (writefrm(path, db, table_name, 1, frm.str, frm.length))
goto err_handler;
}
else
{
// TODO don't write frm for temp tables
if (create_info->tmp_table() &&
writefrm(path, db, table_name, 0, frm.str, frm.length))
goto err_handler;
if (file->ha_create_partitioning_metadata(path, NULL, CHF_CREATE_FLAG,
create_info) ||
ha_create_table(thd, path, db, table_name, create_info)))
ha_create_table(thd, path, db, table_name, create_info, &frm))
{
file->ha_create_partitioning_metadata(path, NULL, CHF_DELETE_FLAG,
create_info);
goto err_handler;
}
}
my_free(const_cast<uchar*>(frm.str));
DBUG_RETURN(0);
err_handler:
(void) file->ha_create_partitioning_metadata(path, NULL, CHF_DELETE_FLAG, create_info);
char frm_name[FN_REFLEN];
strxmov(frm_name, path, reg_ext, NullS);
mysql_file_delete(key_file_frm, frm_name, MYF(0));
my_free(const_cast<uchar*>(frm.str));
DBUG_RETURN(1);
} /* rea_create_table */
/* Pack keyinfo and keynames to keybuff for save in form-file. */
/* Pack keyinfo and keynames to keybuff for save in form-file. */
static uint pack_keys(uchar *keybuff, uint key_count, KEY *keyinfo,
ulong data_offset)
......@@ -488,7 +506,7 @@ static uint pack_keys(uchar *keybuff, uint key_count, KEY *keyinfo,
} /* pack_keys */
/* Make formheader */
/* Make formheader */
static bool pack_header(uchar *forminfo, List<Create_field> &create_fields,
uint table_options, ulong data_offset, handler *file)
......@@ -511,7 +529,6 @@ static bool pack_header(uchar *forminfo, List<Create_field> &create_fields,
n_length=2L;
/* Check fields */
List_iterator<Create_field> it(create_fields);
Create_field *field;
while ((field=it++))
......@@ -639,7 +656,6 @@ static bool pack_header(uchar *forminfo, List<Create_field> &create_fields,
int_length+=int_count*2; // 255 prefix + 0 suffix
/* Save values in forminfo */
if (reclength > (ulong) file->max_record_length())
{
my_error(ER_TOO_BIG_ROWSIZE, MYF(0), static_cast<long>(file->max_record_length()));
......@@ -679,8 +695,7 @@ static bool pack_header(uchar *forminfo, List<Create_field> &create_fields,
} /* pack_header */
/* get each unique interval each own id */
/* get each unique interval each own id */
static uint get_interval_id(uint *int_count,List<Create_field> &create_fields,
Create_field *last_field)
{
......@@ -742,7 +757,7 @@ static size_t packed_fields_length(List<Create_field> &create_fields)
DBUG_RETURN(length);
}
/* Save fields, fieldnames and intervals */
/* Save fields, fieldnames and intervals */
static bool pack_fields(uchar *buff, List<Create_field> &create_fields,
ulong data_offset)
......@@ -752,9 +767,7 @@ static bool pack_fields(uchar *buff, List<Create_field> &create_fields,
DBUG_ENTER("pack_fields");
/* Write field info */
List_iterator<Create_field> it(create_fields);
int_count=0;
while ((field=it++))
{
......@@ -907,7 +920,7 @@ static bool pack_fields(uchar *buff, List<Create_field> &create_fields,
}
/* save an empty record on start of formfile */
/* save an empty record on start of formfile */
static bool make_empty_rec(THD *thd, uchar *buff, uint table_options,
List<Create_field> &create_fields,
......
......@@ -855,7 +855,7 @@ int azclose (azio_stream *s)
Though this was added to support MySQL's FRM file, anything can be
stored in this location.
*/
int azwrite_frm(azio_stream *s, uchar *blob, unsigned int length)
int azwrite_frm(azio_stream *s, const uchar *blob, unsigned int length)
{
if (s->mode == 'r')
return 1;
......
......@@ -331,7 +331,7 @@ extern int azclose(azio_stream *file);
error number (see function gzerror below).
*/
extern int azwrite_frm (azio_stream *s, uchar *blob, unsigned int length);
extern int azwrite_frm (azio_stream *s, const uchar *blob, unsigned int length);
extern int azread_frm (azio_stream *s, uchar *blob);
extern int azwrite_comment (azio_stream *s, char *blob, unsigned int length);
extern int azread_comment (azio_stream *s, char *blob);
......
......@@ -302,8 +302,7 @@ int archive_discover(handlerton *hton, THD* thd, TABLE_SHARE *share)
azclose(&frm_stream);
if (!share->init_from_binary_frm_image(thd, share->normalized_path.str,
frm_ptr, frm_stream.frm_length))
if (!share->init_from_binary_frm_image(thd, 1, frm_ptr, frm_stream.frm_length))
my_errno= 0;
ret:
......@@ -437,7 +436,7 @@ ARCHIVE_SHARE *ha_archive::get_share(const char *table_name, int *rc)
*/
if (archive_tmp.version < ARCHIVE_VERSION)
*rc= HA_ERR_TABLE_NEEDS_UPGRADE;
else if (frm_compare(&archive_tmp, table_name))
else if (frm_compare(&archive_tmp))
*rc= HA_ERR_TABLE_DEF_CHANGED;
azclose(&archive_tmp);
......@@ -669,19 +668,19 @@ int ha_archive::frm_copy(azio_stream *src, azio_stream *dst)
Compare frm blob with the on-disk frm file
@param s The azio stream.
@param path A path for readfrm()
@return Zero if equal, non-zero otherwise.
*/
int ha_archive::frm_compare(azio_stream *s, const char *path)
int ha_archive::frm_compare(azio_stream *s)
{
int rc= 0;
uchar *frm_ptr= 0, *azfrm_ptr= 0;
const uchar *frm_ptr= 0;
uchar *azfrm_ptr= 0;
size_t frm_len;
/* no frm = no discovery. perhaps it's a partitioned table */
if (readfrm(path, &frm_ptr, &frm_len))
if (table->s->read_frm_image(&frm_ptr, &frm_len))
goto err;
if (!(azfrm_ptr= (uchar *) my_malloc(s->frm_length,
......@@ -699,7 +698,7 @@ int ha_archive::frm_compare(azio_stream *s, const char *path)
rc= memcmp(frm_ptr, azfrm_ptr, frm_len);
err:
my_free(frm_ptr);
my_free(const_cast<uchar*>(frm_ptr));
my_free(azfrm_ptr);
return rc;
}
......@@ -721,7 +720,7 @@ int ha_archive::create(const char *name, TABLE *table_arg,
char linkname[FN_REFLEN];
int error;
azio_stream create_stream; /* Archive file we are working with */
uchar *frm_ptr;
const uchar *frm_ptr;
size_t frm_len;
DBUG_ENTER("ha_archive::create");
......@@ -784,11 +783,10 @@ int ha_archive::create(const char *name, TABLE *table_arg,
/*
Here is where we open up the frm and pass it to archive to store
*/
readfrm(name, &frm_ptr, &frm_len);
if (frm_ptr)
if (!table_arg->s->read_frm_image(&frm_ptr, &frm_len))
{
azwrite_frm(&create_stream, frm_ptr, frm_len);
my_free(frm_ptr);
my_free(const_cast<uchar*>(frm_ptr));
}
if (create_info->comment.str)
......
......@@ -76,7 +76,7 @@ class ha_archive: public handler
archive_record_buffer *create_record_buffer(unsigned int length);
void destroy_record_buffer(archive_record_buffer *r);
int frm_copy(azio_stream *src, azio_stream *dst);
int frm_compare(azio_stream *src, const char *path);
int frm_compare(azio_stream *src);
public:
ha_archive(handlerton *hton, TABLE_SHARE *table_arg);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment