Commit 4b1fe65b authored by unknown's avatar unknown

WL#3239 "log CREATE TABLE in Maria": write the log record _before_

creating the data file, and sync this log, so that the table cannot be
used if the log record didn't reach disk. In the same way, we force the log
in DROP/RENAME TABLE, and also in REPAIR TABLE, though logging in this
case is not polished.
Making DELETE FROM t <no WHERE> atomic: we log the record before
starting the operation, and will finish this op at Recovery if needed.


storage/maria/ma_check.c:
  expanded comment. Force the log record so that the log has a complete history.
storage/maria/ma_create.c:
  better conformance to the text of WL#3239 "log CREATE TABLE in Maria":
  write the log record before creating the data file. This ensures
  that the log can be applied to an old backup in all circumstances.
  errpos=2 was wrong.
storage/maria/ma_delete_all.c:
  making DELETE FROM t <no WHERE> atomic: we log the record before
  starting the operation, and will finish the operation at Recovery
  if needed. Thus there is no need to force files to disk.
storage/maria/ma_delete_table.c:
  forcing the log before dropping a table, so that the log has the
  entire history.
storage/maria/ma_loghandler.c:
  LOGREC_REDO_DELETE_ALL needs to set trn's rec_lsn so that the log's
  low-water mark and Checkpoint retain this record until the 
  delete operation has finished.
storage/maria/ma_rename.c:
  force the log before renaming a table, so that the log has a complete
  history.
parent 0cf96a32
...@@ -5176,7 +5176,23 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info) ...@@ -5176,7 +5176,23 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info)
/* Only called from ha_maria.cc, not maria_check, so translog is inited */ /* Only called from ha_maria.cc, not maria_check, so translog is inited */
if (share->base.transactional && !share->temporary) if (share->base.transactional && !share->temporary)
{ {
/* For now this record is only informative */ /*
For now this record is only informative. It could serve when applying
logs to a backup, but that needs more thought. Assume table became
corrupted. It is repaired, then some writes happen to it.
Later we restore an old backup, and want to apply this REDO_REPAIR_TABLE
record. For it to give the same result as originally, the table should
be corrupted the same way, so applying previous REDOs should produce the
same corruption; that's really not guaranteed (different execution paths
in execution of REDOs vs runtime code so not same bugs hit, temporary
hardware issues not repeatable etc). Corruption may not be repeatable.
A reasonable solution is to execute the REDO_REPAIR_TABLE record and
check if the checksum of the resulting table matches what it was at the
end of the original repair (should be stored in log record); or execute
the REDO_REPAIR_TABLE if the checksum of the table-before-repair matches
what it was at the start of the original repair (should be stored in log
record).
*/
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE]; uchar log_data[LSN_STORE_SIZE];
compile_time_assert(LSN_STORE_SIZE >= (FILEID_STORE_SIZE + 4)); compile_time_assert(LSN_STORE_SIZE >= (FILEID_STORE_SIZE + 4));
...@@ -5193,7 +5209,8 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info) ...@@ -5193,7 +5209,8 @@ int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info)
log_array[TRANSLOG_INTERNAL_PARTS + log_array[TRANSLOG_INTERNAL_PARTS +
0].length, 0].length,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data))) log_array, log_data) ||
translog_flush(share->state.create_rename_lsn)))
return 1; return 1;
/* /*
But this piece is really needed, to have the new table's content durable But this piece is really needed, to have the new table's content durable
......
...@@ -620,7 +620,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -620,7 +620,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
mi_int2store(share.state.header.state_info_length,MARIA_STATE_INFO_SIZE); mi_int2store(share.state.header.state_info_length,MARIA_STATE_INFO_SIZE);
mi_int2store(share.state.header.base_info_length,MARIA_BASE_INFO_SIZE); mi_int2store(share.state.header.base_info_length,MARIA_BASE_INFO_SIZE);
mi_int2store(share.state.header.base_pos,base_pos); mi_int2store(share.state.header.base_pos,base_pos);
share.state.header.data_file_type= datafile_type; share.state.header.data_file_type= share.data_file_type= datafile_type;
share.state.header.org_data_file_type= org_datafile_type; share.state.header.org_data_file_type= org_datafile_type;
share.state.header.language= (ci->language ? share.state.header.language= (ci->language ?
ci->language : default_charset_info->number); ci->language : default_charset_info->number);
...@@ -766,50 +766,6 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -766,50 +766,6 @@ int maria_create(const char *name, enum data_file_type datafile_type,
goto err; goto err;
errpos=1; errpos=1;
if (!(flags & HA_DONT_TOUCH_DATA))
{
if (ci->data_file_name)
{
char *dext= strrchr(ci->data_file_name, '.');
int have_dext= dext && !strcmp(dext, MARIA_NAME_DEXT);
if (tmp_table)
{
char *path;
/* chop off the table name, temporary tables use generated name */
if ((path= strrchr(ci->data_file_name, FN_LIBCHAR)))
*path= '\0';
fn_format(filename, name, ci->data_file_name, MARIA_NAME_DEXT,
MY_REPLACE_DIR | MY_UNPACK_FILENAME | MY_APPEND_EXT);
}
else
{
fn_format(filename, ci->data_file_name, "", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME |
(have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
}
fn_format(linkname, name, "",MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= linkname;
create_flag=0;
}
else
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
}
if ((dfile=
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag | sync_dir))) < 0)
goto err;
errpos=3;
share.data_file_type= datafile_type;
if (_ma_initialize_data_file(dfile, &share))
goto err;
}
DBUG_PRINT("info", ("write state info and base info")); DBUG_PRINT("info", ("write state info and base info"));
if (_ma_state_info_write(file, &share.state, 2) || if (_ma_state_info_write(file, &share.state, 2) ||
_ma_base_info_write(file, &share.base)) _ma_base_info_write(file, &share.base))
...@@ -959,7 +915,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -959,7 +915,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
if ((log_data == NULL) || if ((log_data == NULL) ||
my_pread(file, 1 + 2 + 2 + log_data, kfile_size_before_extension, my_pread(file, 1 + 2 + 2 + log_data, kfile_size_before_extension,
0, MYF(MY_NABP))) 0, MYF(MY_NABP)))
goto err_no_lock; goto err;
/* /*
remember if the data file was created or not, to know if Recovery can remember if the data file was created or not, to know if Recovery can
do it or not, in the future do it or not, in the future
...@@ -989,8 +945,14 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -989,8 +945,14 @@ int maria_create(const char *name, enum data_file_type datafile_type,
MySQL layer to be crash-safe, which it is not now (that would require MySQL layer to be crash-safe, which it is not now (that would require
work using the ddl_log of sql/sql_table.cc); when it is, we should work using the ddl_log of sql/sql_table.cc); when it is, we should
reconsider the moment of writing this log record (before or after op, reconsider the moment of writing this log record (before or after op,
under THR_LOCK_maria or not...), how to use it in Recovery, and force under THR_LOCK_maria or not...), how to use it in Recovery.
the log. For now this record is just informative. For now this record can serve when we apply logs to a backup,
so we sync it. This happens before the data file is created. If the data
file was created before, and we crashed before writing the log record,
at restart the table may be used, so we would not have a trustable
history in the log (impossible to apply this log to a backup). The way
we do it, if we crash before writing the log record then there is no
data file and the table cannot be used.
Note that in case of TRUNCATE TABLE we also come here. Note that in case of TRUNCATE TABLE we also come here.
When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not called When in CREATE/TRUNCATE (or DROP or RENAME or REPAIR) we have not called
external_lock(), so have no TRN. It does not matter, as all these external_lock(), so have no TRN. It does not matter, as all these
...@@ -1001,20 +963,63 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1001,20 +963,63 @@ int maria_create(const char *name, enum data_file_type datafile_type,
&dummy_transaction_object, NULL, &dummy_transaction_object, NULL,
total_rec_length, total_rec_length,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL))) log_array, NULL) ||
goto err_no_lock; translog_flush(share.state.create_rename_lsn)))
goto err;
/* /*
store LSN into file, needed for Recovery to not be confused if a store LSN into file, needed for Recovery to not be confused if a
DROP+CREATE happened (applying REDOs to the wrong table). DROP+CREATE happened (applying REDOs to the wrong table).
If such direct my_pwrite() to a fixed offset is too "hackish", I can
call ma_state_info_write() again but it will be less efficient.
*/ */
share.kfile.file= file; share.kfile.file= file;
if (_ma_update_create_rename_lsn_on_disk(&share, FALSE)) if (_ma_update_create_rename_lsn_on_disk(&share, FALSE))
goto err_no_lock; goto err;
my_free(log_data, MYF(0)); my_free(log_data, MYF(0));
} }
if (!(flags & HA_DONT_TOUCH_DATA))
{
if (ci->data_file_name)
{
char *dext= strrchr(ci->data_file_name, '.');
int have_dext= dext && !strcmp(dext, MARIA_NAME_DEXT);
if (tmp_table)
{
char *path;
/* chop off the table name, temporary tables use generated name */
if ((path= strrchr(ci->data_file_name, FN_LIBCHAR)))
*path= '\0';
fn_format(filename, name, ci->data_file_name, MARIA_NAME_DEXT,
MY_REPLACE_DIR | MY_UNPACK_FILENAME | MY_APPEND_EXT);
}
else
{
fn_format(filename, ci->data_file_name, "", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME |
(have_dext ? MY_REPLACE_EXT : MY_APPEND_EXT));
}
fn_format(linkname, name, "",MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= linkname;
create_flag=0;
}
else
{
fn_format(filename,name,"", MARIA_NAME_DEXT,
MY_UNPACK_FILENAME | MY_APPEND_EXT);
linkname_ptr= NULL;
create_flag=MY_DELETE_OLD;
}
if ((dfile=
my_create_with_symlink(linkname_ptr, filename, 0, create_mode,
MYF(MY_WME | create_flag | sync_dir))) < 0)
goto err;
errpos=3;
if (_ma_initialize_data_file(dfile, &share))
goto err;
}
/* Enlarge files */ /* Enlarge files */
DBUG_PRINT("info", ("enlarge to keystart: %lu", DBUG_PRINT("info", ("enlarge to keystart: %lu",
(ulong) share.base.keystart)); (ulong) share.base.keystart));
...@@ -1030,7 +1035,6 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1030,7 +1035,6 @@ int maria_create(const char *name, enum data_file_type datafile_type,
if (my_chsize(dfile,share.base.min_pack_length*ci->reloc_rows,0,MYF(0))) if (my_chsize(dfile,share.base.min_pack_length*ci->reloc_rows,0,MYF(0)))
goto err; goto err;
#endif #endif
errpos=2;
if ((sync_dir && my_sync(dfile, MYF(0))) || my_close(dfile,MYF(0))) if ((sync_dir && my_sync(dfile, MYF(0))) || my_close(dfile,MYF(0)))
goto err; goto err;
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
/* This clears the status information and truncates files */ /* This clears the status information and truncates files */
#include "maria_def.h" #include "maria_def.h"
#include "trnman_public.h" #include "trnman.h"
/** /**
@brief deletes all rows from a table @brief deletes all rows from a table
...@@ -52,6 +52,25 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -52,6 +52,25 @@ int maria_delete_all_rows(MARIA_HA *info)
if (_ma_mark_file_changed(info)) if (_ma_mark_file_changed(info))
goto err; goto err;
if (log_record)
{
/*
This record will be used by Recovery to finish the deletion if it
crashed. We force it because it's a non-undoable operation.
*/
LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[FILEID_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_DELETE_ALL,
info->trn, share, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data) ||
translog_flush(lsn)))
goto err;
}
info->state->records=info->state->del=state->split=0; info->state->records=info->state->del=state->split=0;
state->changed= 0; /* File is optimized */ state->changed= 0; /* File is optimized */
state->dellink = HA_OFFSET_ERROR; state->dellink = HA_OFFSET_ERROR;
...@@ -78,6 +97,12 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -78,6 +97,12 @@ int maria_delete_all_rows(MARIA_HA *info)
if (_ma_initialize_data_file(info->dfile.file, share)) if (_ma_initialize_data_file(info->dfile.file, share))
goto err; goto err;
/*
The operations above on the index/data file will be forced to disk at
Checkpoint or maria_close() time. So we can reset:
*/
info->trn->rec_lsn= LSN_IMPOSSIBLE;
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
#ifdef HAVE_MMAP #ifdef HAVE_MMAP
/* Resize mmaped area */ /* Resize mmaped area */
...@@ -85,36 +110,6 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -85,36 +110,6 @@ int maria_delete_all_rows(MARIA_HA *info)
_ma_remap_file(info, (my_off_t)0); _ma_remap_file(info, (my_off_t)0);
rw_unlock(&info->s->mmap_lock); rw_unlock(&info->s->mmap_lock);
#endif #endif
if (log_record)
{
/* For now this record is only informative */
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[FILEID_STORE_SIZE];
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (unlikely(translog_write_record(&share->state.create_rename_lsn,
LOGREC_REDO_DELETE_ALL,
info->trn, share, 0,
sizeof(log_array)/sizeof(log_array[0]),
log_array, log_data)))
goto err;
/*
store LSN into file. It is an optimization so that all old REDOs for
this table are ignored (scenario: checkpoint, INSERT1s, DELETE ALL;
INSERT2s, crash: then Recovery can skip INSERT1s). It also allows us to
ignore the present record at Recovery.
Note that storing the LSN could not be done by _ma_writeinfo() above as
the table is locked at this moment. So we need to do it by ourselves.
*/
if (_ma_update_create_rename_lsn_on_disk(share, FALSE) ||
_ma_sync_table_files(info))
goto err;
/**
@todo RECOVERY Until we take into account the log record above
for log-low-water-mark calculation and use it in Recovery, we need
to sync above.
*/
}
allow_break(); /* Allow SIGHUP & SIGINT */ allow_break(); /* Allow SIGHUP & SIGINT */
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -123,9 +118,11 @@ err: ...@@ -123,9 +118,11 @@ err:
int save_errno=my_errno; int save_errno=my_errno;
VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE)); VOID(_ma_writeinfo(info,WRITEINFO_UPDATE_KEYFILE));
info->update|=HA_STATE_WRITTEN; /* Buffer changed */ info->update|=HA_STATE_WRITTEN; /* Buffer changed */
/** @todo RECOVERY until we use the log record above we have to sync */ /**
if (log_record &&_ma_sync_table_files(info) && !save_errno) @todo RECOVERY if we come here, Recovery may later apply the REDO above,
save_errno= my_errno; which may be wrong. Not fixing it now, as anyway this way of deleting
rows will have to be re-examined when we have versioning.
*/
allow_break(); /* Allow SIGHUP & SIGINT */ allow_break(); /* Allow SIGHUP & SIGINT */
DBUG_RETURN(my_errno=save_errno); DBUG_RETURN(my_errno=save_errno);
} }
......
...@@ -78,9 +78,9 @@ int maria_delete_table(const char *name) ...@@ -78,9 +78,9 @@ int maria_delete_table(const char *name)
{ {
/* /*
For this log record to be of any use for Recovery, we need the upper For this log record to be of any use for Recovery, we need the upper
MySQL layer to be crash-safe in DDLs; when it is we should reconsider MySQL layer to be crash-safe in DDLs.
the moment of writing this log record, how to use it in Recovery, and For now this record can serve when we apply logs to a backup, so we sync
force the log. For now this record is only informative. it.
*/ */
LSN lsn; LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
...@@ -91,7 +91,8 @@ int maria_delete_table(const char *name) ...@@ -91,7 +91,8 @@ int maria_delete_table(const char *name)
log_array[TRANSLOG_INTERNAL_PARTS + log_array[TRANSLOG_INTERNAL_PARTS +
0].length, 0].length,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL))) log_array, NULL) ||
translog_flush(lsn)))
DBUG_RETURN(1); DBUG_RETURN(1);
} }
......
...@@ -375,7 +375,7 @@ static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE= ...@@ -375,7 +375,7 @@ static LOG_DESC INIT_LOGREC_REDO_DROP_TABLE=
static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL= static LOG_DESC INIT_LOGREC_REDO_DELETE_ALL=
{LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE, {LOGRECTYPE_FIXEDLENGTH, FILEID_STORE_SIZE, FILEID_STORE_SIZE,
NULL, NULL, NULL, 0, NULL, write_hook_for_redo, NULL, 0,
"redo_delete_all", TRUE, NULL, NULL}; "redo_delete_all", TRUE, NULL, NULL};
static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE= static LOG_DESC INIT_LOGREC_REDO_REPAIR_TABLE=
......
...@@ -76,15 +76,16 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -76,15 +76,16 @@ int maria_rename(const char *old_name, const char *new_name)
MySQL layer to be crash-safe, which it is not now (that would require MySQL layer to be crash-safe, which it is not now (that would require
work using the ddl_log of sql/sql_table.cc); when it is, we should work using the ddl_log of sql/sql_table.cc); when it is, we should
reconsider the moment of writing this log record (before or after op, reconsider the moment of writing this log record (before or after op,
under THR_LOCK_maria or not...), how to use it in Recovery, and force under THR_LOCK_maria or not...), how to use it in Recovery.
the log. For now this record is just informative. For now it can serve to apply logs to a backup so we sync it.
*/ */
if (unlikely(translog_write_record(&share->state.create_rename_lsn, if (unlikely(translog_write_record(&share->state.create_rename_lsn,
LOGREC_REDO_RENAME_TABLE, LOGREC_REDO_RENAME_TABLE,
&dummy_transaction_object, NULL, &dummy_transaction_object, NULL,
2 + 2 + old_name_len + new_name_len, 2 + 2 + old_name_len + new_name_len,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL))) log_array, NULL) ||
translog_flush(share->state.create_rename_lsn)))
{ {
maria_close(info); maria_close(info);
DBUG_RETURN(1); DBUG_RETURN(1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment