Commit cd275413 authored by unknown's avatar unknown

WL#3071 Maria checkpoint

Finally this is the real checkpoint code.
It however exhibits unstabilities when a checkpoint runs concurrently
with data-modifying clients (table corruption, transaction log's
assertions) so for now a checkpoint is taken only at startup after
recovery and at shutdown, i.e. not in concurrent situations. Later
we will let it run periodically, as well as flush dirty pages
periodically (almost all needed code is there already, only pagecache
code is written but not committed).
WL#3072 Maria recovery
* replacing UNDO_ROW_PURGE with CLR_END; testing of those CLR_END via
ma_test2 which has INSERTs failing with duplicate keys.
* replaying of REDO_RENAME_TABLE
Now, off to test Recovery in ha_maria :)


BitKeeper/deleted/.del-ma_least_recently_dirtied.c:
  Delete: storage/maria/ma_least_recently_dirtied.c
BitKeeper/deleted/.del-ma_least_recently_dirtied.h:
  Delete: storage/maria/ma_least_recently_dirtied.h
storage/maria/Makefile.am:
  compile Checkpoint module
storage/maria/ha_maria.cc:
  When ha_maria starts, do a recovery from last checkpoint.
  Take a checkpoint when that recovery has ended and when ha_maria
  shuts down cleanly.
storage/maria/ma_blockrec.c:
  * even if my_sync() fails we have to my_close() (otherwise we leak
  a descriptor)
  * UNDO_ROW_PURGE is replaced by a simple CLR_END for UNDO_ROW_INSERT,
  as promised in the old comment; it gives us skipping during the
  UNDO phase.
storage/maria/ma_check.c:
  All REDOs before create_rename_lsn are ignored by Recovery. So
  create_rename_lsn must be set only after all data/index has been
  flushed and forced to disk. We thus move write_log_record_for_repair()
  to after _ma_flush_tables_files_after_repair().
storage/maria/ma_checkpoint.c:
  Checkpoint module.
storage/maria/ma_checkpoint.h:
  optional argument if caller wants a thread to periodically take
  checkpoints and flush dirty pages.
storage/maria/ma_create.c:
  * no need to init some vars as the initial bzero(share) takes care of this.
  * update to new function's name
  * even if we fail in my_sync() we have to my_close()
storage/maria/ma_extra.c:
  Checkpoint reads share->last_version under intern_lock, so we make
  maria_extra() update it under intern_lock. THR_LOCK_maria still needed
  because of _ma_test_if_reopen().
storage/maria/ma_init.c:
  destroy checkpoint module when Maria shuts down.
storage/maria/ma_loghandler.c:
  * UNDO_ROW_PURGE gone (see ma_blockrec.c)
  * we need to remember the LSN of the LOGREC_FILE_ID for a share,
  because this LSN is needed into the checkpoint record (Recovery wants
  to know the validity domain of an id->name mapping)
  * translog_get_horizon_no_lock() needed for Checkpoint
  * comment about failing assertion (Sanja knows)
  * translog_init_reader_data() thought that translog_read_record_header_scan()
  returns 0 in case of error, but 0 just means "0-length header".
  * translog_assign_id_to_share() now needs the MARIA_HA because
  LOGREC_FILE_ID uses a log-write hook.
  * Verify that (de)assignment of share->id happens only under intern_lock,
  as Checkpoint reads this id with intern_lock.
  * translog_purge() can accept TRANSLOG_ADDRESS, not necessarily
  a real LSN.
storage/maria/ma_loghandler.h:
  prototype updates
storage/maria/ma_open.c:
  no need to initialize "res"
storage/maria/ma_pagecache.c:
  When taking a checkpoint, we don't need to know the maximum rec_lsn
  of dirty pages; this LSN was intended to be used in the two-checkpoint
  rule, but last_checkpoint_lsn is as good.
  4 bytes for stored_list_size is enough as PAGECACHE::blocks (number
  of blocks which the pagecache can contain) is int.
storage/maria/ma_pagecache.h:
  new prototype
storage/maria/ma_recovery.c:
  * added replaying of REDO_RENAME_TABLE
  * UNDO_ROW_PURGE gone (see ma_blockrec.c), replaced by CLR_END
  * Recovery from the last checkpoint record now possible
  * In new_table() we skip the table if the id->name mapping is older than
  create_rename_lsn (mapping dates from lsn_of_file_id).
  * in get_MARIA_HA_from_REDO_record() we skip the record
  if the id->name mapping is newer than the record (can happen if processing
  a record which is before the checkpoint record).
  * parse_checkpoint_record() has to return a LSN, that's what caller expects
storage/maria/ma_rename.c:
  new function's name; log end zeroes of tables' names (ease recovery)
storage/maria/ma_test2.c:
  * equivalent of ma_test1's --test-undo added (named -u here).
  * -t=1 now stops right after creating the table, so that
  we can test undoing of INSERTs with duplicate keys (which tests the
  CLR_END logged by _ma_write_abort_block_record()).
storage/maria/ma_test_recovery.expected:
  Result of testing undoing of INSERTs with duplicate keys; there are
  some differences in maria_chk -dvv but they are normal (removing
  records does not shrink data/index file, does not put back the
  "analyzed, optimized keys"(etc) index state.
storage/maria/ma_test_recovery:
  Test undoing of INSERTs with duplicate keys, using ma_test2;
  when such INSERT happens, it logs REDO_INSERT, UNDO_INSERT, REDO_DELETE,
  CLR_END; we abort after that, and test that CLR_END causes recovery
  to jump over UNDO_INSERT.
storage/maria/ma_write.c:
  comment
storage/maria/maria_chk.c:
  comment
storage/maria/maria_def.h:
  * a new bit in MARIA_SHARE::in_checkpoint, used to build a list
  of unique shares during Checkpoint.
  * MARIA_SHARE::lsn_of_file_id added: the LSN of the last LOGREC_FILE_ID
  for this share; needed to know to which LSN domain the mappings
  found in the Checkpoint record apply (new mappings should not apply
  to old REDOs).
storage/maria/trnman.c:
  * small changes to how trnman_collect_transactions() fills its buffer;
  it also uses a non-dummy lsn_read_non_atomic() found in ma_checkpoint.h
parent cbac9b2c
...@@ -61,7 +61,8 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \ ...@@ -61,7 +61,8 @@ noinst_HEADERS = maria_def.h ma_rt_index.h ma_rt_key.h ma_rt_mbr.h \
ma_ft_eval.h trnman.h lockman.h tablockman.h \ ma_ft_eval.h trnman.h lockman.h tablockman.h \
ma_control_file.h ha_maria.h ma_blockrec.h \ ma_control_file.h ha_maria.h ma_blockrec.h \
ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \ ma_loghandler.h ma_loghandler_lsn.h ma_pagecache.h \
ma_recovery.h ma_commit.h trnman_public.h ma_checkpoint.h ma_recovery.h ma_commit.h \
trnman_public.h
ma_test1_DEPENDENCIES= $(LIBRARIES) ma_test1_DEPENDENCIES= $(LIBRARIES)
ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \ ma_test1_LDADD= @CLIENT_EXTRA_LDFLAGS@ libmaria.a \
$(top_builddir)/storage/myisam/libmyisam.a \ $(top_builddir)/storage/myisam/libmyisam.a \
...@@ -120,7 +121,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \ ...@@ -120,7 +121,7 @@ libmaria_a_SOURCES = ma_init.c ma_open.c ma_extra.c ma_info.c ma_rkey.c \
ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \ ma_rt_index.c ma_rt_key.c ma_rt_mbr.c ma_rt_split.c \
ma_sp_key.c ma_control_file.c ma_loghandler.c \ ma_sp_key.c ma_control_file.c ma_loghandler.c \
ma_pagecache.c ma_pagecaches.c \ ma_pagecache.c ma_pagecaches.c \
ma_recovery.c ma_commit.c ma_checkpoint.c ma_recovery.c ma_commit.c
CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA? CLEANFILES = test?.MA? FT?.MA? isam.log ma_test_all ma_rt_test.MA? sp_test.MA?
SUFFIXES = .sh SUFFIXES = .sh
......
...@@ -31,6 +31,8 @@ C_MODE_START ...@@ -31,6 +31,8 @@ C_MODE_START
#include "maria_def.h" #include "maria_def.h"
#include "ma_rt_index.h" #include "ma_rt_index.h"
#include "ma_blockrec.h" #include "ma_blockrec.h"
#include "ma_checkpoint.h"
#include "ma_recovery.h"
C_MODE_END C_MODE_END
/* /*
...@@ -2344,6 +2346,7 @@ bool ha_maria::check_if_incompatible_data(HA_CREATE_INFO *info, ...@@ -2344,6 +2346,7 @@ bool ha_maria::check_if_incompatible_data(HA_CREATE_INFO *info,
static int maria_hton_panic(handlerton *hton, ha_panic_function flag) static int maria_hton_panic(handlerton *hton, ha_panic_function flag)
{ {
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE); /* can't catch error */
return maria_panic(flag); return maria_panic(flag);
} }
...@@ -2403,7 +2406,10 @@ static int ha_maria_init(void *p) ...@@ -2403,7 +2406,10 @@ static int ha_maria_init(void *p)
translog_init(maria_data_root, TRANSLOG_FILE_SIZE, translog_init(maria_data_root, TRANSLOG_FILE_SIZE,
MYSQL_VERSION_ID, server_id, maria_log_pagecache, MYSQL_VERSION_ID, server_id, maria_log_pagecache,
TRANSLOG_DEFAULT_FLAGS) || TRANSLOG_DEFAULT_FLAGS) ||
trnman_init(0); maria_recover() ||
ma_checkpoint_init(FALSE) ||
/* One checkpoint after Recovery */
ma_checkpoint_execute(CHECKPOINT_FULL, FALSE);
maria_multi_threaded= TRUE; maria_multi_threaded= TRUE;
return res; return res;
} }
......
...@@ -402,9 +402,10 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share) ...@@ -402,9 +402,10 @@ my_bool _ma_once_end_block_record(MARIA_SHARE *share)
File must be synced as it is going out of the maria_open_list and so File must be synced as it is going out of the maria_open_list and so
becoming unknown to Checkpoint. becoming unknown to Checkpoint.
*/ */
if ((share->now_transactional && if (share->now_transactional &&
my_sync(share->bitmap.file.file, MYF(MY_WME))) || my_sync(share->bitmap.file.file, MYF(MY_WME)))
my_close(share->bitmap.file.file, MYF(MY_WME))) res= 1;
if (my_close(share->bitmap.file.file, MYF(MY_WME)))
res= 1; res= 1;
/* /*
Trivial assignment to guard against multiple invocations Trivial assignment to guard against multiple invocations
...@@ -2587,7 +2588,8 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2587,7 +2588,8 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
my_bool res= 0; my_bool res= 0;
MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks; MARIA_BITMAP_BLOCKS *blocks= &info->cur_row.insert_blocks;
MARIA_BITMAP_BLOCK *block, *end; MARIA_BITMAP_BLOCK *block, *end;
DBUG_ENTER("_ma_abort_write_block_record"); LSN lsn= LSN_IMPOSSIBLE;
DBUG_ENTER("_ma_write_abort_block_record");
if (delete_head_or_tail(info, if (delete_head_or_tail(info,
ma_recordpos_to_page(info->cur_row.lastpos), ma_recordpos_to_page(info->cur_row.lastpos),
...@@ -2616,44 +2618,42 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2616,44 +2618,42 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
if (info->s->now_transactional) if (info->s->now_transactional)
{ {
LSN lsn; LSN previous_undo_lsn;
TRANSLOG_HEADER_BUFFER rec;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]; uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + 1];
int len;
/* /*
Write UNDO record We do need the code above (delete_head_or_tail() etc) for
This entry is just an end marker for the abort_insert as we will never non-transactional tables.
really undo a failed insert. Note that this UNDO will cause recover For transactional tables we could skip this code above and just execute
to ignore the LOGREC_UNDO_ROW_INSERT that is the previous entry the UNDO_INSERT, but we try to have one code path.
in the UNDO chain. Write CLR record, because we are somehow undoing UNDO_ROW_INSERT.
When we have logging for keys: as maria_write() first writes the row
then the keys, and if failure, deletes the keys then the rows,
info->trn->undo_lsn below will properly point to the UNDO of the
UNDO_ROW_INSERT for this row.
*/ */
/** if ((len= translog_read_record_header(info->trn->undo_lsn, &rec)) ==
@todo RECOVERY BUG RECHEADER_READ_ERROR)
We do need the code above (delete_head_or_tail() etc) for {
non-transactional tables. res= 1;
For transactional tables we can either also use it or execute the goto end;
UNDO_INSERT. If we crash before this }
_ma_write_abort_block_record(), Recovery will do the work of this DBUG_ASSERT(rec.type == LOGREC_UNDO_ROW_INSERT);
function by executing UNDO_INSERT. previous_undo_lsn= lsn_korr(rec.header);
For transactional tables, we will remove this LOGREC_UNDO_PURGE and lsn_store(log_data, previous_undo_lsn);
replace it with a LOGREC_CLR_END: we should go back the UNDO chain log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE]= LOGREC_UNDO_ROW_INSERT;
until we reach the UNDO which inserted the row into the data file, and
use its previous_undo_lsn.
Same logic for when we remove inserted keys (in case of error in
maria_write(): we come to the present function only after removing the
inserted keys... as long as we unpin the key pages only after writing
the CLR_END, this would be recovery-safe...).
*/
lsn_store(log_data, info->trn->undo_lsn);
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data; log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char*) log_data;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
if (translog_write_record(&lsn, LOGREC_UNDO_ROW_PURGE, if (translog_write_record(&lsn, LOGREC_CLR_END,
info->trn, info, sizeof(log_data), info->trn, info, sizeof(log_data),
TRANSLOG_INTERNAL_PARTS + 1, log_array, TRANSLOG_INTERNAL_PARTS + 1, log_array,
log_data + LSN_STORE_SIZE)) log_data + LSN_STORE_SIZE))
res= 1; res= 1;
} }
_ma_unpin_all_pages_and_finalize_row(info, info->trn->undo_lsn); end:
_ma_unpin_all_pages_and_finalize_row(info, lsn);
DBUG_RETURN(res); DBUG_RETURN(res);
} }
......
...@@ -2264,7 +2264,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2264,7 +2264,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
llstr(sort_info.dupp,llbuff)); llstr(sort_info.dupp,llbuff));
} }
got_error= sync_dir ? write_log_record_for_repair(param, info) : 0; got_error= 0;
/* If invoked by external program that uses thr_lock */ /* If invoked by external program that uses thr_lock */
if (&share->state.state != info->state) if (&share->state.state != info->state)
memcpy( &share->state.state, info->state, sizeof(*info->state)); memcpy( &share->state.state, info->state, sizeof(*info->state));
...@@ -2309,6 +2309,14 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2309,6 +2309,14 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
} }
maria_mark_crashed_on_repair(info); maria_mark_crashed_on_repair(info);
} }
else if (sync_dir)
{
/*
Now that we have flushed and forced everything, we can bump
create_rename_lsn:
*/
write_log_record_for_repair(param, info);
}
my_free(sort_param.rec_buff, MYF(MY_ALLOW_ZERO_PTR)); my_free(sort_param.rec_buff, MYF(MY_ALLOW_ZERO_PTR));
my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR)); my_free(sort_param.record,MYF(MY_ALLOW_ZERO_PTR));
my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR)); my_free(sort_info.buff,MYF(MY_ALLOW_ZERO_PTR));
...@@ -5551,7 +5559,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info, ...@@ -5551,7 +5559,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
/** /**
@brief Writes a LOGREC_REPAIR_TABLE record and updates create_rename_lsn @brief Writes a LOGREC_REPAIR_TABLE record and updates create_rename_lsn
and is_of_lsn and is_of_horizon
REPAIR/OPTIMIZE have replaced the data/index file with a new file REPAIR/OPTIMIZE have replaced the data/index file with a new file
and so, in this scenario: and so, in this scenario:
...@@ -5572,6 +5580,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info, ...@@ -5572,6 +5580,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info) static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
{ {
MARIA_SHARE *share= info->s;
/* in case this is maria_chk or recovery... */ /* in case this is maria_chk or recovery... */
if (translog_inited && !maria_in_recovery) if (translog_inited && !maria_in_recovery)
{ {
...@@ -5613,16 +5622,12 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info) ...@@ -5613,16 +5622,12 @@ static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
return 1; return 1;
/* /*
The table's existence was made durable earlier (MY_SYNC_DIR passed to The table's existence was made durable earlier (MY_SYNC_DIR passed to
maria_change_to_newfile()). maria_change_to_newfile()). _ma_flush_table_files_after_repair() was
called earlier, flushed and forced data+index+state. Old REDOs should
not be applied to the table:
*/ */
if (_ma_update_create_rename_lsn_on_disk(info->s, lsn, FALSE)) if (_ma_update_create_rename_lsn(share, lsn, TRUE))
return 1; return 1;
/*
_ma_flush_table_files_after_repair() is later called by maria_repair(),
and makes sure to flush the data, index, update is_of_lsn, flush state
and sync, so create_rename_lsn reaches disk, thus we won't apply old
REDOs to the new table.
*/
} }
return 0; return 0;
} }
This diff is collapsed.
...@@ -32,7 +32,7 @@ typedef enum enum_ma_checkpoint_level { ...@@ -32,7 +32,7 @@ typedef enum enum_ma_checkpoint_level {
} CHECKPOINT_LEVEL; } CHECKPOINT_LEVEL;
C_MODE_START C_MODE_START
int ma_checkpoint_init(); int ma_checkpoint_init(my_bool create_background_thread);
void ma_checkpoint_end(); void ma_checkpoint_end();
int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait); int ma_checkpoint_execute(CHECKPOINT_LEVEL level, my_bool no_wait);
C_MODE_END C_MODE_END
......
...@@ -636,7 +636,6 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -636,7 +636,6 @@ int maria_create(const char *name, enum data_file_type datafile_type,
share.state.dellink = HA_OFFSET_ERROR; share.state.dellink = HA_OFFSET_ERROR;
share.state.first_bitmap_with_space= 0; share.state.first_bitmap_with_space= 0;
share.state.create_rename_lsn= share.state.is_of_lsn= LSN_IMPOSSIBLE;
share.state.process= (ulong) getpid(); share.state.process= (ulong) getpid();
share.state.unique= (ulong) 0; share.state.unique= (ulong) 0;
share.state.update_count=(ulong) 0; share.state.update_count=(ulong) 0;
...@@ -1006,7 +1005,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1006,7 +1005,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
DROP+CREATE happened (applying REDOs to the wrong table). DROP+CREATE happened (applying REDOs to the wrong table).
*/ */
share.kfile.file= file; share.kfile.file= file;
if (_ma_update_create_rename_lsn_on_disk_sub(&share, lsn, FALSE)) if (_ma_update_create_rename_lsn_sub(&share, lsn, FALSE))
goto err; goto err;
my_free(log_data, MYF(0)); my_free(log_data, MYF(0));
} }
...@@ -1070,7 +1069,9 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -1070,7 +1069,9 @@ int maria_create(const char *name, enum data_file_type datafile_type,
if (my_chsize(dfile,share.base.min_pack_length*ci->reloc_rows,0,MYF(0))) if (my_chsize(dfile,share.base.min_pack_length*ci->reloc_rows,0,MYF(0)))
goto err; goto err;
#endif #endif
if ((sync_dir && my_sync(dfile, MYF(0))) || my_close(dfile,MYF(0))) if (sync_dir && my_sync(dfile, MYF(0)))
goto err;
if (my_close(dfile,MYF(0)))
goto err; goto err;
} }
pthread_mutex_unlock(&THR_LOCK_maria); pthread_mutex_unlock(&THR_LOCK_maria);
...@@ -1207,7 +1208,7 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile) ...@@ -1207,7 +1208,7 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
/** /**
@brief Writes create_rename_lsn and is_of_lsn to disk, optionally forces. @brief Writes create_rename_lsn and is_of_horizon to disk, can force.
This is for special cases where: This is for special cases where:
- we don't want to write the full state to disk (so, not call - we don't want to write the full state to disk (so, not call
...@@ -1224,21 +1225,21 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile) ...@@ -1224,21 +1225,21 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile)
@retval 1 error (disk problem) @retval 1 error (disk problem)
*/ */
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, int _ma_update_create_rename_lsn(MARIA_SHARE *share,
LSN lsn, my_bool do_sync) LSN lsn, my_bool do_sync)
{ {
int res; int res;
pthread_mutex_lock(&share->intern_lock); pthread_mutex_lock(&share->intern_lock);
res= _ma_update_create_rename_lsn_on_disk_sub(share, lsn, do_sync); res= _ma_update_create_rename_lsn_sub(share, lsn, do_sync);
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
return res; return res;
} }
/** /**
@brief Writes create_rename_lsn and is_of_lsn to disk, optionally forces. @brief Writes create_rename_lsn and is_of_horizon to disk, can force.
Shortcut of _ma_update_create_rename_lsn_on_disk() when we know that Shortcut of _ma_update_create_rename_lsn() when we know that
intern_lock is not needed (when creating a table or opening it for the intern_lock is not needed (when creating a table or opening it for the
first time). first time).
...@@ -1250,15 +1251,28 @@ int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, ...@@ -1250,15 +1251,28 @@ int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share,
@retval 1 error (disk problem) @retval 1 error (disk problem)
*/ */
int _ma_update_create_rename_lsn_on_disk_sub(MARIA_SHARE *share, int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync) LSN lsn, my_bool do_sync)
{ {
char buf[LSN_STORE_SIZE*2], *ptr; char buf[LSN_STORE_SIZE*2], *ptr;
File file= share->kfile.file; File file= share->kfile.file;
DBUG_ASSERT(file >= 0); DBUG_ASSERT(file >= 0);
for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE) for (ptr= buf; ptr < (buf + sizeof(buf)); ptr+= LSN_STORE_SIZE)
lsn_store(ptr, lsn); lsn_store(ptr, lsn);
share->state.is_of_lsn= share->state.create_rename_lsn= lsn; share->state.is_of_horizon= share->state.create_rename_lsn= lsn;
if (share->id != 0)
{
/*
If OP is the operation which is calling us, if table is later written,
we could see in the log:
FILE_ID ... REDO_OP ... REDO_INSERT.
(that can happen in real life at least with OP=REPAIR).
As FILE_ID will be ignored by Recovery because it is <
create_rename_lsn, REDO_INSERT would be ignored too, wrongly.
To avoid that, we force a LOGREC_FILE_ID to be logged at next write:
*/
translog_deassign_id_from_share(share);
}
return my_pwrite(file, buf, sizeof(buf), return my_pwrite(file, buf, sizeof(buf),
sizeof(share->state.header) + 2, MYF(MY_NABP)) || sizeof(share->state.header) + 2, MYF(MY_NABP)) ||
(do_sync && my_sync(file, MYF(0))); (do_sync && my_sync(file, MYF(0)));
......
...@@ -297,8 +297,10 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function, ...@@ -297,8 +297,10 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
HA_EXTRA_PREPARE_FOR_DROP|RENAME. HA_EXTRA_PREPARE_FOR_DROP|RENAME.
*/ */
pthread_mutex_lock(&THR_LOCK_maria); pthread_mutex_lock(&THR_LOCK_maria);
pthread_mutex_lock(&share->intern_lock); /* protect against Checkpoint */
/* this makes the share not be re-used next time the table is opened */ /* this makes the share not be re-used next time the table is opened */
share->last_version= 0L; /* Impossible version */ share->last_version= 0L; /* Impossible version */
pthread_mutex_unlock(&share->intern_lock);
pthread_mutex_unlock(&THR_LOCK_maria); pthread_mutex_unlock(&THR_LOCK_maria);
break; break;
case HA_EXTRA_PREPARE_FOR_DROP: case HA_EXTRA_PREPARE_FOR_DROP:
...@@ -306,9 +308,8 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function, ...@@ -306,9 +308,8 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
{ {
my_bool do_flush= test(function != HA_EXTRA_PREPARE_FOR_DROP); my_bool do_flush= test(function != HA_EXTRA_PREPARE_FOR_DROP);
pthread_mutex_lock(&THR_LOCK_maria); pthread_mutex_lock(&THR_LOCK_maria);
share->last_version= 0L; /* Impossible version */
/* /*
This share, having last_version=0, needs to save all its data/index This share, to have last_version=0, needs to save all its data/index
blocks to disk if this is not for a DROP TABLE. Otherwise they would be blocks to disk if this is not for a DROP TABLE. Otherwise they would be
invisible to future openers; and they could even go to disk late and invisible to future openers; and they could even go to disk late and
cancel the work of future openers. cancel the work of future openers.
...@@ -396,6 +397,8 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function, ...@@ -396,6 +397,8 @@ int maria_extra(MARIA_HA *info, enum ha_extra_function function,
} }
} }
#endif #endif
/* For protection against Checkpoint, we set under intern_lock: */
share->last_version= 0L; /* Impossible version */
pthread_mutex_unlock(&share->intern_lock); pthread_mutex_unlock(&share->intern_lock);
pthread_mutex_unlock(&THR_LOCK_maria); pthread_mutex_unlock(&THR_LOCK_maria);
break; break;
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include <ft_global.h> #include <ft_global.h>
#include "ma_blockrec.h" #include "ma_blockrec.h"
#include "trnman_public.h" #include "trnman_public.h"
#include "ma_checkpoint.h"
my_bool maria_inited= FALSE; my_bool maria_inited= FALSE;
pthread_mutex_t THR_LOCK_maria; pthread_mutex_t THR_LOCK_maria;
...@@ -56,6 +57,7 @@ void maria_end(void) ...@@ -56,6 +57,7 @@ void maria_end(void)
{ {
maria_inited= maria_multi_threaded= FALSE; maria_inited= maria_multi_threaded= FALSE;
ft_free_stopwords(); ft_free_stopwords();
ma_checkpoint_end();
trnman_destroy(); trnman_destroy();
translog_destroy(); translog_destroy();
end_pagecache(maria_log_pagecache, TRUE); end_pagecache(maria_log_pagecache, TRUE);
......
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
WL#3261 Maria - background flushing of the least-recently-dirtied pages
First version written by Guilhem Bichot on 2006-04-27.
Does not compile yet.
*/
/*
To be part of the page cache.
The pseudocode below is dependent on the page cache
which is being designed WL#3134. It is not clear if I need to do page
copies, as the page cache already keeps page copies.
So, this code will move to the page cache and take inspiration from its
methods. Below is just to give the idea of what could be done.
And I should compare my imaginations to WL#3134.
*/
/* Here is the implementation of this module */
#include "page_cache.h"
#include "least_recently_dirtied.h"
/*
This thread does background flush of pieces of the LRD, and serves
requests for asynchronous checkpoints.
Just launch it when engine starts.
MikaelR questioned why the same thread does two different jobs, the risk
could be that while a checkpoint happens no LRD flushing happens.
For now, we only do checkpoints - no LRD flushing (to be done when the
second version of the page cache is ready WL#3077).
Reasons to delay:
- Recovery will work (just slower)
- new page cache may be different, why do then re-do
- current pagecache probably has issues with flushing when somebody is
writing to the table being flushed - better avoid that.
*/
pthread_handler_decl background_flush_and_checkpoint_thread()
{
while (this_thread_not_killed)
{
/* note that we don't care of the checkpoint's success */
(void)execute_asynchronous_checkpoint_if_any();
sleep(5);
/*
in the final version, we will not sleep but call flush_pages_from_LRD()
repeatedly. If there are no dirty pages, we'll make sure to not have a
tight loop probing for checkpoint requests.
*/
}
}
/* The rest of this file will not serve in first version */
/*
flushes only the first pages of the LRD.
max_this_number could be FLUSH_CACHE (of mf_pagecache.c) for example.
*/
flush_pages_from_LRD(uint max_this_number, LSN max_this_lsn)
{
/*
One rule to better observe is "page must be flushed to disk before it is
removed from LRD" (otherwise checkpoint is incomplete info, corruption).
*/
/*
Build a list of pages to flush:
changed_blocks[i] is roughly sorted by descending rec_lsn,
so we could do a merge sort of changed_blocks[] lists, stopping after we
have the max_this_number first elements or after we have found a page with
rec_lsn > max_this_lsn.
Then do like pagecache_flush_blocks_int() does (beware! this time we are
not alone on the file! there may be dangers! TODO: sort this out).
*/
/*
MikaelR noted that he observed that Linux's file cache may never fsync to
disk until this cache is full, at which point it decides to empty the
cache, making the machine very slow. A solution was to fsync after writing
2 MB.
*/
}
/*
Note that when we flush all page from LRD up to rec_lsn>=max_lsn,
this is approximate because the LRD list may
not be exactly sorted by rec_lsn (because for a big row, all pages of the
row are inserted into the LRD with rec_lsn being the LSN of the REDO for the
first page, so if there are concurrent insertions, the last page of the big
row may have a smaller rec_lsn than the previous pages inserted by
concurrent inserters).
*/
/* Copyright (C) 2006 MySQL AB & MySQL Finland AB & TCX DataKonsult AB
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; version 2 of the License.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
/*
WL#3261 Maria - background flushing of the least-recently-dirtied pages
First version written by Guilhem Bichot on 2006-04-27.
Does not compile yet.
*/
/* This is the interface of this module. */
/* flushes all page from LRD up to approximately rec_lsn>=max_lsn */
int flush_all_LRD_to_lsn(LSN max_lsn);
This diff is collapsed.
...@@ -105,7 +105,6 @@ enum translog_record_type ...@@ -105,7 +105,6 @@ enum translog_record_type
LOGREC_UNDO_ROW_INSERT, LOGREC_UNDO_ROW_INSERT,
LOGREC_UNDO_ROW_DELETE, LOGREC_UNDO_ROW_DELETE,
LOGREC_UNDO_ROW_UPDATE, LOGREC_UNDO_ROW_UPDATE,
LOGREC_UNDO_ROW_PURGE,
LOGREC_UNDO_KEY_INSERT, LOGREC_UNDO_KEY_INSERT,
LOGREC_UNDO_KEY_DELETE, LOGREC_UNDO_KEY_DELETE,
LOGREC_PREPARE, LOGREC_PREPARE,
...@@ -251,13 +250,14 @@ extern my_bool translog_init_scanner(LSN lsn, ...@@ -251,13 +250,14 @@ extern my_bool translog_init_scanner(LSN lsn,
extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner, extern int translog_read_next_record_header(TRANSLOG_SCANNER_DATA *scanner,
TRANSLOG_HEADER_BUFFER *buff); TRANSLOG_HEADER_BUFFER *buff);
extern LSN translog_get_file_max_lsn_stored(uint32 file); extern LSN translog_get_file_max_lsn_stored(uint32 file);
extern my_bool translog_purge(LSN low); extern my_bool translog_purge(TRANSLOG_ADDRESS low);
extern my_bool translog_is_file(uint file_no); extern my_bool translog_is_file(uint file_no);
extern my_bool translog_lock(); extern my_bool translog_lock();
extern my_bool translog_unlock(); extern my_bool translog_unlock();
extern void translog_lock_assert_owner(); extern void translog_lock_assert_owner();
extern TRANSLOG_ADDRESS translog_get_horizon(); extern TRANSLOG_ADDRESS translog_get_horizon();
extern int translog_assign_id_to_share(struct st_maria_share *share, extern TRANSLOG_ADDRESS translog_get_horizon_no_lock();
extern int translog_assign_id_to_share(struct st_maria_info *tbl_info,
struct st_transaction *trn); struct st_transaction *trn);
extern void translog_deassign_id_from_share(struct st_maria_share *share); extern void translog_deassign_id_from_share(struct st_maria_share *share);
extern void extern void
......
...@@ -618,14 +618,13 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -618,14 +618,13 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
view of the server, including server's recovery) now. view of the server, including server's recovery) now.
*/ */
if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery) if ((open_flags & HA_OPEN_FROM_SQL_LAYER) || maria_in_recovery)
_ma_update_create_rename_lsn_on_disk_sub(share, _ma_update_create_rename_lsn_sub(share, translog_get_horizon(),
translog_get_horizon(), TRUE);
TRUE);
} }
else if ((!LSN_VALID(share->state.create_rename_lsn) || else if ((!LSN_VALID(share->state.create_rename_lsn) ||
!LSN_VALID(share->state.is_of_lsn) || !LSN_VALID(share->state.is_of_horizon) ||
(cmp_translog_addr(share->state.create_rename_lsn, (cmp_translog_addr(share->state.create_rename_lsn,
share->state.is_of_lsn) > 0)) && share->state.is_of_horizon) > 0)) &&
!(open_flags & HA_OPEN_FOR_REPAIR)) !(open_flags & HA_OPEN_FOR_REPAIR))
{ {
/* /*
...@@ -981,7 +980,7 @@ static void setup_key_functions(register MARIA_KEYDEF *keyinfo) ...@@ -981,7 +980,7 @@ static void setup_key_functions(register MARIA_KEYDEF *keyinfo)
@brief Function to save and store the header in the index file (.MYI) @brief Function to save and store the header in the index file (.MYI)
Operates under MARIA_SHARE::intern_lock if requested. Operates under MARIA_SHARE::intern_lock if requested.
Sets MARIA_SHARE::MARIA_STATE_INFO::is_of_lsn if table is transactional. Sets MARIA_SHARE::MARIA_STATE_INFO::is_of_horizon if transactional table.
Then calls _ma_state_info_write_sub(). Then calls _ma_state_info_write_sub().
@param share table @param share table
...@@ -998,7 +997,7 @@ static void setup_key_functions(register MARIA_KEYDEF *keyinfo) ...@@ -998,7 +997,7 @@ static void setup_key_functions(register MARIA_KEYDEF *keyinfo)
uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite) uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
{ {
uint res= 0; uint res;
if (pWrite & 4) if (pWrite & 4)
pthread_mutex_lock(&share->intern_lock); pthread_mutex_lock(&share->intern_lock);
else if (maria_multi_threaded) else if (maria_multi_threaded)
...@@ -1007,11 +1006,11 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite) ...@@ -1007,11 +1006,11 @@ uint _ma_state_info_write(MARIA_SHARE *share, uint pWrite)
!maria_in_recovery) !maria_in_recovery)
{ {
/* /*
In a recovery, we want to set is_of_lsn to the LSN of the last In a recovery, we want to set is_of_horizon to the LSN of the last
record executed by Recovery, not the current EOF of the log (which record executed by Recovery, not the current EOF of the log (which
is too new). Recovery does it by itself. is too new). Recovery does it by itself.
*/ */
share->state.is_of_lsn= translog_get_horizon(); share->state.is_of_horizon= translog_get_horizon();
} }
res= _ma_state_info_write_sub(share->kfile.file, &share->state, pWrite); res= _ma_state_info_write_sub(share->kfile.file, &share->state, pWrite);
if (pWrite & 4) if (pWrite & 4)
...@@ -1052,11 +1051,12 @@ uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite) ...@@ -1052,11 +1051,12 @@ uint _ma_state_info_write_sub(File file, MARIA_STATE_INFO *state, uint pWrite)
/* open_count must be first because of _ma_mark_file_changed ! */ /* open_count must be first because of _ma_mark_file_changed ! */
mi_int2store(ptr,state->open_count); ptr+= 2; mi_int2store(ptr,state->open_count); ptr+= 2;
/* /*
if you change the offset of create_rename_lsn/is_of_lsn inside the file, if you change the offset of create_rename_lsn/is_of_horizon inside the
fix ma_create + ma_rename + ma_delete_all + backward-compatibility. index file's header, fix ma_create + ma_rename + ma_delete_all +
backward-compatibility.
*/ */
lsn_store(ptr, state->create_rename_lsn); ptr+= LSN_STORE_SIZE; lsn_store(ptr, state->create_rename_lsn); ptr+= LSN_STORE_SIZE;
lsn_store(ptr, state->is_of_lsn); ptr+= LSN_STORE_SIZE; lsn_store(ptr, state->is_of_horizon); ptr+= LSN_STORE_SIZE;
*ptr++= (uchar)state->changed; *ptr++= (uchar)state->changed;
*ptr++= state->sortkey; *ptr++= state->sortkey;
mi_rowstore(ptr,state->state.records); ptr+= 8; mi_rowstore(ptr,state->state.records); ptr+= 8;
...@@ -1119,7 +1119,7 @@ static uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state) ...@@ -1119,7 +1119,7 @@ static uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
state->open_count = mi_uint2korr(ptr); ptr+= 2; state->open_count = mi_uint2korr(ptr); ptr+= 2;
state->create_rename_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; state->create_rename_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE;
state->is_of_lsn= lsn_korr(ptr); ptr+= LSN_STORE_SIZE; state->is_of_horizon= lsn_korr(ptr); ptr+= LSN_STORE_SIZE;
state->changed= (my_bool) *ptr++; state->changed= (my_bool) *ptr++;
state->sortkey= (uint) *ptr++; state->sortkey= (uint) *ptr++;
state->state.records= mi_rowkorr(ptr); ptr+= 8; state->state.records= mi_rowkorr(ptr); ptr+= 8;
......
...@@ -3865,8 +3865,6 @@ int reset_pagecache_counters(const char *name, PAGECACHE *pagecache) ...@@ -3865,8 +3865,6 @@ int reset_pagecache_counters(const char *name, PAGECACHE *pagecache)
its size, will be put its size, will be put
@param[out] min_rec_lsn pointer to where the minimum rec_lsn of all @param[out] min_rec_lsn pointer to where the minimum rec_lsn of all
relevant dirty pages will be put relevant dirty pages will be put
@param[out] max_rec_lsn pointer to where the maximum rec_lsn of all
relevant dirty pages will be put
@return Operation status @return Operation status
@retval 0 OK @retval 0 OK
@retval 1 Error @retval 1 Error
...@@ -3874,14 +3872,13 @@ int reset_pagecache_counters(const char *name, PAGECACHE *pagecache) ...@@ -3874,14 +3872,13 @@ int reset_pagecache_counters(const char *name, PAGECACHE *pagecache)
my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
LEX_STRING *str, LEX_STRING *str,
LSN *min_rec_lsn, LSN *min_rec_lsn)
LSN *max_rec_lsn)
{ {
my_bool error= 0; my_bool error= 0;
ulong stored_list_size= 0; uint stored_list_size= 0;
uint file_hash; uint file_hash;
char *ptr; char *ptr;
LSN minimum_rec_lsn= LSN_MAX, maximum_rec_lsn= 0; LSN minimum_rec_lsn= LSN_MAX;
DBUG_ENTER("pagecache_collect_changed_blocks_with_LSN"); DBUG_ENTER("pagecache_collect_changed_blocks_with_LSN");
DBUG_ASSERT(NULL == str->str); DBUG_ASSERT(NULL == str->str);
...@@ -3921,7 +3918,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -3921,7 +3918,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
} }
} }
str->length= 8 + /* number of dirty pages */ compile_time_assert(sizeof(pagecache->blocks == 4));
str->length= 4 + /* number of dirty pages */
(4 + /* file */ (4 + /* file */
4 + /* pageno */ 4 + /* pageno */
LSN_STORE_SIZE /* rec_lsn */ LSN_STORE_SIZE /* rec_lsn */
...@@ -3929,8 +3927,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -3929,8 +3927,8 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
if (NULL == (str->str= my_malloc(str->length, MYF(MY_WME)))) if (NULL == (str->str= my_malloc(str->length, MYF(MY_WME))))
goto err; goto err;
ptr= str->str; ptr= str->str;
int8store(ptr, stored_list_size); int4store(ptr, stored_list_size);
ptr+= 8; ptr+= 4;
if (!stored_list_size) if (!stored_list_size)
goto end; goto end;
for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++) for (file_hash= 0; file_hash < PAGECACHE_CHANGED_BLOCKS_HASH; file_hash++)
...@@ -3955,15 +3953,12 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, ...@@ -3955,15 +3953,12 @@ my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
DBUG_ASSERT(LSN_VALID(block->rec_lsn)); DBUG_ASSERT(LSN_VALID(block->rec_lsn));
if (cmp_translog_addr(block->rec_lsn, minimum_rec_lsn) < 0) if (cmp_translog_addr(block->rec_lsn, minimum_rec_lsn) < 0)
minimum_rec_lsn= block->rec_lsn; minimum_rec_lsn= block->rec_lsn;
if (cmp_translog_addr(block->rec_lsn, maximum_rec_lsn) > 0)
maximum_rec_lsn= block->rec_lsn;
} /* otherwise, some trn->rec_lsn should hold the correct info */ } /* otherwise, some trn->rec_lsn should hold the correct info */
} }
} }
end: end:
pagecache_pthread_mutex_unlock(&pagecache->cache_lock); pagecache_pthread_mutex_unlock(&pagecache->cache_lock);
*min_rec_lsn= minimum_rec_lsn; *min_rec_lsn= minimum_rec_lsn;
*max_rec_lsn= maximum_rec_lsn;
DBUG_RETURN(error); DBUG_RETURN(error);
err: err:
......
...@@ -247,8 +247,7 @@ extern my_bool pagecache_delete_pages(PAGECACHE *pagecache, ...@@ -247,8 +247,7 @@ extern my_bool pagecache_delete_pages(PAGECACHE *pagecache,
extern void end_pagecache(PAGECACHE *keycache, my_bool cleanup); extern void end_pagecache(PAGECACHE *keycache, my_bool cleanup);
extern my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache, extern my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
LEX_STRING *str, LEX_STRING *str,
LSN *min_lsn, LSN *min_lsn);
LSN *max_lsn);
extern int reset_pagecache_counters(const char *name, PAGECACHE *pagecache); extern int reset_pagecache_counters(const char *name, PAGECACHE *pagecache);
......
This diff is collapsed.
...@@ -67,17 +67,12 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -67,17 +67,12 @@ int maria_rename(const char *old_name, const char *new_name)
if (sync_dir) if (sync_dir)
{ {
LSN lsn; LSN lsn;
uchar log_data[2 + 2]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 3]; uint old_name_len= strlen(old_name)+1, new_name_len= strlen(new_name)+1;
uint old_name_len= strlen(old_name), new_name_len= strlen(new_name); log_array[TRANSLOG_INTERNAL_PARTS + 0].str= (char *)old_name;
int2store(log_data, old_name_len); log_array[TRANSLOG_INTERNAL_PARTS + 0].length= old_name_len;
int2store(log_data + 2, new_name_len); log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char *)new_name;
log_array[TRANSLOG_INTERNAL_PARTS + 0].str= log_data; log_array[TRANSLOG_INTERNAL_PARTS + 1].length= new_name_len;
log_array[TRANSLOG_INTERNAL_PARTS + 0].length= sizeof(log_data);
log_array[TRANSLOG_INTERNAL_PARTS + 1].str= (char *)old_name;
log_array[TRANSLOG_INTERNAL_PARTS + 1].length= old_name_len;
log_array[TRANSLOG_INTERNAL_PARTS + 2].str= (char *)new_name;
log_array[TRANSLOG_INTERNAL_PARTS + 2].length= new_name_len;
/* /*
For this record to be of any use for Recovery, we need the upper For this record to be of any use for Recovery, we need the upper
MySQL layer to be crash-safe, which it is not now (that would require MySQL layer to be crash-safe, which it is not now (that would require
...@@ -88,7 +83,7 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -88,7 +83,7 @@ int maria_rename(const char *old_name, const char *new_name)
*/ */
if (unlikely(translog_write_record(&lsn, LOGREC_REDO_RENAME_TABLE, if (unlikely(translog_write_record(&lsn, LOGREC_REDO_RENAME_TABLE,
&dummy_transaction_object, NULL, &dummy_transaction_object, NULL,
2 + 2 + old_name_len + new_name_len, old_name_len + new_name_len,
sizeof(log_array)/sizeof(log_array[0]), sizeof(log_array)/sizeof(log_array[0]),
log_array, NULL) || log_array, NULL) ||
translog_flush(lsn))) translog_flush(lsn)))
...@@ -100,7 +95,7 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -100,7 +95,7 @@ int maria_rename(const char *old_name, const char *new_name)
store LSN into file, needed for Recovery to not be confused if a store LSN into file, needed for Recovery to not be confused if a
RENAME happened (applying REDOs to the wrong table). RENAME happened (applying REDOs to the wrong table).
*/ */
if (_ma_update_create_rename_lsn_on_disk(share, lsn, TRUE)) if (_ma_update_create_rename_lsn(share, lsn, TRUE))
{ {
maria_close(info); maria_close(info);
DBUG_RETURN(1); DBUG_RETURN(1);
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#define SAFEMALLOC #define SAFEMALLOC
#endif #endif
#include "maria_def.h" #include "maria_def.h"
#include "trnman.h"
#include <m_ctype.h> #include <m_ctype.h>
#include <my_bit.h> #include <my_bit.h>
...@@ -47,7 +48,8 @@ static void copy_key(struct st_maria_info *info,uint inx, ...@@ -47,7 +48,8 @@ static void copy_key(struct st_maria_info *info,uint inx,
static int verbose=0,testflag=0, static int verbose=0,testflag=0,
first_key=0,async_io=0,pagecacheing=0,write_cacheing=0,locking=0, first_key=0,async_io=0,pagecacheing=0,write_cacheing=0,locking=0,
rec_pointer_size=0,pack_fields=1,silent=0, rec_pointer_size=0,pack_fields=1,silent=0,
opt_quick_mode=0, transactional= 0, skip_update= 0; opt_quick_mode=0, transactional= 0, skip_update= 0,
die_in_middle_of_transaction= 0;
static int pack_seg=HA_SPACE_PACK,pack_type=HA_PACK_KEY,remove_count=-1; static int pack_seg=HA_SPACE_PACK,pack_type=HA_PACK_KEY,remove_count=-1;
static int create_flag= 0, srand_arg= 0; static int create_flag= 0, srand_arg= 0;
static ulong pagecache_size=IO_SIZE*16; static ulong pagecache_size=IO_SIZE*16;
...@@ -235,6 +237,9 @@ int main(int argc, char *argv[]) ...@@ -235,6 +237,9 @@ int main(int argc, char *argv[])
goto err; goto err;
if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED))) if (!(file=maria_open(filename,2,HA_OPEN_ABORT_IF_LOCKED)))
goto err; goto err;
maria_begin(file);
if (testflag == 1)
goto end;
if (!silent) if (!silent)
printf("- Writing key:s\n"); printf("- Writing key:s\n");
if (locking) if (locking)
...@@ -244,8 +249,6 @@ int main(int argc, char *argv[]) ...@@ -244,8 +249,6 @@ int main(int argc, char *argv[])
if (opt_quick_mode) if (opt_quick_mode)
maria_extra(file,HA_EXTRA_QUICK,0); maria_extra(file,HA_EXTRA_QUICK,0);
maria_begin(file);
for (i=0 ; i < recant ; i++) for (i=0 ; i < recant ; i++)
{ {
ulong blob_length; ulong blob_length;
...@@ -297,7 +300,7 @@ int main(int argc, char *argv[]) ...@@ -297,7 +300,7 @@ int main(int argc, char *argv[])
} }
} }
} }
if (testflag == 1) if (testflag == 2)
goto end; goto end;
if (write_cacheing) if (write_cacheing)
...@@ -348,7 +351,7 @@ int main(int argc, char *argv[]) ...@@ -348,7 +351,7 @@ int main(int argc, char *argv[])
else else
puts("Warning: Skipping delete test because no dupplicate keys"); puts("Warning: Skipping delete test because no dupplicate keys");
} }
if (testflag == 2) if (testflag == 3)
goto end; goto end;
if (!silent) if (!silent)
...@@ -409,7 +412,7 @@ int main(int argc, char *argv[]) ...@@ -409,7 +412,7 @@ int main(int argc, char *argv[])
} }
} }
} }
if (testflag == 3) if (testflag == 4)
goto end; goto end;
for (i=999, dupp_keys=j=0 ; i>0 ; i--) for (i=999, dupp_keys=j=0 ; i>0 ; i--)
...@@ -814,7 +817,7 @@ int main(int argc, char *argv[]) ...@@ -814,7 +817,7 @@ int main(int argc, char *argv[])
goto err; goto err;
} }
if (testflag == 4) if (testflag == 5)
goto end; goto end;
if (!silent) if (!silent)
...@@ -892,6 +895,36 @@ int main(int argc, char *argv[]) ...@@ -892,6 +895,36 @@ int main(int argc, char *argv[])
goto err; goto err;
} }
end: end:
if (die_in_middle_of_transaction)
{
/* As commit record is not done, UNDO entries needs to be rolled back */
switch (die_in_middle_of_transaction) {
case 1:
/*
Flush changed pages go to disk. That will also flush log. Recovery
will skip REDOs and apply UNDOs.
*/
_ma_flush_table_files(file, MARIA_FLUSH_DATA, FLUSH_RELEASE,
FLUSH_RELEASE);
break;
case 2:
/*
Just flush log. Pages are likely to not be on disk. Recovery will
then execute REDOs and UNDOs.
*/
if (translog_flush(file->trn->undo_lsn))
goto err;
break;
case 3:
/*
Flush nothing. Pages and log are likely to not be on disk. Recovery
will then do nothing.
*/
break;
}
printf("Dying on request without maria_commit()/maria_close()\n");
exit(0);
}
if (maria_commit(file)) if (maria_commit(file))
goto err; goto err;
if (maria_close(file)) if (maria_close(file))
...@@ -998,9 +1031,9 @@ static void get_options(int argc, char **argv) ...@@ -998,9 +1031,9 @@ static void get_options(int argc, char **argv)
verbose=1; verbose=1;
break; break;
case 'm': /* records */ case 'm': /* records */
if ((recant=atoi(++pos)) < 10 && testflag > 1) if ((recant=atoi(++pos)) < 10 && testflag > 2)
{ {
fprintf(stderr,"record count must be >= 10 (if testflag != 1)\n"); fprintf(stderr,"record count must be >= 10 (if testflag > 2)\n");
exit(1); exit(1);
} }
break; break;
...@@ -1048,6 +1081,9 @@ static void get_options(int argc, char **argv) ...@@ -1048,6 +1081,9 @@ static void get_options(int argc, char **argv)
case 'T': case 'T':
transactional= 1; transactional= 1;
break; break;
case 'u':
die_in_middle_of_transaction= atoi(++pos);
break;
case 'q': case 'q':
opt_quick_mode=1; opt_quick_mode=1;
break; break;
......
...@@ -131,7 +131,7 @@ do ...@@ -131,7 +131,7 @@ do
for test_undo in 1 2 3 for test_undo in 1 2 3
do do
# first iteration tests rollback of insert, second tests rollback of delete # first iteration tests rollback of insert, second tests rollback of delete
set -- "ma_test1 $silent -M -T -c -N $blobs" "--testflag=1" "--testflag=2" "ma_test1 $silent -M -T -c -N --debug=d:t:i:o,/tmp/ma_test1.trace $blobs" "--testflag=3" "--testflag=4" "ma_test1 $silent -M -T -c -N --debug=d:t:i:o,/tmp/ma_test1.trace $blobs" "--testflag=2" "--testflag=3" set -- "ma_test1 $silent -M -T -c -N $blobs" "--testflag=1" "--testflag=2 --test-undo=" "ma_test1 $silent -M -T -c -N --debug=d:t:i:o,/tmp/ma_test1.trace $blobs" "--testflag=3" "--testflag=4 --test-undo=" "ma_test1 $silent -M -T -c -N --debug=d:t:i:o,/tmp/ma_test1.trace $blobs" "--testflag=2" "--testflag=3 --test-undo=" "ma_test2 $silent -L -K -W -P -M -T -c $blobs" "-t1" "-t2 -u"
# -N (create NULL fields) is needed because --test-undo adds it anyway # -N (create NULL fields) is needed because --test-undo adds it anyway
while [ $# != 0 ] while [ $# != 0 ]
do do
...@@ -148,8 +148,8 @@ do ...@@ -148,8 +148,8 @@ do
mv $table.MAD $tmp/$table.MAD.good mv $table.MAD $tmp/$table.MAD.good
rm $table.MAI rm $table.MAI
rm maria_log.* maria_log_control rm maria_log.* maria_log_control
echo "TEST WITH $prog $abort_run_args --test-undo=$test_undo (additional aborted work)" echo "TEST WITH $prog $abort_run_args$test_undo (additional aborted work)"
$maria_path/$prog $abort_run_args --test-undo=$test_undo $maria_path/$prog $abort_run_args$test_undo
cp $table.MAD $tmp/$table.MAD.before_undo cp $table.MAD $tmp/$table.MAD.before_undo
if [ $test_undo -lt 3 ] if [ $test_undo -lt 3 ]
then then
...@@ -174,7 +174,7 @@ do ...@@ -174,7 +174,7 @@ do
echo "testing applying of CLRs to recreate table" echo "testing applying of CLRs to recreate table"
rm $table.MA? rm $table.MA?
apply_log "shouldnotchangelog" apply_log "shouldnotchangelog"
# the cmp below fails with blobs! @todo RECOVERY BUG find out why. # the cmp below fails with ma_test1+blobs! @todo RECOVERY BUG why?
# It is probably serious; REDOs shouldn't place rows in different # It is probably serious; REDOs shouldn't place rows in different
# positions from what the run-time code did. Indeed it may lead to # positions from what the run-time code did. Indeed it may lead to
# more or less free space... # more or less free space...
...@@ -189,12 +189,15 @@ do ...@@ -189,12 +189,15 @@ do
check_table_is_same check_table_is_same
shift 3 shift 3
done done
rm -f $table.* $tmp/$table* $tmp/maria_chk_*.txt $tmp/maria_read_log_$table.txt
done done
done done
rm -f $table.* $tmp/$table* $tmp/maria_chk_*.txt $tmp/maria_read_log_$table.txt
) 2>&1 > $tmp/ma_test_recovery.output ) 2>&1 > $tmp/ma_test_recovery.output
# also note that maria_chk -dvv shows differences for ma_test2 in UNDO phase,
# this is normal: removing records does not shrink the data/key file,
# does not put back the "analyzed,optimized keys"(etc) index state.
diff $maria_path/ma_test_recovery.expected $tmp/ma_test_recovery.output > /dev/null || diff_failed=1 diff $maria_path/ma_test_recovery.expected $tmp/ma_test_recovery.output > /dev/null || diff_failed=1
if [ "$diff_failed" == "1" ] if [ "$diff_failed" == "1" ]
then then
......
This diff is collapsed.
...@@ -222,6 +222,12 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -222,6 +222,12 @@ int maria_write(MARIA_HA *info, uchar *record)
maria_flush_bulk_insert(info, j); maria_flush_bulk_insert(info, j);
} }
info->errkey= (int) i; info->errkey= (int) i;
/*
We delete keys in the reverse order of insertion. This is the order that
a rollback would do and is important for CLR_ENDs generated by
_ma_ft|ck_delete() and write_record_abort() to work (with any other
order they would cause wrong jumps in the chain).
*/
while ( i-- > 0) while ( i-- > 0)
{ {
if (maria_is_key_active(share->state.key_map, i)) if (maria_is_key_active(share->state.key_map, i))
...@@ -231,6 +237,10 @@ int maria_write(MARIA_HA *info, uchar *record) ...@@ -231,6 +237,10 @@ int maria_write(MARIA_HA *info, uchar *record)
is_tree_inited(&info->bulk_insert[i]))); is_tree_inited(&info->bulk_insert[i])));
if (local_lock_tree) if (local_lock_tree)
rw_wrlock(&share->key_root_lock[i]); rw_wrlock(&share->key_root_lock[i]);
/**
@todo RECOVERY BUG
The key deletes below should generate CLR_ENDs
*/
if (share->keyinfo[i].flag & HA_FULLTEXT) if (share->keyinfo[i].flag & HA_FULLTEXT)
{ {
if (_ma_ft_del(info,i,(char*) buff,record,filepos)) if (_ma_ft_del(info,i,(char*) buff,record,filepos))
......
...@@ -1033,9 +1033,11 @@ static int maria_chk(HA_CHECK *param, char *filename) ...@@ -1033,9 +1033,11 @@ static int maria_chk(HA_CHECK *param, char *filename)
Tell the server's Recovery to ignore old REDOs on this table; we don't Tell the server's Recovery to ignore old REDOs on this table; we don't
know what the log's end LSN is now, so we just let the server know know what the log's end LSN is now, so we just let the server know
that it will have to find and store it. that it will have to find and store it.
This is the only case where create_rename_lsn can be a horizon and not
a LSN.
*/ */
if (share->base.born_transactional) if (share->base.born_transactional)
share->state.create_rename_lsn= share->state.is_of_lsn= share->state.create_rename_lsn= share->state.is_of_horizon=
LSN_REPAIRED_BY_MARIA_CHK; LSN_REPAIRED_BY_MARIA_CHK;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) && if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) || (maria_is_any_key_active(share->state.key_map) ||
......
...@@ -96,7 +96,8 @@ typedef struct st_maria_state_info ...@@ -96,7 +96,8 @@ typedef struct st_maria_state_info
uint open_count; uint open_count;
uint8 changed; /* Changed since mariachk */ uint8 changed; /* Changed since mariachk */
LSN create_rename_lsn; /**< LSN when table was last created/renamed */ LSN create_rename_lsn; /**< LSN when table was last created/renamed */
LSN is_of_lsn; /**< LSN when state was last updated on disk */ /** @brief Log horizon when state was last updated on disk */
TRANSLOG_ADDRESS is_of_horizon;
/* the following isn't saved on disk */ /* the following isn't saved on disk */
uint state_diff_length; /* Should be 0 */ uint state_diff_length; /* Should be 0 */
...@@ -218,6 +219,7 @@ typedef struct st_maria_file_bitmap ...@@ -218,6 +219,7 @@ typedef struct st_maria_file_bitmap
#define MARIA_CHECKPOINT_LOOKS_AT_ME 1 #define MARIA_CHECKPOINT_LOOKS_AT_ME 1
#define MARIA_CHECKPOINT_SHOULD_FREE_ME 2 #define MARIA_CHECKPOINT_SHOULD_FREE_ME 2
#define MARIA_CHECKPOINT_SEEN_IN_LOOP 4
typedef struct st_maria_share typedef struct st_maria_share
{ /* Shared between opens */ { /* Shared between opens */
...@@ -331,6 +333,7 @@ typedef struct st_maria_share ...@@ -331,6 +333,7 @@ typedef struct st_maria_share
non-mmaped area */ non-mmaped area */
MARIA_FILE_BITMAP bitmap; MARIA_FILE_BITMAP bitmap;
rw_lock_t mmap_lock; rw_lock_t mmap_lock;
LSN lsn_of_file_id; /**< LSN of its last LOGREC_FILE_ID */
} MARIA_SHARE; } MARIA_SHARE;
...@@ -940,10 +943,10 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages, ...@@ -940,10 +943,10 @@ int _ma_create_index_by_sort(MARIA_SORT_PARAM *info, my_bool no_messages,
ulong); ulong);
int _ma_sync_table_files(const MARIA_HA *info); int _ma_sync_table_files(const MARIA_HA *info);
int _ma_initialize_data_file(MARIA_SHARE *share, File dfile); int _ma_initialize_data_file(MARIA_SHARE *share, File dfile);
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, int _ma_update_create_rename_lsn(MARIA_SHARE *share,
LSN lsn, my_bool do_sync); LSN lsn, my_bool do_sync);
int _ma_update_create_rename_lsn_on_disk_sub(MARIA_SHARE *share, int _ma_update_create_rename_lsn_sub(MARIA_SHARE *share,
LSN lsn, my_bool do_sync); LSN lsn, my_bool do_sync);
void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
#define _ma_tmp_disable_logging_for_table(S) \ #define _ma_tmp_disable_logging_for_table(S) \
......
...@@ -18,6 +18,7 @@ ...@@ -18,6 +18,7 @@
#include <my_sys.h> #include <my_sys.h>
#include <m_string.h> #include <m_string.h>
#include "trnman.h" #include "trnman.h"
#include "ma_checkpoint.h"
#include "ma_control_file.h" #include "ma_control_file.h"
/* /*
...@@ -587,27 +588,25 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -587,27 +588,25 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
TRN *trn; TRN *trn;
char *ptr; char *ptr;
uint stored_transactions= 0; uint stored_transactions= 0;
LSN minimum_rec_lsn= ULONGLONG_MAX, minimum_first_undo_lsn= ULONGLONG_MAX; LSN minimum_rec_lsn= LSN_MAX, minimum_first_undo_lsn= LSN_MAX;
DBUG_ENTER("trnman_collect_transactions"); DBUG_ENTER("trnman_collect_transactions");
DBUG_ASSERT((NULL == str_act->str) && (NULL == str_com->str)); DBUG_ASSERT((NULL == str_act->str) && (NULL == str_com->str));
/* validate the use of read_non_atomic() in general: */ /* validate the use of read_non_atomic() in general: */
compile_time_assert((sizeof(LSN) == 8) && (sizeof(LSN_WITH_FLAGS) == 8)); compile_time_assert((sizeof(LSN) == 8) && (sizeof(LSN_WITH_FLAGS) == 8));
DBUG_PRINT("info", ("pthread_mutex_lock LOCK_trn_list"));
pthread_mutex_lock(&LOCK_trn_list); pthread_mutex_lock(&LOCK_trn_list);
str_act->length= 2 + /* number of active transactions */ str_act->length= 2 + /* number of active transactions */
LSN_STORE_SIZE + /* minimum of their rec_lsn */ LSN_STORE_SIZE + /* minimum of their rec_lsn */
(6 + /* long id */ (2 + /* short id */
2 + /* short id */ 6 + /* long id */
LSN_STORE_SIZE + /* undo_lsn */ LSN_STORE_SIZE + /* undo_lsn */
#ifdef MARIA_VERSIONING /* not enabled yet */ #ifdef MARIA_VERSIONING /* not enabled yet */
LSN_STORE_SIZE + /* undo_purge_lsn */ LSN_STORE_SIZE + /* undo_purge_lsn */
#endif #endif
LSN_STORE_SIZE /* first_undo_lsn */ LSN_STORE_SIZE /* first_undo_lsn */
) * trnman_active_transactions; ) * trnman_active_transactions;
str_com->length= 8 + /* number of committed transactions */ str_com->length= 4 + /* number of committed transactions */
(6 + /* long id */ (6 + /* long id */
#ifdef MARIA_VERSIONING /* not enabled yet */ #ifdef MARIA_VERSIONING /* not enabled yet */
LSN_STORE_SIZE + /* undo_purge_lsn */ LSN_STORE_SIZE + /* undo_purge_lsn */
...@@ -638,13 +637,6 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -638,13 +637,6 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
*/ */
continue; continue;
} }
#ifndef MARIA_CHECKPOINT
/*
in the checkpoint patch (not yet ready) we will have a real implementation
of lsn_read_non_atomic(); for now it's not needed
*/
#define lsn_read_non_atomic(A) (A)
#endif
/* needed for low-water mark calculation */ /* needed for low-water mark calculation */
if (((rec_lsn= lsn_read_non_atomic(trn->rec_lsn)) > 0) && if (((rec_lsn= lsn_read_non_atomic(trn->rec_lsn)) > 0) &&
(cmp_translog_addr(rec_lsn, minimum_rec_lsn) < 0)) (cmp_translog_addr(rec_lsn, minimum_rec_lsn) < 0))
...@@ -656,23 +648,23 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -656,23 +648,23 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
if ((undo_lsn= trn->undo_lsn) == 0) /* trn can be forgotten */ if ((undo_lsn= trn->undo_lsn) == 0) /* trn can be forgotten */
continue; continue;
stored_transactions++; stored_transactions++;
int6store(ptr, trn->trid);
ptr+= 6;
int2store(ptr, sid); int2store(ptr, sid);
ptr+= 2; ptr+= 2;
int6store(ptr, trn->trid);
ptr+= 6;
lsn_store(ptr, undo_lsn); /* needed for rollback */ lsn_store(ptr, undo_lsn); /* needed for rollback */
ptr+= LSN_STORE_SIZE; ptr+= LSN_STORE_SIZE;
#ifdef MARIA_VERSIONING /* not enabled yet */
/* to know where purging should start (last delete of this trn) */
lsn_store(ptr, trn->undo_purge_lsn);
ptr+= LSN_STORE_SIZE;
#endif
/* needed for low-water mark calculation */ /* needed for low-water mark calculation */
if (((first_undo_lsn= lsn_read_non_atomic(trn->first_undo_lsn)) > 0) && if (((first_undo_lsn= lsn_read_non_atomic(trn->first_undo_lsn)) > 0) &&
(cmp_translog_addr(first_undo_lsn, minimum_first_undo_lsn) < 0)) (cmp_translog_addr(first_undo_lsn, minimum_first_undo_lsn) < 0))
minimum_first_undo_lsn= first_undo_lsn; minimum_first_undo_lsn= first_undo_lsn;
lsn_store(ptr, first_undo_lsn); lsn_store(ptr, first_undo_lsn);
ptr+= LSN_STORE_SIZE; ptr+= LSN_STORE_SIZE;
#ifdef MARIA_VERSIONING /* not enabled yet */
/* to know where purging should start (last delete of this trn) */
lsn_store(ptr, trn->undo_purge_lsn);
ptr+= LSN_STORE_SIZE;
#endif
/** /**
@todo RECOVERY: add a comment explaining why we can dirtily read some @todo RECOVERY: add a comment explaining why we can dirtily read some
vars, inspired by the text of "assumption 8" in WL#3072 vars, inspired by the text of "assumption 8" in WL#3072
...@@ -680,6 +672,8 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -680,6 +672,8 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
} }
str_act->length= ptr - str_act->str; /* as we maybe over-estimated */ str_act->length= ptr - str_act->str; /* as we maybe over-estimated */
ptr= str_act->str; ptr= str_act->str;
DBUG_PRINT("info",("collected %u active transactions",
(uint)stored_transactions));
int2store(ptr, stored_transactions); int2store(ptr, stored_transactions);
ptr+= 2; ptr+= 2;
/* this LSN influences how REDOs for any page can be ignored by Recovery */ /* this LSN influences how REDOs for any page can be ignored by Recovery */
...@@ -687,8 +681,10 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -687,8 +681,10 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
/* one day there will also be a list of prepared transactions */ /* one day there will also be a list of prepared transactions */
/* do the same for committed ones */ /* do the same for committed ones */
ptr= str_com->str; ptr= str_com->str;
int8store(ptr, (ulonglong)trnman_committed_transactions); int4store(ptr, trnman_committed_transactions);
ptr+= 8; ptr+= 4;
DBUG_PRINT("info",("collected %u committed transactions",
(uint)trnman_committed_transactions));
for (trn= committed_list_min.next; trn != &committed_list_max; for (trn= committed_list_min.next; trn != &committed_list_max;
trn= trn->next) trn= trn->next)
{ {
...@@ -716,7 +712,6 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com, ...@@ -716,7 +712,6 @@ my_bool trnman_collect_transactions(LEX_STRING *str_act, LEX_STRING *str_com,
err: err:
error= 1; error= 1;
end: end:
DBUG_PRINT("info", ("pthread_mutex_unlock LOCK_trn_list"));
pthread_mutex_unlock(&LOCK_trn_list); pthread_mutex_unlock(&LOCK_trn_list);
DBUG_RETURN(error); DBUG_RETURN(error);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment