Commit ced74d24 authored by unknown's avatar unknown

Don't use subtransactions on write_row() and delete_row() in

ha_berkeley.
A few smaller bug fixes.


sql/ha_berkeley.cc:
  - Don't use subtransactions on write_row() and delete_row()
  - Bug fixes for sharing DB *file
sql/ha_berkeley.h:
  - Don't use subtransactions on write_row() and delete_row()
  - Bug fixes for sharing DB *file
sql/ha_myisam.cc:
  - fix compiler warning
sql/log_event.cc:
  - fix compiler warning
parent 03cd236f
...@@ -99,7 +99,7 @@ static void berkeley_print_error(const char *db_errpfx, char *buffer); ...@@ -99,7 +99,7 @@ static void berkeley_print_error(const char *db_errpfx, char *buffer);
static byte* bdb_get_key(BDB_SHARE *share,uint *length, static byte* bdb_get_key(BDB_SHARE *share,uint *length,
my_bool not_used __attribute__((unused))); my_bool not_used __attribute__((unused)));
static BDB_SHARE *get_share(const char *table_name, TABLE *table); static BDB_SHARE *get_share(const char *table_name, TABLE *table);
static int free_share(BDB_SHARE *share, TABLE *table); static int free_share(BDB_SHARE *share, TABLE *table, uint hidden_primary_key);
static int write_status(DB *status_block, char *buff, uint length); static int write_status(DB *status_block, char *buff, uint length);
static void update_status(BDB_SHARE *share, TABLE *table); static void update_status(BDB_SHARE *share, TABLE *table);
static void berkeley_noticecall(DB_ENV *db_env, db_notices notice); static void berkeley_noticecall(DB_ENV *db_env, db_notices notice);
...@@ -465,7 +465,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked) ...@@ -465,7 +465,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
{ {
if ((error=db_create(&file, db_env, 0))) if ((error=db_create(&file, db_env, 0)))
{ {
free_share(share,table); free_share(share,table, hidden_primary_key);
my_free(rec_buff,MYF(0)); my_free(rec_buff,MYF(0));
my_free(alloc_ptr,MYF(0)); my_free(alloc_ptr,MYF(0));
my_errno=error; my_errno=error;
...@@ -482,7 +482,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked) ...@@ -482,7 +482,7 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
2 | 4), 2 | 4),
"main", DB_BTREE, open_mode,0)))) "main", DB_BTREE, open_mode,0))))
{ {
free_share(share,table); free_share(share,table, hidden_primary_key);
my_free(rec_buff,MYF(0)); my_free(rec_buff,MYF(0));
my_free(alloc_ptr,MYF(0)); my_free(alloc_ptr,MYF(0));
my_errno=error; my_errno=error;
...@@ -550,12 +550,11 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked) ...@@ -550,12 +550,11 @@ int ha_berkeley::open(const char *name, int mode, uint test_if_locked)
int ha_berkeley::close(void) int ha_berkeley::close(void)
{ {
uint keys=table->keys + test(hidden_primary_key);
DBUG_ENTER("ha_berkeley::close"); DBUG_ENTER("ha_berkeley::close");
my_free(rec_buff,MYF(MY_ALLOW_ZERO_PTR)); my_free(rec_buff,MYF(MY_ALLOW_ZERO_PTR));
my_free(alloc_ptr,MYF(MY_ALLOW_ZERO_PTR)); my_free(alloc_ptr,MYF(MY_ALLOW_ZERO_PTR));
DBUG_RETURN(free_share(share,table)); DBUG_RETURN(free_share(share,table, hidden_primary_key));
} }
...@@ -796,18 +795,23 @@ int ha_berkeley::write_row(byte * record) ...@@ -796,18 +795,23 @@ int ha_berkeley::write_row(byte * record)
} }
else else
{ {
DB_TXN *sub_trans = transaction;
ulong thd_options = table->in_use->options;
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++) for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
{ {
uint keynr; key_map changed_keys = 0;
DB_TXN *sub_trans; if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break; break;
DBUG_PRINT("trans",("starting subtransaction")); DBUG_PRINT("trans",("starting subtransaction"));
}
if (!(error=file->put(file, sub_trans, create_key(&prim_key, primary_key, if (!(error=file->put(file, sub_trans, create_key(&prim_key, primary_key,
key_buff, record), key_buff, record),
&row, key_type[primary_key]))) &row, key_type[primary_key])))
{ {
for (keynr=0 ; keynr < table->keys ; keynr++) changed_keys |= (key_map) 1 << primary_key;
for (uint keynr=0 ; keynr < table->keys ; keynr++)
{ {
if (keynr == primary_key) if (keynr == primary_key)
continue; continue;
...@@ -819,27 +823,48 @@ int ha_berkeley::write_row(byte * record) ...@@ -819,27 +823,48 @@ int ha_berkeley::write_row(byte * record)
last_dup_key=keynr; last_dup_key=keynr;
break; break;
} }
changed_keys |= (key_map) 1 << keynr;
} }
} }
else else
last_dup_key=primary_key; last_dup_key=primary_key;
if (!error) if (error)
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
}
else
{ {
/* Remove inserted row */ /* Remove inserted row */
int new_error;
DBUG_PRINT("error",("Got error %d",error)); DBUG_PRINT("error",("Got error %d",error));
if (using_ignore)
{
int new_error = 0;
if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
{
DBUG_PRINT("trans",("aborting subtransaction")); DBUG_PRINT("trans",("aborting subtransaction"));
if ((new_error=txn_abort(sub_trans))) new_error=txn_abort(sub_trans);
}
else if (changed_keys)
{
new_error = 0;
for (uint keynr=0; changed_keys; keynr++, changed_keys >>= 1)
{
if (changed_keys & 1)
{
if ((new_error = remove_key(sub_trans, keynr, record,
(DBT*) 0, &prim_key)))
break;
}
}
}
if (new_error)
{ {
error=new_error; // This shouldn't happen error=new_error; // This shouldn't happen
break; break;
} }
} }
}
else if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
}
if (error != DB_LOCK_DEADLOCK) if (error != DB_LOCK_DEADLOCK)
break; break;
} }
...@@ -1126,7 +1151,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row) ...@@ -1126,7 +1151,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
packed_record may be NULL if the key is unique packed_record may be NULL if the key is unique
*/ */
int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record, int ha_berkeley::remove_key(DB_TXN *trans, uint keynr, const byte *record,
DBT *packed_record, DBT *packed_record,
DBT *prim_key) DBT *prim_key)
{ {
...@@ -1139,7 +1164,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record, ...@@ -1139,7 +1164,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
HA_NOSAME || keynr == primary_key) HA_NOSAME || keynr == primary_key)
{ // Unique key { // Unique key
dbug_assert(keynr == primary_key || prim_key->data != key_buff2); dbug_assert(keynr == primary_key || prim_key->data != key_buff2);
error=key_file[keynr]->del(key_file[keynr], sub_trans, error=key_file[keynr]->del(key_file[keynr], trans,
keynr == primary_key ? keynr == primary_key ?
prim_key : prim_key :
create_key(&key, keynr, key_buff2, record), create_key(&key, keynr, key_buff2, record),
...@@ -1154,7 +1179,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record, ...@@ -1154,7 +1179,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
*/ */
dbug_assert(keynr != primary_key && prim_key->data != key_buff2); dbug_assert(keynr != primary_key && prim_key->data != key_buff2);
DBC *tmp_cursor; DBC *tmp_cursor;
if (!(error=file->cursor(key_file[keynr], sub_trans, &tmp_cursor, 0))) if (!(error=file->cursor(key_file[keynr], trans, &tmp_cursor, 0)))
{ {
if (!(error=cursor->c_get(tmp_cursor, if (!(error=cursor->c_get(tmp_cursor,
(keynr == primary_key ? (keynr == primary_key ?
...@@ -1178,10 +1203,10 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record, ...@@ -1178,10 +1203,10 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
/* Delete all keys for new_record */ /* Delete all keys for new_record */
int ha_berkeley::remove_keys(DB_TXN *trans, const byte *record, int ha_berkeley::remove_keys(DB_TXN *trans, const byte *record,
DBT *new_record, DBT *prim_key, key_map keys, DBT *new_record, DBT *prim_key, key_map keys)
int result)
{ {
for (uint keynr=0 ; keys ;keynr++, keys>>=1) int result = 0;
for (uint keynr=0; keys; keynr++, keys>>=1)
{ {
if (keys & 1) if (keys & 1)
{ {
...@@ -1189,7 +1214,6 @@ int ha_berkeley::remove_keys(DB_TXN *trans, const byte *record, ...@@ -1189,7 +1214,6 @@ int ha_berkeley::remove_keys(DB_TXN *trans, const byte *record,
if (new_error) if (new_error)
{ {
result=new_error; // Return last error result=new_error; // Return last error
if (trans)
break; // Let rollback correct things break; // Let rollback correct things
} }
} }
...@@ -1203,6 +1227,7 @@ int ha_berkeley::delete_row(const byte * record) ...@@ -1203,6 +1227,7 @@ int ha_berkeley::delete_row(const byte * record)
int error; int error;
DBT row, prim_key; DBT row, prim_key;
key_map keys=table->keys_in_use; key_map keys=table->keys_in_use;
ulong thd_options = table->in_use->options;
DBUG_ENTER("delete_row"); DBUG_ENTER("delete_row");
statistic_increment(ha_delete_count,&LOCK_status); statistic_increment(ha_delete_count,&LOCK_status);
...@@ -1212,24 +1237,30 @@ int ha_berkeley::delete_row(const byte * record) ...@@ -1212,24 +1237,30 @@ int ha_berkeley::delete_row(const byte * record)
if (hidden_primary_key) if (hidden_primary_key)
keys|= (key_map) 1 << primary_key; keys|= (key_map) 1 << primary_key;
/* Subtransactions may be used in order to retry the delete in
case we get a DB_LOCK_DEADLOCK error. */
DB_TXN *sub_trans = transaction;
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++) for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
{ {
DB_TXN *sub_trans; if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
{
if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break; break;
DBUG_PRINT("trans",("starting sub transaction")); DBUG_PRINT("trans",("starting sub transaction"));
if (!error) }
error=remove_keys(sub_trans, record, &row, &prim_key, keys,0); error=remove_keys(sub_trans, record, &row, &prim_key, keys);
if (!error) if (!error && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{ {
DBUG_PRINT("trans",("ending sub transaction")); DBUG_PRINT("trans",("ending sub transaction"));
error=txn_commit(sub_trans, 0); error=txn_commit(sub_trans, 0);
} }
if (error) if (error)
{
DBUG_PRINT("error",("Got error %d",error));
if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
{ {
/* retry */ /* retry */
int new_error; int new_error;
DBUG_PRINT("error",("Got error %d",error));
DBUG_PRINT("trans",("aborting subtransaction")); DBUG_PRINT("trans",("aborting subtransaction"));
if ((new_error=txn_abort(sub_trans))) if ((new_error=txn_abort(sub_trans)))
{ {
...@@ -1237,6 +1268,9 @@ int ha_berkeley::delete_row(const byte * record) ...@@ -1237,6 +1268,9 @@ int ha_berkeley::delete_row(const byte * record)
break; break;
} }
} }
else
break; // No retry - return error
}
if (error != DB_LOCK_DEADLOCK) if (error != DB_LOCK_DEADLOCK)
break; break;
} }
...@@ -2058,16 +2092,17 @@ static BDB_SHARE *get_share(const char *table_name, TABLE *table) ...@@ -2058,16 +2092,17 @@ static BDB_SHARE *get_share(const char *table_name, TABLE *table)
return share; return share;
} }
static int free_share(BDB_SHARE *share, TABLE *table) static int free_share(BDB_SHARE *share, TABLE *table, uint hidden_primary_key)
{ {
int error, result = 0; int error, result = 0;
uint keys=table->keys + test(hidden_primary_key);
pthread_mutex_lock(&bdb_mutex); pthread_mutex_lock(&bdb_mutex);
if (!--share->use_count) if (!--share->use_count)
{ {
DB **key_file = share->key_file; DB **key_file = share->key_file;
update_status(share,table); update_status(share,table);
/* this does share->file->close() implicitly */ /* this does share->file->close() implicitly */
for (uint i=0; i < table->keys; i++) for (uint i=0; i < keys; i++)
{ {
if (key_file[i] && (error=key_file[i]->close(key_file[i],0))) if (key_file[i] && (error=key_file[i]->close(key_file[i],0)))
result=error; result=error;
......
...@@ -70,7 +70,7 @@ class ha_berkeley: public handler ...@@ -70,7 +70,7 @@ class ha_berkeley: public handler
int remove_key(DB_TXN *trans, uint keynr, const byte *record, int remove_key(DB_TXN *trans, uint keynr, const byte *record,
DBT *packed_record, DBT *prim_key); DBT *packed_record, DBT *prim_key);
int remove_keys(DB_TXN *trans,const byte *record, DBT *new_record, int remove_keys(DB_TXN *trans,const byte *record, DBT *new_record,
DBT *prim_key, key_map keys, int result); DBT *prim_key, key_map keys);
int restore_keys(DB_TXN *trans, key_map changed_keys, uint primary_key, int restore_keys(DB_TXN *trans, key_map changed_keys, uint primary_key,
const byte *old_row, DBT *old_key, const byte *old_row, DBT *old_key,
const byte *new_row, DBT *new_key, const byte *new_row, DBT *new_key,
......
...@@ -1103,7 +1103,7 @@ int ha_myisam::ft_read(byte * buf) ...@@ -1103,7 +1103,7 @@ int ha_myisam::ft_read(byte * buf)
thread_safe_increment(ha_read_next_count,&LOCK_status); // why ? thread_safe_increment(ha_read_next_count,&LOCK_status); // why ?
if (error=ft_read_next((FT_DOCLIST *) ft_handler,(char*) buf)) if ((error=ft_read_next((FT_DOCLIST *) ft_handler,(char*) buf)))
ft_handler=NULL; // Magic here ! See Item_func_match::val() ft_handler=NULL; // Magic here ! See Item_func_match::val()
// and ha_myisam::index_init() // and ha_myisam::index_init()
table->status=error ? STATUS_NOT_FOUND: 0; table->status=error ? STATUS_NOT_FOUND: 0;
......
...@@ -624,7 +624,7 @@ void Load_log_event::print(FILE* file, bool short_form) ...@@ -624,7 +624,7 @@ void Load_log_event::print(FILE* file, bool short_form)
if (!short_form) if (!short_form)
{ {
print_header(file); print_header(file);
fprintf(file, "\tQuery\tthread_id=%d\texec_time=%ld\n", fprintf(file, "\tQuery\tthread_id=%ld\texec_time=%ld\n",
thread_id, exec_time); thread_id, exec_time);
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment