Commit ea6ec89e authored by unknown's avatar unknown

Fixes for redo/undo logging of key pages

New extendable format for maria_log_control file
Fixed some compiler warnings


include/maria.h:
  Added maria_disable_logging() and maria_enable_logging()
mysql-test/include/maria_verify_recovery.inc:
  Updated tests now when key redo/undo works
mysql-test/r/maria-recovery.result:
  Updated tests now when key redo/undo works
storage/maria/ma_blockrec.c:
  Use unified CLR code
  Added rec_lsn for full pages
  Moved clr write hook to ma_key_recover.c
  Changed REDO code to keep pages pinned until undo
  Mark page_link's as changed
storage/maria/ma_blockrec.h:
  Moved write_hook_for_clr_end() to ma_key_recover.c
storage/maria/ma_check.c:
  Changed key check code to use PAGECACHE_READ_UNKNOWN_PAGE
  Fixed wrong warning when checking files after maria_pack
  When unpacking files, we have to use new keypos_to_recpos method
  When doing repair, we can disregard index key file pages in page cache
storage/maria/ma_commit.c:
  Added simple enable/disable logging functions
  (Needed for recovery)
storage/maria/ma_control_file.c:
  Make maria control file extendable without having to make it incompatible for older versions
storage/maria/ma_control_file.h:
  New error messages
  Added CONTROL_FILE_VERSION
storage/maria/ma_delete.c:
  Added redo/undo for key pages
  change_length -> changed_length to make things similar
  More comments & more DBUG
storage/maria/ma_key_recover.c:
  Unified CLR method
  Moved here write_hook_for_clr_end() and common keypage log functions
  Changed REDO to keep pages pinned until undo
  Changed UNDO code to change key_root under log mutex
storage/maria/ma_key_recover.h:
  New structures and functions
storage/maria/ma_loghandler.c:
  Include needed files
storage/maria/ma_open.c:
  Change maria_open() to use pread() instead of read()
storage/maria/ma_page.c:
  Fixed bug in key_del handling
  Clear pages if IDENTICAL_PAGES_AFTER_RECOVERY is defined
storage/maria/ma_pagecache.c:
  Indentation and spelling fixes
  More DBUG
  Added helper function: pagecache_block_link_to_buffer()
storage/maria/ma_pagecache.h:
  Added pagecache_block_link_to_buffer()
storage/maria/ma_recovery.c:
  Fixed state.changed
  Fixed that REDO keeps pages pinned until UNDO
  Some bug fixes from previous commit
  Fixes for UNDO/REDO of key pages
storage/maria/ma_search.c:
  Fixed packing and storing of keys to provide more information to caller so
  that we can do efficent REDO logging of the changes.
storage/maria/ma_test1.c:
  Fixed bug with not initialized variable
storage/maria/ma_test2.c:
  Removed not used code
storage/maria/ma_test_all.res:
  Updated results
storage/maria/ma_test_all.sh:
  Changed one test to test more
  Removed timing tests as not relevant here
storage/maria/ma_test_recovery.expected:
  Updated test result after redo/undo if key pages works
storage/maria/ma_test_recovery:
  Updated test after redo/undo if key pages works
storage/maria/ma_write.c:
  Moved some general log functions to ma_key_recover.c
  Fixed some bugs in undo
  Moved ma_log_split() to _ma_split_page()
  Small changes in some function arguments to be able to do redo logging
storage/maria/maria_chk.c:
  disable logging while doing repair table
storage/maria/maria_def.h:
  New function prototypes
  Move some structs and functions to ma_key_recover.c
storage/maria/unittest/ma_control_file-t.c:
  Updated with patch from Sanja
  NOTE: This is not complete and need to be updated to new control file format
storage/maria/unittest/ma_test_loghandler-t.c:
  Fixed compiler warning
parent 6ceb1a47
......@@ -306,6 +306,8 @@ extern int maria_delete_all_rows(MARIA_HA *info);
extern uint maria_get_pointer_length(ulonglong file_length, uint def);
extern int maria_commit(MARIA_HA *info);
extern int maria_begin(MARIA_HA *info);
extern void maria_disable_logging(MARIA_HA *info);
extern void maria_enable_logging(MARIA_HA *info);
/* this is used to pass to mysql_mariachk_table */
......
......@@ -77,16 +77,15 @@ let $mms_purpose=comparison;
let $mms_compare_physically=$mms_compare_physically_save;
while ($mms_table_to_use)
{
# Todo: remove this REPAIR when we have index recovery working.
# It is a quick repair, so that it will fail if data file is corrupted.
--echo * rebuilding index (until we have recovery of index)
eval repair table t$mms_table_to_use quick;
eval check table t$mms_table_to_use extended;
--echo * testing that checksum after recovery is as expected
let $new_checksum=`CHECKSUM TABLE t$mms_table_to_use`;
let $old_checksum=`CHECKSUM TABLE mysqltest_for_$mms_purpose.t$mms_table_to_use`;
# the $ text variables above are of the form "db.tablename\tchecksum",
# as db differs, we use substring().
eval select if(substring("$new_checksum",instr("$new_checksum",".t1")) = substring("$old_checksum",instr("$old_checksum",".t1")),"ok","failure");
--disable_query_log
eval select if(substring("$new_checksum",instr("$new_checksum",".t1")) = substring("$old_checksum",instr("$old_checksum",".t1")),"ok","failure") as "Checksum-check";
--enable_query_log
# this script may compare physically or do nothing
-- source include/maria_make_snapshot.inc
dec $mms_table_to_use;
......
......@@ -15,13 +15,11 @@ set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* copied t1 back for feeding_recovery
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 488070860",instr("mysqltest.t1 488070860",".t1")) = substring("mysqltest_for_comparison.t1 488070860",instr("mysqltest_for_comparison.t1 488070860",".t1")),"ok","failure");
if(substring("mysqltest.t1 488070860",instr("mysqltest.t1 488070860",".t1")) = substring("mysqltest_for_comparison.t1 488070860",instr("mysqltest_for_comparison.t1 488070860",".t1")),"ok","failure")
Checksum-check
ok
* compared t1 to old version
use mysqltest;
......@@ -39,13 +37,11 @@ SET SESSION debug="+d,maria_crash";
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 976141720",instr("mysqltest.t1 976141720",".t1")) = substring("mysqltest_for_comparison.t1 976141720",instr("mysqltest_for_comparison.t1 976141720",".t1")),"ok","failure");
if(substring("mysqltest.t1 976141720",instr("mysqltest.t1 976141720",".t1")) = substring("mysqltest_for_comparison.t1 976141720",instr("mysqltest_for_comparison.t1 976141720",".t1")),"ok","failure")
Checksum-check
ok
use mysqltest;
select * from t1;
......@@ -62,13 +58,11 @@ SET SESSION debug="+d,maria_flush_whole_page_cache,maria_crash";
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 1464212580",instr("mysqltest.t1 1464212580",".t1")) = substring("mysqltest_for_comparison.t1 1464212580",instr("mysqltest_for_comparison.t1 1464212580",".t1")),"ok","failure");
if(substring("mysqltest.t1 1464212580",instr("mysqltest.t1 1464212580",".t1")) = substring("mysqltest_for_comparison.t1 1464212580",instr("mysqltest_for_comparison.t1 1464212580",".t1")),"ok","failure")
Checksum-check
ok
use mysqltest;
select * from t1;
......@@ -86,13 +80,11 @@ SET SESSION debug="+d,maria_flush_states,maria_flush_whole_log,maria_crash";
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 1952283440",instr("mysqltest.t1 1952283440",".t1")) = substring("mysqltest_for_comparison.t1 1952283440",instr("mysqltest_for_comparison.t1 1952283440",".t1")),"ok","failure");
if(substring("mysqltest.t1 1952283440",instr("mysqltest.t1 1952283440",".t1")) = substring("mysqltest_for_comparison.t1 1952283440",instr("mysqltest_for_comparison.t1 1952283440",".t1")),"ok","failure")
Checksum-check
ok
use mysqltest;
select * from t1;
......@@ -111,13 +103,11 @@ SET SESSION debug="+d,maria_flush_whole_log,maria_crash";
set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 2440354300",instr("mysqltest.t1 2440354300",".t1")) = substring("mysqltest_for_comparison.t1 2440354300",instr("mysqltest_for_comparison.t1 2440354300",".t1")),"ok","failure");
if(substring("mysqltest.t1 2440354300",instr("mysqltest.t1 2440354300",".t1")) = substring("mysqltest_for_comparison.t1 2440354300",instr("mysqltest_for_comparison.t1 2440354300",".t1")),"ok","failure")
Checksum-check
ok
use mysqltest;
select * from t1;
......@@ -149,13 +139,11 @@ set global maria_checkpoint_interval=1;
ERROR HY000: Lost connection to MySQL server during query
* copied t1 back for feeding_recovery
* recovery happens
* rebuilding index (until we have recovery of index)
repair table t1 quick;
check table t1 extended;
Table Op Msg_type Msg_text
mysqltest.t1 repair status OK
mysqltest.t1 check status OK
* testing that checksum after recovery is as expected
select if(substring("mysqltest.t1 3472399915",instr("mysqltest.t1 3472399915",".t1")) = substring("mysqltest_for_comparison.t1 3472399915",instr("mysqltest_for_comparison.t1 3472399915",".t1")),"ok","failure");
if(substring("mysqltest.t1 3472399915",instr("mysqltest.t1 3472399915",".t1")) = substring("mysqltest_for_comparison.t1 3472399915",instr("mysqltest_for_comparison.t1 3472399915",".t1")),"ok","failure")
Checksum-check
ok
use mysqltest;
SELECT LENGTH(b) FROM t1 WHERE i=3;
......
This diff is collapsed.
......@@ -235,9 +235,6 @@ my_bool write_hook_for_undo_row_delete(enum translog_record_type type,
my_bool write_hook_for_undo_row_update(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
my_bool write_hook_for_clr_end(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
my_bool write_hook_for_file_id(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
......@@ -305,7 +305,7 @@ static int check_k_link(HA_CHECK *param, register MARIA_HA *info,
&info->s->kfile, next_link/block_size,
DFLT_INIT_HITS,
(uchar*) info->buff,
PAGECACHE_PLAIN_PAGE,
PAGECACHE_READ_UNKNOWN_PAGE,
PAGECACHE_LOCK_LEFT_UNLOCKED, 0)))
{
/* purecov: begin tested */
......@@ -595,7 +595,7 @@ int maria_chk_key(HA_CHECK *param, register MARIA_HA *info)
puts("");
}
if (param->key_file_blocks != info->state->key_file_length &&
param->keys_in_use != ~(ulonglong) 0)
share->state.key_map == ~(ulonglong) 0)
_ma_check_print_warning(param, "Some data are unreferenced in keyfile");
if (found_keys != full_text_keys)
param->record_checksum=old_record_checksum-init_checksum; /* Remove delete links */
......@@ -2123,8 +2123,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
info->s->state.dellink= HA_OFFSET_ERROR;
info->rec_cache.file= new_file;
if (share->data_file_type == BLOCK_RECORD ||
((param->testflag & T_UNPACK) &&
share->state.header.org_data_file_type == BLOCK_RECORD))
(param->testflag & T_UNPACK))
{
MARIA_HA *new_info;
/*
......@@ -2152,6 +2151,10 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
if (_ma_initialize_data_file(sort_info.new_info->s, new_file))
goto err;
block_record= 1;
/* Use new virtual functions for key generation */
info->s->keypos_to_recpos= new_info->s->keypos_to_recpos;
info->s->recpos_to_keypos= new_info->s->recpos_to_keypos;
}
}
......@@ -2901,8 +2904,8 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
param->testflag|=T_CALC_CHECKSUM;
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA, FLUSH_FORCE_WRITE,
FLUSH_KEEP))
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_FORCE_WRITE, FLUSH_IGNORE_CHANGED))
goto err;
if (!(sort_info.key_block=
......@@ -3328,8 +3331,8 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
if (info->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
param->testflag|=T_CALC_CHECKSUM;
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA, FLUSH_FORCE_WRITE,
FLUSH_KEEP))
if (_ma_flush_table_files(info, MARIA_FLUSH_DATA | MARIA_FLUSH_INDEX,
FLUSH_FORCE_WRITE, FLUSH_IGNORE_CHANGED))
goto err;
/*
......@@ -5502,12 +5505,19 @@ set_data_file_type(MARIA_SORT_INFO *sort_info, MARIA_SHARE *share)
static void restore_data_file_type(MARIA_SHARE *share)
{
MARIA_SHARE tmp_share;
share->options&= ~HA_OPTION_COMPRESS_RECORD;
mi_int2store(share->state.header.options,share->options);
share->state.header.data_file_type=
share->state.header.org_data_file_type;
share->data_file_type= share->state.header.data_file_type;
share->pack.header_length= 0;
/* Use new virtual functions for key generation */
tmp_share= *share;
_ma_setup_functions(&tmp_share);
share->keypos_to_recpos= tmp_share.keypos_to_recpos;
share->recpos_to_keypos= tmp_share.recpos_to_keypos;
}
......
......@@ -122,3 +122,26 @@ int maria_begin(MARIA_HA *info)
}
DBUG_RETURN(0);
}
/*
@brief Disable logging for this table
@note
Mainly used during repair table, where we don't want to log all
changes to index or rows
*/
void maria_disable_logging(MARIA_HA *info)
{
info->s->now_transactional= 0;
info->trn= &dummy_transaction_object;
info->s->page_type= PAGECACHE_PLAIN_PAGE;
}
void maria_enable_logging(MARIA_HA *info)
{
if ((info->s->now_transactional= info->s->base.born_transactional))
info->s->page_type= PAGECACHE_LSN_PAGE;
}
This diff is collapsed.
......@@ -22,6 +22,12 @@
#define _ma_control_file_h
#define CONTROL_FILE_BASE_NAME "maria_log_control"
/*
Major version for control file. Should only be changed when doing
big changes that made the new control file incompatible with all
older versions of Maria.
*/
#define CONTROL_FILE_VERSION 1
/* Here is the interface of this module */
......@@ -43,8 +49,12 @@ typedef enum enum_control_file_error {
CONTROL_FILE_TOO_SMALL,
CONTROL_FILE_TOO_BIG,
CONTROL_FILE_BAD_MAGIC_STRING,
CONTROL_FILE_BAD_VERSION,
CONTROL_FILE_BAD_CHECKSUM,
CONTROL_FILE_BAD_HEAD_CHECKSUM,
CONTROL_FILE_MISSING,
CONTROL_FILE_INCONSISTENT_INFORMATION,
CONTROL_FILE_WRONG_BLOCKSIZE,
CONTROL_FILE_UNKNOWN_ERROR /* any other error */
} CONTROL_FILE_ERROR;
......
This diff is collapsed.
This diff is collapsed.
......@@ -21,14 +21,48 @@
called).
*/
/* Struct for clr_end */
struct st_msg_to_write_hook_for_clr_end
{
LSN previous_undo_lsn;
enum translog_record_type undone_record_type;
ha_checksum checksum_delta;
void *extra_msg;
};
struct st_msg_to_write_hook_for_undo_key
{
my_off_t *root;
my_off_t value;
};
/* Function definitions for some redo functions */
my_bool _ma_write_clr(MARIA_HA *info, LSN undo_lsn,
enum translog_record_type undo_type,
my_bool store_checksum, ha_checksum checksum,
LSN *res_lsn);
LSN *res_lsn, void *extra_msg);
my_bool write_hook_for_clr_end(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info, LSN *lsn,
void *hook_arg);
extern my_bool write_hook_for_undo_key(enum translog_record_type type,
TRN *trn, MARIA_HA *tbl_info,
LSN *lsn, void *hook_arg);
void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
my_bool _ma_log_prefix(MARIA_HA *info, my_off_t page,
uchar *buff, uint changed_length,
int move_length);
my_bool _ma_log_suffix(MARIA_HA *info, my_off_t page,
uchar *buff, uint org_length,
uint new_length);
my_bool _ma_log_add(MARIA_HA *info, my_off_t page, uchar *buff,
uint buff_length, uchar *key_pos,
uint changed_length, int move_length,
my_bool handle_overflow);
uint _ma_apply_redo_index_new_page(MARIA_HA *info, LSN lsn,
const uchar *header, uint length);
uint _ma_apply_redo_index_free_page(MARIA_HA *info, LSN lsn,
......
......@@ -14,8 +14,9 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
#include "maria_def.h"
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "trnman.h"
#include "ma_blockrec.h" /* for some constants and in-write hooks */
#include "ma_key_recover.h" /* For some in-write hooks */
/**
@file
......
......@@ -117,7 +117,7 @@ static MARIA_HA *maria_clone_internal(MARIA_SHARE *share, int mode,
&info.blobs,sizeof(MARIA_BLOB)*share->base.blobs,
&info.buff,(share->base.max_key_block_length*2+
share->base.max_key_length),
&info.lastkey,share->base.max_key_length*3+1,
&info.lastkey,share->base.max_key_length*2+1,
&info.first_mbr_key, share->base.max_key_length,
&info.maria_rtree_recursion_state,
share->have_rtree ? 1024 : 0,
......@@ -304,8 +304,8 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
}
share->mode=open_mode;
errpos= 1;
if (my_read(kfile,share->state.header.file_version, head_length,
MYF(MY_NABP)))
if (my_pread(kfile,share->state.header.file_version, head_length, 0,
MYF(MY_NABP)))
{
my_errno= HA_ERR_NOT_A_TABLE;
goto err;
......@@ -355,11 +355,8 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
goto err;
}
end_pos=disk_cache+info_length;
errpos= 2;
VOID(my_seek(kfile,0L,MY_SEEK_SET,MYF(0)));
errpos= 3;
if (my_read(kfile,disk_cache,info_length,MYF(MY_NABP)))
if (my_pread(kfile, disk_cache, info_length, 0L, MYF(MY_NABP)))
{
my_errno=HA_ERR_CRASHED;
goto err;
......@@ -418,8 +415,10 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
goto err;
}
/*
If page cache is not initialized, then assume we will create it
after the table is opened!
If page cache is not initialized, then assume we will create the
page_cache after the table is opened!
This is only used by maria_check to allow it to check/repair tables
with different block sizes.
*/
if (share->base.block_size != maria_block_size &&
share_buff.pagecache->inited != 0)
......@@ -1226,7 +1225,6 @@ static uchar *_ma_state_info_read(uchar *ptr, MARIA_STATE_INFO *state)
@param file file to read from
@param state state which will be filled
@param pRead if true, use my_pread(), otherwise my_read()
*/
uint _ma_state_info_read_dsk(File file, MARIA_STATE_INFO *state)
......
......@@ -122,6 +122,7 @@ int _ma_write_keypage(register MARIA_HA *info, register MARIA_KEYDEF *keyinfo,
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
{
uint length= _ma_get_page_used(info, buff);
DBUG_ASSERT(length <= block_size - KEYPAGE_CHECKSUM_SIZE);
bzero(buff + length, block_size - length);
}
#endif
......@@ -186,13 +187,14 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
(void) _ma_lock_key_del(info, 0);
old_link= share->state.key_del;
share->state.key_del= pos;
old_link= share->current_key_del;
share->current_key_del= pos;
page_no= pos / block_size;
bzero(buff, share->keypage_header);
_ma_store_keynr(info, buff, (uchar) MARIA_DELETE_KEY_NR);
mi_sizestore(buff + share->keypage_header, old_link);
share->state.changed|= STATE_NOT_SORTED_PAGES;
if (info->s->now_transactional)
{
LSN lsn;
......@@ -239,6 +241,14 @@ int _ma_dispose(register MARIA_HA *info, my_off_t pos, my_bool page_not_read)
0, share->keypage_header+8, 0, 0))
result= 1;
#ifdef IDENTICAL_PAGES_AFTER_RECOVERY
{
uchar *page_buff= pagecache_block_link_to_buffer(page_link.link);
bzero(page_buff + share->keypage_header + 8,
block_size - share->keypage_header - 8 - KEYPAGE_CHECKSUM_SIZE);
}
#endif
if (page_not_read)
{
/* It was not locked before, we have to unlock it when we unpin pages */
......@@ -295,7 +305,7 @@ my_off_t _ma_new(register MARIA_HA *info, int level,
TODO: replace PAGECACHE_PLAIN_PAGE with PAGECACHE_LSN_PAGE when
LSN on the pages will be implemented
*/
pos= info->s->state.key_del; /* Protected */
pos= share->current_key_del; /* Protected */
DBUG_ASSERT(share->pagecache->block_size == block_size);
if (!(buff= pagecache_read(share->pagecache,
&share->kfile, pos / block_size, level,
......@@ -312,7 +322,7 @@ my_off_t _ma_new(register MARIA_HA *info, int level,
(*page_link)->unlock= PAGECACHE_LOCK_WRITE_UNLOCK;
(*page_link)->write_lock= PAGECACHE_LOCK_WRITE;
(*page_link)->changed= 0;
push_dynamic(&info->pinned_pages, (void*) &page_link);
push_dynamic(&info->pinned_pages, (void*) *page_link);
*page_link= dynamic_element(&info->pinned_pages,
info->pinned_pages.elements-1,
MARIA_PINNED_PAGE *);
......
......@@ -405,8 +405,7 @@ static my_bool info_check_lock(PAGECACHE_BLOCK_LINK *block,
(PAGECACHE_LOCK_INFO *) info_find((PAGECACHE_PIN_INFO *) block->lock_list,
thread);
DBUG_ENTER("info_check_lock");
switch(lock)
{
switch(lock) {
case PAGECACHE_LOCK_LEFT_UNLOCKED:
if (pin != PAGECACHE_PIN_LEFT_UNPINNED ||
info)
......@@ -1199,7 +1198,7 @@ static inline void link_to_changed_list(PAGECACHE *pagecache,
none
NOTES.
The LRU chain is represented by a curcular list of block structures.
The LRU chain is represented by a circular list of block structures.
The list is double-linked of the type (**prev,*next) type.
The LRU chain is divided into two parts - hot and warm.
There are two pointers to access the last blocks of these two
......@@ -1268,7 +1267,7 @@ static void link_block(PAGECACHE *pagecache, PAGECACHE_BLOCK_LINK *block,
}
#else /* THREAD */
KEYCACHE_DBUG_ASSERT(! (!hot && pagecache->waiting_for_block.last_thread));
/* Condition not transformed using DeMorgan, to keep the text identical */
/* Condition not transformed using DeMorgan, to keep the text identical */
#endif /* THREAD */
ptr_ins= hot ? &pagecache->used_ins : &pagecache->used_last;
ins= *ptr_ins;
......@@ -2730,10 +2729,10 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
LSN lsn, my_bool was_changed)
{
DBUG_ENTER("pagecache_unlock_by_link");
DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu %s %s",
DBUG_PRINT("enter", ("block: 0x%lx fd: %u page: %lu changed: %d %s %s",
(ulong) block,
(uint) block->hash_link->file.file,
(ulong) block->hash_link->pageno,
(ulong) block->hash_link->pageno, was_changed,
page_cache_page_lock_str[lock],
page_cache_page_pin_str[pin]));
/*
......@@ -2799,7 +2798,6 @@ void pagecache_unlock_by_link(PAGECACHE *pagecache,
(ulong) block));
}
pagecache_set_block_rec_lsn(block, first_REDO_LSN_for_page);
if (make_lock_and_pin(pagecache, block, lock, pin, 0))
DBUG_ASSERT(0); /* should not happend */
......@@ -4246,6 +4244,11 @@ static void test_key_cache(PAGECACHE *pagecache __attribute__((unused)),
}
#endif
uchar *pagecache_block_link_to_buffer(PAGECACHE_BLOCK_LINK *block)
{
return block->buffer;
}
#if defined(PAGECACHE_TIMEOUT)
#define KEYCACHE_DUMP_FILE "pagecache_dump.txt"
......
......@@ -269,6 +269,7 @@ extern my_bool pagecache_collect_changed_blocks_with_lsn(PAGECACHE *pagecache,
LEX_STRING *str,
LSN *min_lsn);
extern int reset_pagecache_counters(const char *name, PAGECACHE *pagecache);
extern uchar *pagecache_block_link_to_buffer(PAGECACHE_BLOCK_LINK *block);
/* Functions to handle multiple key caches */
......
......@@ -1529,7 +1529,8 @@ prototype_redo_exec_hook(UNDO_ROW_DELETE)
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
share->state.changed|= (STATE_CHANGED | STATE_NOT_ANALYZED |
STATE_NOT_OPTIMIZED_ROWS);
}
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
_ma_unpin_all_pages(info, rec->lsn);
......@@ -1569,6 +1570,9 @@ prototype_redo_exec_hook(UNDO_ROW_UPDATE)
prototype_redo_exec_hook(UNDO_KEY_INSERT)
{
MARIA_HA *info;
if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
return 0;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
_ma_unpin_all_pages(info, rec->lsn);
return 0;
......@@ -1577,6 +1581,9 @@ prototype_redo_exec_hook(UNDO_KEY_INSERT)
prototype_redo_exec_hook(UNDO_KEY_DELETE)
{
MARIA_HA *info;
if (!(info= get_MARIA_HA_from_UNDO_record(rec)))
return 0;
set_undo_lsn_for_active_trans(rec->short_trid, rec->lsn);
_ma_unpin_all_pages(info, rec->lsn);
return 0;
......@@ -1595,9 +1602,9 @@ prototype_redo_exec_hook(UNDO_KEY_DELETE_WITH_ROOT)
{
uint key_nr;
my_off_t page;
page= page_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE);
key_nr= key_nr_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE);
page= page_korr(rec->header + LSN_STORE_SIZE + FILEID_STORE_SIZE +
KEY_NR_STORE_SIZE);
share->state.key_root[key_nr]= (page == IMPOSSIBLE_PAGE_NO ?
HA_OFFSET_ERROR :
page * share->block_size);
......@@ -1653,6 +1660,7 @@ prototype_redo_exec_hook(CLR_END)
LSN previous_undo_lsn;
enum translog_record_type undone_record_type;
const LOG_DESC *log_desc;
my_bool row_entry= 0;
if (info == NULL)
return 0;
......@@ -1668,33 +1676,40 @@ prototype_redo_exec_hook(CLR_END)
if (cmp_translog_addr(rec->lsn, share->state.is_of_horizon) >= 0)
{
tprint(tracef, " state older than record, updating rows' count\n");
if (share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE, HA_CHECKSUM_STORE_SIZE,
buff, NULL) != HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
switch (undone_record_type) {
case LOGREC_UNDO_ROW_DELETE:
row_entry= 1;
share->state.state.records++;
break;
case LOGREC_UNDO_ROW_INSERT:
share->state.state.records--;
row_entry= 1;
break;
case LOGREC_UNDO_ROW_UPDATE:
row_entry= 1;
break;
case LOGREC_UNDO_KEY_INSERT:
case LOGREC_UNDO_KEY_DELETE:
break;
default:
DBUG_ASSERT(0);
}
if (row_entry && share->calc_checksum)
{
uchar buff[HA_CHECKSUM_STORE_SIZE];
if (translog_read_record(rec->lsn, LSN_STORE_SIZE + FILEID_STORE_SIZE +
CLR_TYPE_STORE_SIZE, HA_CHECKSUM_STORE_SIZE,
buff, NULL) != HA_CHECKSUM_STORE_SIZE)
{
tprint(tracef, "Failed to read record\n");
return 1;
}
share->state.state.checksum+= ha_checksum_korr(buff);
}
share->state.changed|= STATE_CHANGED | STATE_NOT_ANALYZED;
}
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
if (row_entry)
tprint(tracef, " rows' count %lu\n", (ulong)share->state.state.records);
_ma_unpin_all_pages(info, rec->lsn);
return 0;
}
......@@ -2356,19 +2371,22 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
pgcache_page_no_t page;
MARIA_HA *info;
char llbuf[22];
my_bool index_page_redo_entry= 0;
print_redo_phase_progress(rec->lsn);
sid= fileid_korr(rec->header);
page= page_korr(rec->header + FILEID_STORE_SIZE);
switch(rec->type) {
switch (rec->type) {
/* not all REDO records have a page: */
case LOGREC_REDO_INDEX_NEW_PAGE:
case LOGREC_REDO_INDEX:
case LOGREC_REDO_INDEX_FREE_PAGE:
index_page_redo_entry= 1;
/* Fall trough*/
case LOGREC_REDO_INSERT_ROW_HEAD:
case LOGREC_REDO_INSERT_ROW_TAIL:
case LOGREC_REDO_PURGE_ROW_HEAD:
case LOGREC_REDO_PURGE_ROW_TAIL:
case LOGREC_REDO_INDEX_NEW_PAGE:
case LOGREC_REDO_INDEX:
case LOGREC_REDO_INDEX_FREE_PAGE:
llstr(page, llbuf);
tprint(tracef, " For page %s of table of short id %u", llbuf, sid);
break;
......@@ -2407,12 +2425,9 @@ static MARIA_HA *get_MARIA_HA_from_REDO_record(const
DBUG_ASSERT(info->s->last_version != 0);
if (cmp_translog_addr(rec->lsn, checkpoint_start) < 0)
{
/**
@todo RECOVERY BUG always assuming this is REDO for data file, but it
could soon be index file
*/
uint64 file_and_page_id=
(((uint64)all_tables[sid].org_dfile) << 32) | page;
(((uint64) (index_page_redo_entry ? all_tables[sid].org_kfile :
all_tables[sid].org_dfile)) << 32) | page;
struct st_dirty_page *dirty_page= (struct st_dirty_page *)
hash_search(&all_dirty_pages,
(uchar *)&file_and_page_id, sizeof(file_and_page_id));
......
......@@ -1182,7 +1182,10 @@ static my_bool _ma_get_prev_key(MARIA_HA *info, MARIA_KEYDEF *keyinfo,
/*
@brief Get last key from key-page
@brief Get last key from key-page before 'endpos'
@note
endpos may be either end of buffer or start of a key
@return
@retval pointer to where key starts
......@@ -1506,7 +1509,7 @@ _ma_calc_static_key_length(MARIA_KEYDEF *keyinfo,uint nod_flag,
const uchar *key, MARIA_KEY_PARAM *s_temp)
{
s_temp->key= key;
return (int) (s_temp->totlength=keyinfo->keylength+nod_flag);
return (int) (s_temp->move_length= keyinfo->keylength + nod_flag);
}
/* Variable length key */
......@@ -1519,26 +1522,28 @@ _ma_calc_var_key_length(MARIA_KEYDEF *keyinfo,uint nod_flag,
const uchar *key, MARIA_KEY_PARAM *s_temp)
{
s_temp->key= key;
return (int) (s_temp->totlength= _ma_keylength(keyinfo,key)+nod_flag);
return (int) (s_temp->move_length= _ma_keylength(keyinfo,key)+nod_flag);
}
/*
length of key with a variable length first segment which is prefix
compressed (maria_chk reports 'packed + stripped')
/**
@brief Calc length needed to store prefixed compressed keys
Keys are compressed the following way:
@info
Variable length first segment which is prefix compressed
(maria_chk reports 'packed + stripped')
If the max length of first key segment <= 127 bytes the prefix is
1 uchar else it's 2 byte
Keys are compressed the following way:
prefix byte(s) The high bit is set if this is a prefix for the prev key
length Packed length if the previous was a prefix byte
[length] data bytes ('length' bytes)
next-key-seg Next key segments
If the max length of first key segment <= 127 bytes the prefix is
1 uchar else it's 2 byte
If the first segment can have NULL:
The length is 0 for NULLS and 1+length for not null columns.
prefix byte(s) The high bit is set if this is a prefix for the prev key
length Packed length if the previous was a prefix byte
[length] data bytes ('length' bytes)
next-key-seg Next key segments
If the first segment can have NULL:
The length is 0 for NULLS and 1+length for not null columns.
*/
int
......@@ -1589,7 +1594,7 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
s_temp->key_length= 0;
s_temp->totlength= key_length-1+diff_flag;
s_temp->next_key_pos= 0; /* No next key */
return (s_temp->totlength);
return (s_temp->move_length= s_temp->totlength);
}
s_temp->store_not_null=1;
key_length--; /* We don't store NULL */
......@@ -1744,7 +1749,7 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
s_temp->n_ref_length=s_temp->n_length= org_key_length;
length+= org_key_length;
}
return (int) length;
return (s_temp->move_length= (int) length);
}
ref_length=n_length;
......@@ -1757,7 +1762,8 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
s_temp->part_of_prev_key= 0;
s_temp->prev_length= ref_length;
s_temp->n_ref_length= s_temp->n_length= n_length+ref_length;
return (int) length+ref_length-next_length_pack;
return s_temp->move_length= ((int) length+ref_length-
next_length_pack);
}
if (ref_length+pack_marker > new_ref_length)
{
......@@ -1768,7 +1774,7 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
s_temp->n_ref_length=s_temp->n_length=n_length + s_temp->prev_length;
s_temp->prev_key+= new_pack_length;
length-= (next_length_pack - get_pack_length(s_temp->n_length));
return (int) length + s_temp->prev_length;
return s_temp->move_length= ((int) length + s_temp->prev_length);
}
}
else
......@@ -1803,7 +1809,7 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
if (!(tmp_length=(uint) (key-start)))
{ /* Key can't be re-packed */
s_temp->next_key_pos=0;
return length;
return (s_temp->move_length= length);
}
ref_length+=tmp_length;
n_length-=tmp_length;
......@@ -1821,7 +1827,7 @@ _ma_calc_var_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
}
}
}
return length;
return (s_temp->move_length= length);
}
......@@ -1884,8 +1890,9 @@ int _ma_calc_bin_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
s_temp->n_ref_length= ref_length;
s_temp->prev_length= next_length-ref_length;
s_temp->prev_key+= ref_length;
return (int) (length+ s_temp->prev_length - next_length_pack +
get_pack_length(ref_length));
return s_temp->move_length= ((int) (length+ s_temp->prev_length -
next_length_pack +
get_pack_length(ref_length)));
}
/* Check how many characters are identical to next key */
key= s_temp->key+next_length;
......@@ -1893,14 +1900,15 @@ int _ma_calc_bin_pack_key_length(MARIA_KEYDEF *keyinfo, uint nod_flag,
if ((ref_length= (uint) (key - s_temp->key)-1) == next_length)
{
s_temp->next_key_pos=0;
return length; /* can't pack next key */
return (s_temp->move_length= length); /* Can't pack next key */
}
s_temp->prev_length=0;
s_temp->n_ref_length=ref_length;
return (int) (length-(ref_length - next_length) - next_length_pack +
get_pack_length(ref_length));
return s_temp->move_length= (int) (length-(ref_length - next_length) -
next_length_pack +
get_pack_length(ref_length));
}
return (int) length;
return (s_temp->move_length= (int) length);
}
......@@ -1914,8 +1922,8 @@ void _ma_store_static_key(MARIA_KEYDEF *keyinfo __attribute__((unused)),
register uchar *key_pos,
register MARIA_KEY_PARAM *s_temp)
{
memcpy(key_pos, s_temp->key,(size_t) s_temp->totlength);
s_temp->changed_length= s_temp->totlength;
memcpy(key_pos, s_temp->key,(size_t) s_temp->move_length);
s_temp->changed_length= s_temp->move_length;
}
......
......@@ -97,7 +97,7 @@ int main(int argc,char *argv[])
static int run_test(const char *filename)
{
MARIA_HA *file;
int i,j,error,deleted,rec_length,uniques=0;
int i,j= 0,error,deleted,rec_length,uniques=0;
uint offset_to_key;
ha_rows found,row_count;
uchar record[MAX_REC_LENGTH],key[MAX_REC_LENGTH],read_record[MAX_REC_LENGTH];
......@@ -275,7 +275,7 @@ static int run_test(const char *filename)
{
if (!silent)
printf("- Checking unique constraint\n");
create_record(record,j);
create_record(record,j); /* Check last created row */
if (!maria_write(file,record) || my_errno != HA_ERR_FOUND_DUPP_UNIQUE)
{
printf("unique check failed\n");
......
......@@ -880,15 +880,6 @@ int main(int argc, char *argv[])
goto err;
}
opt_delete++;
#if TO_BE_REMOVED
/
/*
179 is ok, 180 causes a difference between runtime and log-applying.
This is now fixed (we zero the last directory entry during
log-applying, just to eliminate this irrelevant difference).
*/
if (opt_delete==180) goto end;
#endif
}
else
found_parts++;
......
Running tests with dynamic row format
Running tests with static row format
Running tests with block row format
Running tests with block row format and transactions
ma_test2 -s -L -K -R1 -m2000 ; Should give error 135
Error: 135 in write at record: 1099
got error: 135 when using MARIA-database
......@@ -8,55 +9,6 @@ got error: 135 when using MARIA-database
maria_chk: MARIA file test2
maria_chk: warning: Datafile is almost full, 65516 of 65534 used
MARIA-table 'test2' is usable but should be fixed
real 0m0.808s
user 0m0.584s
sys 0m0.212s
real 0m0.780s
user 0m0.584s
sys 0m0.176s
real 0m0.809s
user 0m0.616s
sys 0m0.180s
real 0m1.356s
user 0m1.140s
sys 0m0.188s
real 0m0.783s
user 0m0.600s
sys 0m0.176s
real 0m1.390s
user 0m1.184s
sys 0m0.152s
real 0m1.875s
user 0m1.632s
sys 0m0.244s
real 0m1.313s
user 0m1.148s
sys 0m0.160s
real 0m1.846s
user 0m1.644s
sys 0m0.188s
real 0m1.875s
user 0m1.632s
sys 0m0.212s
real 0m1.819s
user 0m1.672s
sys 0m0.124s
real 0m2.117s
user 0m1.816s
sys 0m0.292s
real 0m1.871s
user 0m1.636s
sys 0m0.196s
MARIA RECOVERY TESTS
ALL RECOVERY TESTS OK
!!!!!!!! BUT REMEMBER to FIX this BLOB issue !!!!!!!
......@@ -115,7 +115,7 @@ run_tests()
$maria_path/maria_chk$suffix -sm test2
$maria_path/ma_test2$suffix $silent -m10000 -e4096 -K $row_type
$maria_path/maria_chk$suffix -sm test2
$maria_path/ma_test2$suffix $silent -m10000 -e8192 -K $row_type
$maria_path/ma_test2$suffix $silent -m10000 -e8192 -K $row_type -P
$maria_path/maria_chk$suffix -sm test2
$maria_path/ma_test2$suffix $silent -m10000 -e16384 -E16384 -K -L $row_type
$maria_path/maria_chk$suffix -sm test2
......@@ -232,16 +232,16 @@ $maria_path/maria_chk$suffix -ssm test2
#
# Some timing tests
#
time $maria_path/ma_test2$suffix $silent
time $maria_path/ma_test2$suffix $silent -S
time $maria_path/ma_test2$suffix $silent -M
time $maria_path/ma_test2$suffix $silent -B
time $maria_path/ma_test2$suffix $silent -L
time $maria_path/ma_test2$suffix $silent -K
time $maria_path/ma_test2$suffix $silent -K -B
time $maria_path/ma_test2$suffix $silent -L -B
time $maria_path/ma_test2$suffix $silent -L -K -B
time $maria_path/ma_test2$suffix $silent -L -K -W -B
time $maria_path/ma_test2$suffix $silent -L -K -W -B -S
time $maria_path/ma_test2$suffix $silent -L -K -W -B -M
time $maria_path/ma_test2$suffix $silent -D -K -W -B -S
#time $maria_path/ma_test2$suffix $silent
#time $maria_path/ma_test2$suffix $silent -S
#time $maria_path/ma_test2$suffix $silent -M
#time $maria_path/ma_test2$suffix $silent -B
#time $maria_path/ma_test2$suffix $silent -L
#time $maria_path/ma_test2$suffix $silent -K
#time $maria_path/ma_test2$suffix $silent -K -B
#time $maria_path/ma_test2$suffix $silent -L -B
#time $maria_path/ma_test2$suffix $silent -L -K -B
#time $maria_path/ma_test2$suffix $silent -L -K -W -B
#time $maria_path/ma_test2$suffix $silent -L -K -W -B -S
#time $maria_path/ma_test2$suffix $silent -L -K -W -B -M
#time $maria_path/ma_test2$suffix $silent -D -K -W -B -S
......@@ -87,9 +87,6 @@ apply_log()
(
# this message is to remember about the problem with -b (see @todo below)
echo "!!!!!!!! REMEMBER to FIX this BLOB issue !!!!!!!"
echo "Testing the REDO PHASE ALONE"
# runs a program inserting/deleting rows, then moves the resulting table
# elsewhere; applies the log and checks that the data file is
......@@ -111,10 +108,12 @@ do
mv $table.MAI $tmp/$table-good.MAI
apply_log "shouldnotchangelog"
cmp $table.MAD $tmp/$table-good.MAD
cmp $table.MAI $tmp/$table-good.MAI
check_table_is_same
echo "testing idempotency"
apply_log "shouldnotchangelog"
cmp $table.MAD $tmp/$table-good.MAD
cmp $table.MAI $tmp/$table-good.MAI
check_table_is_same
shift
done
......@@ -157,6 +156,12 @@ do
echo "TEST WITH $prog $abort_run_args$test_undo (additional aborted work)"
$maria_path/$prog $abort_run_args$test_undo
cp $table.MAD $tmp/$table.MAD.before_undo
# We have to copy and restore logs, as running maria_read_log will
# change the maria_control_file
rm -f $tmp/maria_log.* $tmp/maria_log_control
cp $maria_path/maria_log* $tmp
if [ $test_undo -lt 3 ]
then
apply_log "shouldchangelog" # should undo aborted work
......@@ -179,19 +184,9 @@ do
check_table_is_same
echo "testing applying of CLRs to recreate table"
rm $table.MA?
apply_log "shouldnotchangelog"
# the cmp below fails with ma_test1+blobs! @todo RECOVERY BUG why?
# It is probably serious; REDOs shouldn't place rows in different
# positions from what the run-time code did. Indeed it may lead to
# more or less free space...
# Execution of UNDO re-inserted rows at different positions than
# originally. This generated REDOs which do not insert at the same
# positions as the execution of UNDOs, but at the same positions
# as before the row was originally deleted.
if [ "$blobs" == "" ]
then
cmp $table.MAD $tmp/$table.MAD.after_undo
fi
cp $tmp/maria_log* $maria_path
apply_log "dontknow"
cmp $table.MAD $tmp/$table.MAD.after_undo
check_table_is_same
shift 3
done
......@@ -213,5 +208,3 @@ if [ "$diff_failed" == "1" ]
exit 1
fi
echo "ALL RECOVERY TESTS OK"
# this message is to remember about the problem with -b (see @todo above)
echo "!!!!!!!! BUT REMEMBER to FIX this BLOB issue !!!!!!!"
This diff is collapsed.
This diff is collapsed.
......@@ -1007,6 +1007,9 @@ static int maria_chk(HA_CHECK *param, char *filename)
if (param->testflag & (T_REP_ANY | T_SORT_RECORDS | T_SORT_INDEX))
{
/* Mark table as not transactional to avoid logging */
maria_disable_logging(info);
if (param->testflag & T_REP_ANY)
{
ulonglong tmp=share->state.key_map;
......@@ -1181,6 +1184,7 @@ static int maria_chk(HA_CHECK *param, char *filename)
((param->testflag & T_SORT_RECORDS) ?
UPDATE_SORT : 0)));
info->update&= ~HA_STATE_CHANGED;
maria_enable_logging(info);
maria_lock_database(info, F_UNLCK);
end2:
......
This diff is collapsed.
This diff is collapsed.
......@@ -337,7 +337,7 @@ int main(int argc __attribute__((unused)), char *argv[])
if (translog_flush(translog_get_horizon()))
{
fprintf(stderr, "Can't flush up to horizon\n", (ulong) i);
fprintf(stderr, "Can't flush up to horizon\n");
translog_destroy();
ok(0, "flush");
exit(1);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment