Commit 4a2ce097 authored by unknown's avatar unknown

Merge gbichot@bk-internal.mysql.com:/home/bk/mysql-maria

into  gbichot3.local:/home/mysql_src/mysql-maria-clean


storage/maria/ma_pagecache.c:
  merge
parents 1aa9027c e4c2d748
...@@ -2456,7 +2456,14 @@ static int flush_key_blocks_int(KEY_CACHE *keycache, ...@@ -2456,7 +2456,14 @@ static int flush_key_blocks_int(KEY_CACHE *keycache,
} }
else else
{ {
/* Link the block into a list of blocks 'in switch' */ /*
Link the block into a list of blocks 'in switch'.
Note that if there could be two concurrent flush_key_blocks_int()
on this file (normally this does not happen, as MyISAM uses
intern_lock for flushing), then the first one may move the block
into its first_in_switch, and the second one would just not see
the block and wrongly consider its job done.
*/
unlink_changed(block); unlink_changed(block);
link_changed(block, &first_in_switch); link_changed(block, &first_in_switch);
} }
......
...@@ -479,7 +479,7 @@ handler(hton, table_arg), file(0), ...@@ -479,7 +479,7 @@ handler(hton, table_arg), file(0),
int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER | int_table_flags(HA_NULL_IN_KEY | HA_CAN_FULLTEXT | HA_CAN_SQL_HANDLER |
HA_DUPLICATE_POS | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY | HA_DUPLICATE_POS | HA_CAN_INDEX_BLOBS | HA_AUTO_PART_KEY |
HA_FILE_BASED | HA_CAN_GEOMETRY | MARIA_CANNOT_ROLLBACK | HA_FILE_BASED | HA_CAN_GEOMETRY | MARIA_CANNOT_ROLLBACK |
HA_CAN_INSERT_DELAYED | HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS | HA_CAN_BIT_FIELD | HA_CAN_RTREEKEYS |
HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT), HA_HAS_RECORDS | HA_STATS_RECORDS_IS_EXACT),
can_enable_indexes(1) can_enable_indexes(1)
{} {}
...@@ -697,9 +697,19 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked) ...@@ -697,9 +697,19 @@ int ha_maria::open(const char *name, int mode, uint test_if_locked)
info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST); info(HA_STATUS_NO_LOCK | HA_STATUS_VARIABLE | HA_STATUS_CONST);
if (!(test_if_locked & HA_OPEN_WAIT_IF_LOCKED)) if (!(test_if_locked & HA_OPEN_WAIT_IF_LOCKED))
VOID(maria_extra(file, HA_EXTRA_WAIT_LOCK, 0)); VOID(maria_extra(file, HA_EXTRA_WAIT_LOCK, 0));
save_transactional= file->s->base.transactional;
if ((data_file_type= file->s->data_file_type) != STATIC_RECORD) if ((data_file_type= file->s->data_file_type) != STATIC_RECORD)
int_table_flags |= HA_REC_NOT_IN_SEQ; int_table_flags |= HA_REC_NOT_IN_SEQ;
if (!file->s->base.born_transactional)
{
/*
INSERT DELAYED cannot work with transactional tables (because it cannot
stand up to "when client gets ok the data is safe on disk": the record
may not even be inserted). In the future, we could enable it back (as a
client doing INSERT DELAYED knows the specificities; but we then should
make sure to regularly commit in the delayed_insert thread).
*/
int_table_flags|= HA_CAN_INSERT_DELAYED;
}
if (file->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD)) if (file->s->options & (HA_OPTION_CHECKSUM | HA_OPTION_COMPRESS_RECORD))
int_table_flags |= HA_HAS_CHECKSUM; int_table_flags |= HA_HAS_CHECKSUM;
...@@ -1178,8 +1188,6 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize) ...@@ -1178,8 +1188,6 @@ int ha_maria::repair(THD *thd, HA_CHECK &param, bool do_optimize)
llstr(rows, llbuff), llstr(rows, llbuff),
llstr(file->state->records, llbuff2)); llstr(file->state->records, llbuff2));
} }
if (!error)
error= _ma_repair_write_log_record(&param, file);
} }
else else
{ {
...@@ -1861,30 +1869,19 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -1861,30 +1869,19 @@ int ha_maria::external_lock(THD *thd, int lock_type)
{ {
TRN *trn= THD_TRN; TRN *trn= THD_TRN;
DBUG_ENTER("ha_maria::external_lock"); DBUG_ENTER("ha_maria::external_lock");
if (!save_transactional) /*
We don't test now_transactional because it may vary between lock/unlock
and thus confuse our reference counting.
It is critical to skip non-transactional tables: user-visible temporary
tables get an external_lock() when read/written for the first time, but no
corresponding unlock (they just stay locked and are later dropped while
locked); if a tmp table was transactional, "SELECT FROM non_tmp, tmp"
would never commit as its "locked_tables" count would stay 1.
*/
if (!file->s->base.born_transactional)
goto skip_transaction; goto skip_transaction;
if (!trn && lock_type != F_UNLCK) /* no transaction yet - open it now */
{
trn= trnman_new_trn(& thd->mysys_var->mutex,
& thd->mysys_var->suspend,
thd->thread_stack + STACK_DIRECTION *
(my_thread_stack_size - STACK_MIN_SIZE));
if (!trn)
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_PRINT("info", ("THD_TRN set to 0x%lx", (ulong)trn));
THD_TRN= trn;
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, maria_hton);
}
if (lock_type != F_UNLCK) if (lock_type != F_UNLCK)
{ {
this->file->trn= trn;
if (!trnman_increment_locked_tables(trn))
{
trans_register_ha(thd, FALSE, maria_hton);
trnman_new_statement(trn);
}
if (!thd->transaction.on) if (!thd->transaction.on)
{ {
/* /*
...@@ -1896,11 +1893,32 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -1896,11 +1893,32 @@ int ha_maria::external_lock(THD *thd, int lock_type)
tons of archived logs to roll-forward, we could then not disable tons of archived logs to roll-forward, we could then not disable
REDOs/UNDOs in this case. REDOs/UNDOs in this case.
*/ */
file->s->base.transactional= FALSE; _ma_tmp_disable_logging_for_table(file->s);
}
if (!trn) /* no transaction yet - open it now */
{
trn= trnman_new_trn(& thd->mysys_var->mutex,
& thd->mysys_var->suspend,
thd->thread_stack + STACK_DIRECTION *
(my_thread_stack_size - STACK_MIN_SIZE));
if (unlikely(!trn))
DBUG_RETURN(HA_ERR_OUT_OF_MEM);
DBUG_PRINT("info", ("THD_TRN set to 0x%lx", (ulong)trn));
THD_TRN= trn;
if (thd->options & (OPTION_NOT_AUTOCOMMIT | OPTION_BEGIN))
trans_register_ha(thd, TRUE, maria_hton);
}
this->file->trn= trn;
if (!trnman_increment_locked_tables(trn))
{
trans_register_ha(thd, FALSE, maria_hton);
trnman_new_statement(trn);
} }
} }
else else
{ {
_ma_reenable_logging_for_table(file->s);
this->file->trn= 0; /* TODO: remove it also in commit and rollback */ this->file->trn= 0; /* TODO: remove it also in commit and rollback */
if (trn && trnman_has_locked_tables(trn)) if (trn && trnman_has_locked_tables(trn))
{ {
...@@ -1921,7 +1939,6 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -1921,7 +1939,6 @@ int ha_maria::external_lock(THD *thd, int lock_type)
#endif #endif
} }
} }
file->s->base.transactional= save_transactional;
} }
skip_transaction: skip_transaction:
DBUG_RETURN(maria_lock_database(file, !table->s->tmp_table ? DBUG_RETURN(maria_lock_database(file, !table->s->tmp_table ?
...@@ -1932,7 +1949,7 @@ int ha_maria::external_lock(THD *thd, int lock_type) ...@@ -1932,7 +1949,7 @@ int ha_maria::external_lock(THD *thd, int lock_type)
int ha_maria::start_stmt(THD *thd, thr_lock_type lock_type) int ha_maria::start_stmt(THD *thd, thr_lock_type lock_type)
{ {
TRN *trn= THD_TRN; TRN *trn= THD_TRN;
if (save_transactional) if (file->s->base.born_transactional)
{ {
DBUG_ASSERT(trn); // this may be called only after external_lock() DBUG_ASSERT(trn); // this may be called only after external_lock()
DBUG_ASSERT(trnman_has_locked_tables(trn)); DBUG_ASSERT(trnman_has_locked_tables(trn));
......
...@@ -39,11 +39,6 @@ class ha_maria :public handler ...@@ -39,11 +39,6 @@ class ha_maria :public handler
char *data_file_name, *index_file_name; char *data_file_name, *index_file_name;
enum data_file_type data_file_type; enum data_file_type data_file_type;
bool can_enable_indexes; bool can_enable_indexes;
/**
@brief for temporarily disabling table's transactionality
(if THD::transaction::on is false), remember the original value here
*/
bool save_transactional;
int repair(THD * thd, HA_CHECK &param, bool optimize); int repair(THD * thd, HA_CHECK &param, bool optimize);
public: public:
......
...@@ -581,18 +581,10 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn) ...@@ -581,18 +581,10 @@ void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn)
DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn)); DBUG_PRINT("info", ("undo_lsn: %lu", (ulong) undo_lsn));
/* True if not disk error */ /* True if not disk error */
DBUG_ASSERT((undo_lsn != LSN_IMPOSSIBLE) || !info->s->base.transactional); DBUG_ASSERT((undo_lsn != LSN_IMPOSSIBLE) || !info->s->now_transactional);
if (!info->s->base.transactional) if (!info->s->now_transactional)
{ undo_lsn= LSN_IMPOSSIBLE; /* don't try to set a LSN on pages */
/*
If this is a transactional table but with transactionality temporarily
disabled (like in ALTER TABLE) we need to give a sensible LSN to pages
and not LSN_IMPOSSIBLE. If this is not a transactional table it will
reduce to LSN_IMPOSSIBLE.
*/
undo_lsn= info->s->state.create_rename_lsn;
}
while (pinned_page-- != page_link) while (pinned_page-- != page_link)
pagecache_unlock_by_link(info->s->pagecache, pinned_page->link, pagecache_unlock_by_link(info->s->pagecache, pinned_page->link,
...@@ -1446,7 +1438,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count) ...@@ -1446,7 +1438,7 @@ static my_bool free_full_page_range(MARIA_HA *info, ulonglong page, uint count)
page, count, PAGECACHE_LOCK_WRITE, 0)) page, count, PAGECACHE_LOCK_WRITE, 0))
res= 1; res= 1;
if (info->s->base.transactional) if (info->s->now_transactional)
{ {
LSN lsn; LSN lsn;
DBUG_ASSERT(info->trn->rec_lsn); DBUG_ASSERT(info->trn->rec_lsn);
...@@ -1953,7 +1945,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1953,7 +1945,7 @@ static my_bool write_block_record(MARIA_HA *info,
head_block+1, bitmap_blocks->count - 1); head_block+1, bitmap_blocks->count - 1);
} }
if (share->base.transactional) if (share->now_transactional)
{ {
uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; uchar log_data[FILEID_STORE_SIZE + PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 2];
...@@ -1998,7 +1990,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -1998,7 +1990,7 @@ static my_bool write_block_record(MARIA_HA *info,
else else
push_dynamic(&info->pinned_pages, (void*) &page_link); push_dynamic(&info->pinned_pages, (void*) &page_link);
if (share->base.transactional && (tmp_data_used || blob_full_pages_exists)) if (share->now_transactional && (tmp_data_used || blob_full_pages_exists))
{ {
/* /*
Log REDO writes for all full pages (head part and all blobs) Log REDO writes for all full pages (head part and all blobs)
...@@ -2095,7 +2087,7 @@ static my_bool write_block_record(MARIA_HA *info, ...@@ -2095,7 +2087,7 @@ static my_bool write_block_record(MARIA_HA *info,
} }
/* Write UNDO record */ /* Write UNDO record */
if (share->base.transactional) if (share->now_transactional)
{ {
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE +
PAGE_STORE_SIZE + DIRPOS_STORE_SIZE]; PAGE_STORE_SIZE + DIRPOS_STORE_SIZE];
...@@ -2312,7 +2304,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info) ...@@ -2312,7 +2304,7 @@ my_bool _ma_write_abort_block_record(MARIA_HA *info)
} }
} }
if (info->s->base.transactional) if (info->s->now_transactional)
{ {
LSN lsn; LSN lsn;
LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1]; LEX_STRING log_array[TRANSLOG_INTERNAL_PARTS + 1];
...@@ -2671,7 +2663,7 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const byte *record) ...@@ -2671,7 +2663,7 @@ my_bool _ma_delete_block_record(MARIA_HA *info, const byte *record)
if (info->cur_row.extents && free_full_pages(info, &info->cur_row)) if (info->cur_row.extents && free_full_pages(info, &info->cur_row))
goto err; goto err;
if (info->s->base.transactional) if (info->s->now_transactional)
{ {
LSN lsn; LSN lsn;
uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE + uchar log_data[LSN_STORE_SIZE + FILEID_STORE_SIZE + PAGE_STORE_SIZE +
......
...@@ -91,6 +91,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info, ...@@ -91,6 +91,7 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
MARIA_HA *info, byte *record); MARIA_HA *info, byte *record);
static void copy_data_file_state(MARIA_STATE_INFO *to, static void copy_data_file_state(MARIA_STATE_INFO *to,
MARIA_STATE_INFO *from); MARIA_STATE_INFO *from);
static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info);
void maria_chk_init(HA_CHECK *param) void maria_chk_init(HA_CHECK *param)
...@@ -1952,6 +1953,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -1952,6 +1953,8 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
MARIA_SORT_PARAM sort_param; MARIA_SORT_PARAM sort_param;
my_bool block_record, scan_inited= 0; my_bool block_record, scan_inited= 0;
enum data_file_type org_data_file_type= info->s->data_file_type; enum data_file_type org_data_file_type= info->s->data_file_type;
myf sync_dir= ((share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0);
DBUG_ENTER("maria_repair"); DBUG_ENTER("maria_repair");
bzero((char *)&sort_info, sizeof(sort_info)); bzero((char *)&sort_info, sizeof(sort_info));
...@@ -1999,7 +2002,15 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -1999,7 +2002,15 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
share->state.header.org_data_file_type == BLOCK_RECORD)) share->state.header.org_data_file_type == BLOCK_RECORD))
{ {
MARIA_HA *new_info; MARIA_HA *new_info;
if (!(sort_info.new_info= maria_open(info->s->unique_file_name, O_RDWR, /**
@todo RECOVERY it's a bit worrying to have two MARIA_SHARE on the
same index file:
- Checkpoint will see them as two tables
- are we sure that new_info never flushes an in-progress state
to the index file? And how to prevent Checkpoint from doing that?
- in the close future maria_close() will write the state...
*/
if (!(sort_info.new_info= maria_open(info->s->open_file_name, O_RDWR,
HA_OPEN_COPY | HA_OPEN_FOR_REPAIR))) HA_OPEN_COPY | HA_OPEN_FOR_REPAIR)))
goto err; goto err;
new_info= sort_info.new_info; new_info= sort_info.new_info;
...@@ -2174,8 +2185,6 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2174,8 +2185,6 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
if (!rep_quick) if (!rep_quick)
{ {
myf sync_dir= ((share->base.transactional && !share->temporary) ?
MY_SYNC_DIR : 0);
if (sort_info.new_info != sort_info.info) if (sort_info.new_info != sort_info.info)
{ {
MARIA_STATE_INFO save_state= sort_info.new_info->s->state; MARIA_STATE_INFO save_state= sort_info.new_info->s->state;
...@@ -2223,7 +2232,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info, ...@@ -2223,7 +2232,7 @@ int maria_repair(HA_CHECK *param, register MARIA_HA *info,
llstr(sort_info.dupp,llbuff)); llstr(sort_info.dupp,llbuff));
} }
got_error=0; got_error= sync_dir ? write_log_record_for_repair(param, info) : 0;
/* If invoked by external program that uses thr_lock */ /* If invoked by external program that uses thr_lock */
if (&share->state.state != info->state) if (&share->state.state != info->state)
memcpy( &share->state.state, info->state, sizeof(*info->state)); memcpy( &share->state.state, info->state, sizeof(*info->state));
...@@ -2424,7 +2433,7 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, my_string name) ...@@ -2424,7 +2433,7 @@ int maria_sort_index(HA_CHECK *param, register MARIA_HA *info, my_string name)
int old_lock; int old_lock;
MARIA_SHARE *share=info->s; MARIA_SHARE *share=info->s;
MARIA_STATE_INFO old_state; MARIA_STATE_INFO old_state;
myf sync_dir= (share->base.transactional && !share->temporary) ? myf sync_dir= (share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0; MY_SYNC_DIR : 0;
DBUG_ENTER("maria_sort_index"); DBUG_ENTER("maria_sort_index");
...@@ -2702,7 +2711,7 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info, ...@@ -2702,7 +2711,7 @@ int maria_repair_by_sort(HA_CHECK *param, register MARIA_HA *info,
char llbuff[22]; char llbuff[22];
MARIA_SORT_INFO sort_info; MARIA_SORT_INFO sort_info;
ulonglong key_map=share->state.key_map; ulonglong key_map=share->state.key_map;
myf sync_dir= ((share->base.transactional && !share->temporary) ? myf sync_dir= ((share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0); MY_SYNC_DIR : 0);
DBUG_ENTER("maria_repair_by_sort"); DBUG_ENTER("maria_repair_by_sort");
...@@ -3127,7 +3136,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info, ...@@ -3127,7 +3136,7 @@ int maria_repair_parallel(HA_CHECK *param, register MARIA_HA *info,
MARIA_SORT_INFO sort_info; MARIA_SORT_INFO sort_info;
ulonglong key_map=share->state.key_map; ulonglong key_map=share->state.key_map;
pthread_attr_t thr_attr; pthread_attr_t thr_attr;
myf sync_dir= (share->base.transactional && !share->temporary) ? myf sync_dir= (share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0; MY_SYNC_DIR : 0;
DBUG_ENTER("maria_repair_parallel"); DBUG_ENTER("maria_repair_parallel");
...@@ -5487,11 +5496,10 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info, ...@@ -5487,11 +5496,10 @@ static int _ma_safe_scan_block_record(MARIA_SORT_INFO *sort_info,
@retval 1 error (disk problem) @retval 1 error (disk problem)
*/ */
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info) static int write_log_record_for_repair(const HA_CHECK *param, MARIA_HA *info)
{ {
MARIA_SHARE *share= info->s; MARIA_SHARE *share= info->s;
/* Only called from ha_maria.cc, not maria_check, so translog is inited */ if (translog_inited) /* test it in case this is maria_chk */
if (share->base.transactional && !share->temporary)
{ {
/* /*
For now this record is only informative. It could serve when applying For now this record is only informative. It could serve when applying
......
...@@ -108,7 +108,8 @@ int maria_close(register MARIA_HA *info) ...@@ -108,7 +108,8 @@ int maria_close(register MARIA_HA *info)
} }
} }
#endif #endif
my_free((gptr) info->s,MYF(0)); DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
my_free((gptr) share, MYF(0));
} }
pthread_mutex_unlock(&THR_LOCK_maria); pthread_mutex_unlock(&THR_LOCK_maria);
if (info->ftparser_param) if (info->ftparser_param)
......
...@@ -259,7 +259,7 @@ int maria_create(const char *name, enum data_file_type datafile_type, ...@@ -259,7 +259,7 @@ int maria_create(const char *name, enum data_file_type datafile_type,
} }
share.base.null_bytes= ci->null_bytes; share.base.null_bytes= ci->null_bytes;
share.base.original_null_bytes= ci->null_bytes; share.base.original_null_bytes= ci->null_bytes;
share.base.transactional= ci->transactional; share.base.born_transactional= ci->transactional;
share.base.max_field_lengths= max_field_lengths; share.base.max_field_lengths= max_field_lengths;
share.base.field_offsets= 0; /* for future */ share.base.field_offsets= 0; /* for future */
......
...@@ -46,7 +46,7 @@ int maria_delete_all_rows(MARIA_HA *info) ...@@ -46,7 +46,7 @@ int maria_delete_all_rows(MARIA_HA *info)
*/ */
if (_ma_readinfo(info,F_WRLCK,1)) if (_ma_readinfo(info,F_WRLCK,1))
DBUG_RETURN(my_errno); DBUG_RETURN(my_errno);
log_record= share->base.transactional && !share->temporary; log_record= share->now_transactional && !share->temporary;
if (_ma_mark_file_changed(info)) if (_ma_mark_file_changed(info))
goto err; goto err;
...@@ -142,7 +142,6 @@ void _ma_reset_status(MARIA_HA *info) ...@@ -142,7 +142,6 @@ void _ma_reset_status(MARIA_HA *info)
info->state->data_file_length= 0; info->state->data_file_length= 0;
info->state->empty= info->state->key_empty= 0; info->state->empty= info->state->key_empty= 0;
info->state->checksum= 0; info->state->checksum= 0;
share->state.create_rename_lsn= LSN_IMPOSSIBLE;
/* Drop the delete key chain. */ /* Drop the delete key chain. */
state->key_del= HA_OFFSET_ERROR; state->key_del= HA_OFFSET_ERROR;
......
...@@ -64,7 +64,7 @@ int maria_delete_table(const char *name) ...@@ -64,7 +64,7 @@ int maria_delete_table(const char *name)
raid_type= info->s->base.raid_type; raid_type= info->s->base.raid_type;
raid_chunks= info->s->base.raid_chunks; raid_chunks= info->s->base.raid_chunks;
#endif #endif
sync_dir= (info->s->base.transactional && !info->s->temporary) ? sync_dir= (info->s->now_transactional && !info->s->temporary) ?
MY_SYNC_DIR : 0; MY_SYNC_DIR : 0;
maria_close(info); maria_close(info);
} }
......
...@@ -129,6 +129,7 @@ int maria_lock_database(MARIA_HA *info, int lock_type) ...@@ -129,6 +129,7 @@ int maria_lock_database(MARIA_HA *info, int lock_type)
} }
info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED); info->opt_flag&= ~(READ_CACHE_USED | WRITE_CACHE_USED);
info->lock_type= F_UNLCK; info->lock_type= F_UNLCK;
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
break; break;
case F_RDLCK: case F_RDLCK:
if (info->lock_type == F_WRLCK) if (info->lock_type == F_WRLCK)
......
...@@ -4263,7 +4263,7 @@ my_bool translog_write_record(LSN *lsn, ...@@ -4263,7 +4263,7 @@ my_bool translog_write_record(LSN *lsn,
if (share) if (share)
{ {
if (!share->base.transactional) if (!share->now_transactional)
{ {
DBUG_PRINT("info", ("It is not transactional table")); DBUG_PRINT("info", ("It is not transactional table"));
DBUG_RETURN(0); DBUG_RETURN(0);
...@@ -5614,6 +5614,16 @@ static my_bool write_hook_for_redo(enum translog_record_type type ...@@ -5614,6 +5614,16 @@ static my_bool write_hook_for_redo(enum translog_record_type type
struct st_translog_parts *parts struct st_translog_parts *parts
__attribute__ ((unused))) __attribute__ ((unused)))
{ {
/*
Users of dummy_transaction_object must keep this TRN clean as it
is used by many threads (like those manipulating non-transactional
tables). It might be dangerous if one user sets rec_lsn or some other
member and it is picked up by another user (like putting this rec_lsn into
a page of a non-transactional table); it's safer if all members stay 0. So
non-transactional log records (REPAIR, CREATE, RENAME, DROP) should not
call this hook; we trust them but verify ;)
*/
DBUG_ASSERT(trn->trid != 0);
/* /*
If the hook stays so simple, it would be faster to pass If the hook stays so simple, it would be faster to pass
!trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn !trn->rec_lsn ? trn->rec_lsn : some_dummy_lsn
...@@ -5640,6 +5650,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type ...@@ -5640,6 +5650,7 @@ static my_bool write_hook_for_undo(enum translog_record_type type
struct st_translog_parts *parts struct st_translog_parts *parts
__attribute__ ((unused))) __attribute__ ((unused)))
{ {
DBUG_ASSERT(trn->trid != 0); /* see write_hook_for_redo() */
trn->undo_lsn= *lsn; trn->undo_lsn= *lsn;
if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0)) if (unlikely(LSN_WITH_FLAGS_TO_LSN(trn->first_undo_lsn) == 0))
trn->first_undo_lsn= trn->first_undo_lsn=
......
...@@ -589,9 +589,17 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -589,9 +589,17 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
share->base.pack_bytes + share->base.pack_bytes +
test(share->options & HA_OPTION_CHECKSUM)); test(share->options & HA_OPTION_CHECKSUM));
if (open_flags & HA_OPEN_COPY) if (open_flags & HA_OPEN_COPY)
share->base.transactional= 0; /* Repair: no logging */
if (share->base.transactional)
{ {
/*
this instance will be a temporary one used just to create a data
file for REPAIR. Don't do logging. This base information will not go
to disk.
*/
share->base.born_transactional= FALSE;
}
if (share->base.born_transactional)
{
share->page_type= PAGECACHE_LSN_PAGE;
share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE; share->base_length+= TRANS_ROW_EXTRA_HEADER_SIZE;
if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) && if (unlikely((share->state.create_rename_lsn == (LSN)ULONGLONG_MAX) &&
(open_flags & HA_OPEN_FROM_SQL_LAYER))) (open_flags & HA_OPEN_FROM_SQL_LAYER)))
...@@ -604,11 +612,12 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags) ...@@ -604,11 +612,12 @@ MARIA_HA *maria_open(const char *name, int mode, uint open_flags)
_ma_update_create_rename_lsn_on_disk(share, TRUE); _ma_update_create_rename_lsn_on_disk(share, TRUE);
} }
} }
else
share->page_type= PAGECACHE_PLAIN_PAGE;
share->now_transactional= share->base.born_transactional;
share->base.default_rec_buff_size= max(share->base.pack_reclength, share->base.default_rec_buff_size= max(share->base.pack_reclength,
share->base.max_key_length); share->base.max_key_length);
share->page_type= (share->base.transactional ? PAGECACHE_LSN_PAGE :
PAGECACHE_PLAIN_PAGE);
if (share->data_file_type == DYNAMIC_RECORD) if (share->data_file_type == DYNAMIC_RECORD)
{ {
share->base.extra_rec_buff_size= share->base.extra_rec_buff_size=
...@@ -1124,7 +1133,7 @@ uint _ma_base_info_write(File file, MARIA_BASE_INFO *base) ...@@ -1124,7 +1133,7 @@ uint _ma_base_info_write(File file, MARIA_BASE_INFO *base)
*ptr++= base->key_reflength; *ptr++= base->key_reflength;
*ptr++= base->keys; *ptr++= base->keys;
*ptr++= base->auto_key; *ptr++= base->auto_key;
*ptr++= base->transactional; *ptr++= base->born_transactional;
*ptr++= 0; /* Reserved */ *ptr++= 0; /* Reserved */
mi_int2store(ptr,base->pack_bytes); ptr+= 2; mi_int2store(ptr,base->pack_bytes); ptr+= 2;
mi_int2store(ptr,base->blobs); ptr+= 2; mi_int2store(ptr,base->blobs); ptr+= 2;
...@@ -1167,7 +1176,7 @@ static byte *_ma_base_info_read(byte *ptr, MARIA_BASE_INFO *base) ...@@ -1167,7 +1176,7 @@ static byte *_ma_base_info_read(byte *ptr, MARIA_BASE_INFO *base)
base->key_reflength= *ptr++; base->key_reflength= *ptr++;
base->keys= *ptr++; base->keys= *ptr++;
base->auto_key= *ptr++; base->auto_key= *ptr++;
base->transactional= *ptr++; base->born_transactional= *ptr++;
ptr++; ptr++;
base->pack_bytes= mi_uint2korr(ptr); ptr+= 2; base->pack_bytes= mi_uint2korr(ptr); ptr+= 2;
base->blobs= mi_uint2korr(ptr); ptr+= 2; base->blobs= mi_uint2korr(ptr); ptr+= 2;
......
...@@ -3657,6 +3657,14 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache, ...@@ -3657,6 +3657,14 @@ static int flush_pagecache_blocks_int(PAGECACHE *pagecache,
("changed_blocks") though it's still dirty (the flush by another ("changed_blocks") though it's still dirty (the flush by another
thread has not yet happened). Checkpoint will miss the page and so thread has not yet happened). Checkpoint will miss the page and so
must be blocked until that flush has happened. must be blocked until that flush has happened.
Note that if there are two concurrent
flush_pagecache_blocks_int() on this file, then the first one may
move the block into its first_in_switch, and the second one would
just not see the block and wrongly consider its job done.
@todo RECOVERY Maria does protect such flushes with intern_lock,
but Checkpoint does not (Checkpoint makes sure that
changed_blocks_is_incomplete is 0 when it starts, but as
flush_cached_blocks() releases mutex, this may change...
*/ */
/** /**
@todo RECOVERY: check all places where we remove a page from the @todo RECOVERY: check all places where we remove a page from the
......
...@@ -56,7 +56,13 @@ int maria_rename(const char *old_name, const char *new_name) ...@@ -56,7 +56,13 @@ int maria_rename(const char *old_name, const char *new_name)
raid_chunks = share->base.raid_chunks; raid_chunks = share->base.raid_chunks;
#endif #endif
sync_dir= (share->base.transactional && !share->temporary) ? /*
the renaming of an internal table to the final table (like in ALTER TABLE)
is the moment when this table receives its correct create_rename_lsn and
this is important; make sure transactionality has been re-enabled.
*/
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
sync_dir= (share->now_transactional && !share->temporary) ?
MY_SYNC_DIR : 0; MY_SYNC_DIR : 0;
if (sync_dir) if (sync_dir)
{ {
......
...@@ -1033,7 +1033,7 @@ static int maria_chk(HA_CHECK *param, my_string filename) ...@@ -1033,7 +1033,7 @@ static int maria_chk(HA_CHECK *param, my_string filename)
know what the log's end LSN is now, so we just let the server know know what the log's end LSN is now, so we just let the server know
that it will have to find and store it. that it will have to find and store it.
*/ */
if (share->base.transactional) if (share->base.born_transactional)
share->state.create_rename_lsn= (LSN)ULONGLONG_MAX; share->state.create_rename_lsn= (LSN)ULONGLONG_MAX;
if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) && if ((param->testflag & (T_REP_BY_SORT | T_REP_PARALLEL)) &&
(maria_is_any_key_active(share->state.key_map) || (maria_is_any_key_active(share->state.key_map) ||
......
...@@ -171,8 +171,11 @@ typedef struct st_ma_base_info ...@@ -171,8 +171,11 @@ typedef struct st_ma_base_info
/* The following are from the header */ /* The following are from the header */
uint key_parts, all_key_parts; uint key_parts, all_key_parts;
/* If false, we disable logging, versioning, transaction etc */ /**
my_bool transactional; @brief If false, we disable logging, versioning, transaction etc. Observe
difference with MARIA_SHARE::now_transactional
*/
my_bool born_transactional;
} MARIA_BASE_INFO; } MARIA_BASE_INFO;
...@@ -306,6 +309,13 @@ typedef struct st_maria_share ...@@ -306,6 +309,13 @@ typedef struct st_maria_share
not_flushed, concurrent_insert; not_flushed, concurrent_insert;
my_bool delay_key_write; my_bool delay_key_write;
my_bool have_rtree; my_bool have_rtree;
/**
@brief if the table is transactional right now. It may have been created
transactional (base.born_transactional==TRUE) but with transactionality
(logging) temporarily disabled (now_transactional==FALSE). The opposite
(FALSE, TRUE) is impossible.
*/
my_bool now_transactional;
#ifdef THREAD #ifdef THREAD
THR_LOCK lock; THR_LOCK lock;
pthread_mutex_t intern_lock; /* Locking for use with _locking */ pthread_mutex_t intern_lock; /* Locking for use with _locking */
...@@ -891,7 +901,6 @@ MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, const byte *record); ...@@ -891,7 +901,6 @@ MARIA_RECORD_POS _ma_write_init_default(MARIA_HA *info, const byte *record);
my_bool _ma_write_abort_default(MARIA_HA *info); my_bool _ma_write_abort_default(MARIA_HA *info);
C_MODE_START C_MODE_START
int _ma_repair_write_log_record(const HA_CHECK *param, MARIA_HA *info);
/* Functions needed by _ma_check (are overrided in MySQL) */ /* Functions needed by _ma_check (are overrided in MySQL) */
volatile int *_ma_killed_ptr(HA_CHECK *param); volatile int *_ma_killed_ptr(HA_CHECK *param);
void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...)); void _ma_check_print_error _VARARGS((HA_CHECK *param, const char *fmt, ...));
...@@ -916,5 +925,10 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile); ...@@ -916,5 +925,10 @@ int _ma_initialize_data_file(MARIA_SHARE *share, File dfile);
int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, my_bool do_sync); int _ma_update_create_rename_lsn_on_disk(MARIA_SHARE *share, my_bool do_sync);
void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn); void _ma_unpin_all_pages(MARIA_HA *info, LSN undo_lsn);
#define _ma_tmp_disable_logging_for_table(S) \
{ (S)->now_transactional= FALSE; (S)->page_type= PAGECACHE_PLAIN_PAGE; }
#define _ma_reenable_logging_for_table(S) \
{ if (((S)->now_transactional= (S)->base.born_transactional)) \
(S)->page_type= PAGECACHE_LSN_PAGE; }
extern PAGECACHE *maria_log_pagecache; extern PAGECACHE *maria_log_pagecache;
...@@ -442,8 +442,11 @@ prototype_exec_hook(REDO_CREATE_TABLE) ...@@ -442,8 +442,11 @@ prototype_exec_hook(REDO_CREATE_TABLE)
info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR); info= maria_open(name, O_RDONLY, HA_OPEN_FOR_REPAIR);
if (info) if (info)
{ {
DBUG_ASSERT(info->s->reopen == 1); /* check that we're not using it */ MARIA_SHARE *share= info->s;
if (!info->s->base.transactional) /* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{ {
/* /*
could be that transactional table was later dropped, and a non-trans could be that transactional table was later dropped, and a non-trans
...@@ -454,7 +457,7 @@ prototype_exec_hook(REDO_CREATE_TABLE) ...@@ -454,7 +457,7 @@ prototype_exec_hook(REDO_CREATE_TABLE)
DBUG_ASSERT(0); /* I want to know this */ DBUG_ASSERT(0); /* I want to know this */
goto end; goto end;
} }
if (cmp_translog_addr(info->s->state.create_rename_lsn, rec->lsn) >= 0) if (cmp_translog_addr(share->state.create_rename_lsn, rec->lsn) >= 0)
{ {
printf(", has create_rename_lsn (%lu,0x%lx) is more recent than record", printf(", has create_rename_lsn (%lu,0x%lx) is more recent than record",
(ulong) LSN_FILE_NO(rec->lsn), (ulong) LSN_FILE_NO(rec->lsn),
...@@ -551,6 +554,7 @@ prototype_exec_hook(FILE_ID) ...@@ -551,6 +554,7 @@ prototype_exec_hook(FILE_ID)
int error; int error;
char *name, *buff; char *name, *buff;
MARIA_HA *info= NULL; MARIA_HA *info= NULL;
MARIA_SHARE *share;
if (((buff= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) || if (((buff= my_malloc(rec->record_length, MYF(MY_WME))) == NULL) ||
(translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) != (translog_read_record(rec->lsn, 0, rec->record_length, buff, NULL) !=
rec->record_length)) rec->record_length))
...@@ -566,7 +570,7 @@ prototype_exec_hook(FILE_ID) ...@@ -566,7 +570,7 @@ prototype_exec_hook(FILE_ID)
{ {
printf(", closing table '%s'", info->s->open_file_name); printf(", closing table '%s'", info->s->open_file_name);
all_tables[sid]= NULL; all_tables[sid]= NULL;
info->s->base.transactional= TRUE; /* put back the truth */ _ma_reenable_logging_for_table(info->s); /* put back the truth */
if (maria_close(info)) if (maria_close(info))
{ {
fprintf(stderr, "Failed to close table\n"); fprintf(stderr, "Failed to close table\n");
...@@ -586,19 +590,19 @@ prototype_exec_hook(FILE_ID) ...@@ -586,19 +590,19 @@ prototype_exec_hook(FILE_ID)
fprintf(stderr, "Table is crashed, can't apply log records to it\n"); fprintf(stderr, "Table is crashed, can't apply log records to it\n");
goto err; goto err;
} }
DBUG_ASSERT(info->s->reopen == 1); /* should always be only one instance */ share= info->s;
if (!info->s->base.transactional) /* check that we're not already using it */
DBUG_ASSERT(share->reopen == 1);
DBUG_ASSERT(share->now_transactional == share->base.born_transactional);
if (!share->base.born_transactional)
{ {
printf(", is not transactional\n"); printf(", is not transactional\n");
DBUG_ASSERT(0); /* I want to know this */ DBUG_ASSERT(0); /* I want to know this */
goto end; goto end;
} }
all_tables[sid]= info; all_tables[sid]= info;
/* /* don't log any records for this work */
don't log any records for this work. TODO make sure this variable does not _ma_tmp_disable_logging_for_table(share);
go to disk before we restore it to its true value.
*/
info->s->base.transactional= FALSE;
printf(", opened\n"); printf(", opened\n");
error= 0; error= 0;
goto end; goto end;
...@@ -742,7 +746,10 @@ static void end_of_redo_phase() ...@@ -742,7 +746,10 @@ static void end_of_redo_phase()
{ {
MARIA_HA *info= all_tables[sid]; MARIA_HA *info= all_tables[sid];
if (info != NULL) if (info != NULL)
{
_ma_reenable_logging_for_table(info->s); /* put back the truth */
maria_close(info); maria_close(info);
}
} }
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment