Commit 1d9c696b authored by unknown's avatar unknown

ha_berkeley.cc - Don't use subtransactions in update_row by default

ha_berkeley.cc	- Use the DB_RMW (read, modify, write) flag when removing a key
ha_berkeley.h	- Don't use subtransactions in update_row by default
sql_table.cc	- Don't use subtransactions in update_row by default
bdb.result	BitKeeper file /usr/home/tim/my/work/mysql-test/r/bdb.result
bdb.test	BitKeeper file /usr/home/tim/my/work/mysql-test/t/bdb.test


sql/ha_berkeley.cc:
  - Don't use subtransactions in update_row by default
  - Use the DB_RMW (read, modify, write) flag when removing a key
sql/ha_berkeley.h:
  - Don't use subtransactions in update_row by default
sql/sql_table.cc:
  - Don't use subtransactions in update_row by default
parent 5f99f02a
id code name
1 1 Tim
2 1 Monty
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
id code name
2 1 Monty
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
8 1 Sinisa
id code name
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
8 1 Sinisa
12 1 Ralph
drop table if exists t1;
create table t1 (id int unsigned not null auto_increment, code tinyint unsigned not null, name char(20) not null, primary key (id), key (code), unique (name)) type=bdb;
insert into t1 (code, name) values (1, 'Tim'), (1, 'Monty'), (2, 'David'), (2, 'Erik'), (3, 'Sasha'), (3, 'Jeremy'), (4, 'Matt');
select id, code, name from t1 order by id;
update ignore t1 set id = 8, name = 'Sinisa' where id < 3;
select id, code, name from t1 order by id;
update ignore t1 set id = id + 10, name = 'Ralph' where id < 4;
select id, code, name from t1 order by id;
drop table t1;
...@@ -889,13 +889,15 @@ int ha_berkeley::key_cmp(uint keynr, const byte * old_row, ...@@ -889,13 +889,15 @@ int ha_berkeley::key_cmp(uint keynr, const byte * old_row,
/* /*
Update a row from one value to another. Update a row from one value to another.
Clobbers key_buff2
*/ */
int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed, int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
const byte * old_row, const byte * old_row, DBT *old_key,
const byte * new_row, DBT *prim_key) const byte * new_row, DBT *new_key,
ulong thd_options, bool local_using_ignore)
{ {
DBT row, old_key; DBT row;
int error; int error;
DBUG_ENTER("update_primary_key"); DBUG_ENTER("update_primary_key");
...@@ -903,37 +905,83 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed, ...@@ -903,37 +905,83 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
{ {
// Primary key changed or we are updating a key that can have duplicates. // Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one // Delete the old row and add a new one
create_key(&old_key, primary_key, key_buff2, old_row); if (!(error=remove_key(trans, primary_key, old_row, (DBT *) 0, old_key)))
if ((error=remove_key(trans, primary_key, old_row, (DBT *) 0, &old_key)))
DBUG_RETURN(error); // This should always succeed
if ((error=pack_row(&row, new_row, 0)))
{
// Out of memory (this shouldn't happen!)
(void) file->put(file, trans, &old_key, &row,
key_type[primary_key]);
DBUG_RETURN(error);
}
// Write new key
if ((error=file->put(file, trans, prim_key, &row, key_type[primary_key])))
{ {
// Probably a duplicated key; Return the error and let the caller if (!(error=pack_row(&row, new_row, 0)))
// abort. {
last_dup_key=primary_key; if ((error=file->put(file, trans, new_key, &row,
DBUG_RETURN(error); key_type[primary_key])))
{
// Probably a duplicated key; restore old key and row if needed
last_dup_key=primary_key;
if (local_using_ignore &&
!(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
int new_error;
if ((new_error=pack_row(&row, old_row, 0)) ||
(new_error=file->put(file, trans, old_key, &row,
key_type[primary_key])))
error=new_error; // fatal error
}
}
}
} }
} }
else else
{ {
// Primary key didn't change; just update the row data // Primary key didn't change; just update the row data
if ((error=pack_row(&row, new_row, 0))) if (!(error=pack_row(&row, new_row, 0)))
DBUG_RETURN(error); error=file->put(file, trans, new_key, &row, 0);
error=file->put(file, trans, prim_key, &row, 0);
if (error)
DBUG_RETURN(error); // Fatal error
} }
DBUG_RETURN(0); DBUG_RETURN(error);
} }
/*
Restore changed keys, when a non-fatal error aborts the insert/update
of one row.
Clobbers keybuff2
*/
int ha_berkeley::restore_keys(DB_TXN *trans, key_map changed_keys,
uint primary_key,
const byte *old_row, DBT *old_key,
const byte *new_row, DBT *new_key,
ulong thd_options)
{
int error;
DBT tmp_key;
DBUG_ENTER("restore_keys");
/* Restore the old primary key, and the old row, but don't ignore
duplicate key failure */
if ((error=update_primary_key(trans, TRUE, new_row, new_key,
old_row, old_key, thd_options, FALSE)))
goto err;
/* Remove the new key, and put back the old key
changed_keys is a map of all non-primary keys that need to be
rolled back. The last key set in changed_keys is the one that
triggered the duplicate key error (it wasn't inserted), so for
that one just put back the old value. */
for (uint keynr=0; changed_keys; keynr++, changed_keys >>= 1)
{
if (changed_keys & 1)
{
if (changed_keys != 1 &&
(error = remove_key(trans, keynr, new_row, (DBT*) 0, new_key)))
break;
if ((error = key_file[keynr]->put(key_file[keynr], trans,
create_key(&tmp_key, keynr, key_buff2,
old_row),
old_key, key_type[keynr])))
break;
}
}
err:
dbug_assert(error != DB_KEYEXIST);
DBUG_RETURN(error);
}
int ha_berkeley::update_row(const byte * old_row, byte * new_row) int ha_berkeley::update_row(const byte * old_row, byte * new_row)
...@@ -941,6 +989,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row) ...@@ -941,6 +989,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
DBT prim_key, key, old_prim_key; DBT prim_key, key, old_prim_key;
int error; int error;
DB_TXN *sub_trans; DB_TXN *sub_trans;
ulong thd_options = table->in_use->options;
bool primary_key_changed; bool primary_key_changed;
DBUG_ENTER("update_row"); DBUG_ENTER("update_row");
...@@ -966,15 +1015,22 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row) ...@@ -966,15 +1015,22 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
old_prim_key=prim_key; old_prim_key=prim_key;
} }
sub_trans = transaction;
LINT_INIT(error); LINT_INIT(error);
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++) for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
{ {
if ((error=txn_begin(db_env, transaction, &sub_trans, 0))) key_map changed_keys = 0;
break; if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
DBUG_PRINT("trans",("starting subtransaction")); {
if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break;
DBUG_PRINT("trans",("starting subtransaction"));
}
/* Start by updating the primary key */ /* Start by updating the primary key */
if (!(error=update_primary_key(sub_trans, primary_key_changed, if (!(error=update_primary_key(sub_trans, primary_key_changed,
old_row, new_row, &prim_key))) old_row, &old_prim_key,
new_row, &prim_key,
thd_options, using_ignore)))
{ {
// Update all other keys // Update all other keys
for (uint keynr=0 ; keynr < table->keys ; keynr++) for (uint keynr=0 ; keynr < table->keys ; keynr++)
...@@ -984,10 +1040,23 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row) ...@@ -984,10 +1040,23 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
if (key_cmp(keynr, old_row, new_row) || primary_key_changed) if (key_cmp(keynr, old_row, new_row) || primary_key_changed)
{ {
if ((error=remove_key(sub_trans, keynr, old_row, (DBT*) 0, if ((error=remove_key(sub_trans, keynr, old_row, (DBT*) 0,
&old_prim_key)) || &old_prim_key)))
(error=key_file[keynr]->put(key_file[keynr], sub_trans, {
if (using_ignore &&
(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
int new_error;
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
if (new_error)
error = new_error;
}
DBUG_RETURN(error); // Fatal error
}
changed_keys |= (key_map)1 << keynr;
if ((error=key_file[keynr]->put(key_file[keynr], sub_trans,
create_key(&key, keynr, key_buff2, create_key(&key, keynr, key_buff2,
new_row), new_row),
&prim_key, key_type[keynr]))) &prim_key, key_type[keynr])))
{ {
last_dup_key=keynr; last_dup_key=keynr;
...@@ -996,22 +1065,50 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row) ...@@ -996,22 +1065,50 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
} }
} }
} }
if (!error) if (error)
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
}
else
{ {
/* Remove inserted row */ /* Remove inserted row */
DBUG_PRINT("error",("Got error %d",error));
if (using_ignore)
{
int new_error = 0;
if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
{
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
}
else if (changed_keys)
new_error=restore_keys(transaction, changed_keys, primary_key,
old_row, &old_prim_key, new_row, &prim_key,
thd_options);
if (new_error)
{
error=new_error; // This shouldn't happen
break;
}
}
#ifdef BROKEN_CODE_HERE
int new_error; int new_error;
DBUG_PRINT("error",("Got error %d",error)); DBUG_PRINT("error",("Got error %d",error));
DBUG_PRINT("trans",("aborting subtransaction")); if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
if ((new_error=txn_abort(sub_trans))) {
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
}
else if (changed_keys)
new_error=restore_keys(changed_keys, primary_key,
old_row, old_prim_key, new_row, prim_key);
if (new_error)
{ {
error=new_error; // This shouldn't happen error=new_error; // This shouldn't happen
break; break;
} }
#endif
}
else if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
} }
if (error != DB_LOCK_DEADLOCK) if (error != DB_LOCK_DEADLOCK)
break; break;
...@@ -1065,7 +1162,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record, ...@@ -1065,7 +1162,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
create_key(&key, keynr, key_buff2, record)), create_key(&key, keynr, key_buff2, record)),
(keynr == primary_key ? (keynr == primary_key ?
packed_record : prim_key), packed_record : prim_key),
DB_GET_BOTH))) DB_GET_BOTH | DB_RMW)))
{ // This shouldn't happen { // This shouldn't happen
error=tmp_cursor->c_del(tmp_cursor,0); error=tmp_cursor->c_del(tmp_cursor,0);
} }
...@@ -1401,9 +1498,7 @@ void ha_berkeley::position(const byte *record) ...@@ -1401,9 +1498,7 @@ void ha_berkeley::position(const byte *record)
{ {
DBT key; DBT key;
if (hidden_primary_key) if (hidden_primary_key)
{
memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH); memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
else else
create_key(&key, primary_key, ref, record); create_key(&key, primary_key, ref, record);
} }
...@@ -1438,6 +1533,7 @@ int ha_berkeley::extra(enum ha_extra_function operation) ...@@ -1438,6 +1533,7 @@ int ha_berkeley::extra(enum ha_extra_function operation)
case HA_EXTRA_RESET: case HA_EXTRA_RESET:
case HA_EXTRA_RESET_STATE: case HA_EXTRA_RESET_STATE:
key_read=0; key_read=0;
using_ignore=0;
break; break;
case HA_EXTRA_KEYREAD: case HA_EXTRA_KEYREAD:
key_read=1; // Query satisfied with key key_read=1; // Query satisfied with key
...@@ -1445,6 +1541,12 @@ int ha_berkeley::extra(enum ha_extra_function operation) ...@@ -1445,6 +1541,12 @@ int ha_berkeley::extra(enum ha_extra_function operation)
case HA_EXTRA_NO_KEYREAD: case HA_EXTRA_NO_KEYREAD:
key_read=0; key_read=0;
break; break;
case HA_EXTRA_IGNORE_DUP_KEY:
using_ignore=1;
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
using_ignore=0;
break;
default: default:
break; break;
} }
...@@ -1548,6 +1650,8 @@ THR_LOCK_DATA **ha_berkeley::store_lock(THD *thd, THR_LOCK_DATA **to, ...@@ -1548,6 +1650,8 @@ THR_LOCK_DATA **ha_berkeley::store_lock(THD *thd, THR_LOCK_DATA **to,
!thd->in_lock_tables) !thd->in_lock_tables)
lock_type = TL_WRITE_ALLOW_WRITE; lock_type = TL_WRITE_ALLOW_WRITE;
lock.type=lock_type; lock.type=lock_type;
lock_on_read= ((table->reginfo.lock_type > TL_WRITE_ALLOW_READ) ? DB_RMW :
0);
} }
*to++= &lock; *to++= &lock;
return to; return to;
...@@ -1658,7 +1762,7 @@ int ha_berkeley::delete_table(const char *name) ...@@ -1658,7 +1762,7 @@ int ha_berkeley::delete_table(const char *name)
double ha_berkeley::scan_time() double ha_berkeley::scan_time()
{ {
return records/3; return records/3;
} }
ha_rows ha_berkeley::records_in_range(int keynr, ha_rows ha_berkeley::records_in_range(int keynr,
const byte *start_key,uint start_key_len, const byte *start_key,uint start_key_len,
...@@ -1811,7 +1915,7 @@ int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt) ...@@ -1811,7 +1915,7 @@ int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
stat=0; stat=0;
} }
if (file->stat(file, (void*) &stat, 0, 0)) if (file->stat(file, (void*) &stat, 0, 0))
goto err; goto err;
} }
pthread_mutex_lock(&share->mutex); pthread_mutex_lock(&share->mutex);
share->rows=stat->bt_ndata; share->rows=stat->bt_ndata;
......
...@@ -70,10 +70,15 @@ class ha_berkeley: public handler ...@@ -70,10 +70,15 @@ class ha_berkeley: public handler
DBT *packed_record, DBT *prim_key); DBT *packed_record, DBT *prim_key);
int remove_keys(DB_TXN *trans,const byte *record, DBT *new_record, int remove_keys(DB_TXN *trans,const byte *record, DBT *new_record,
DBT *prim_key, key_map keys, int result); DBT *prim_key, key_map keys, int result);
int restore_keys(DB_TXN *trans, key_map changed_keys, uint primary_key,
const byte *old_row, DBT *old_key,
const byte *new_row, DBT *new_key,
ulong thd_options);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row); int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
int update_primary_key(DB_TXN *trans, bool primary_key_changed, int update_primary_key(DB_TXN *trans, bool primary_key_changed,
const byte * old_row, const byte * new_row, const byte * old_row, DBT *old_key,
DBT *prim_key); const byte * new_row, DBT *prim_key,
ulong thd_options, bool local_using_ignore);
int read_row(int error, char *buf, uint keynr, DBT *row, DBT *key, bool); int read_row(int error, char *buf, uint keynr, DBT *row, DBT *key, bool);
DBT *get_pos(DBT *to, byte *pos); DBT *get_pos(DBT *to, byte *pos);
......
...@@ -1658,7 +1658,8 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -1658,7 +1658,8 @@ copy_data_between_tables(TABLE *from,TABLE *to,
}; };
init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1); init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1);
if (handle_duplicates == DUP_IGNORE) if (handle_duplicates == DUP_IGNORE ||
handle_duplicates == DUP_REPLACE)
to->file->extra(HA_EXTRA_IGNORE_DUP_KEY); to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
next_field=to->next_number_field; next_field=to->next_number_field;
while (!(error=info.read_record(&info))) while (!(error=info.read_record(&info)))
...@@ -1675,7 +1676,8 @@ copy_data_between_tables(TABLE *from,TABLE *to, ...@@ -1675,7 +1676,8 @@ copy_data_between_tables(TABLE *from,TABLE *to,
copy_ptr->do_copy(copy_ptr); copy_ptr->do_copy(copy_ptr);
if ((error=to->file->write_row((byte*) to->record[0]))) if ((error=to->file->write_row((byte*) to->record[0])))
{ {
if (handle_duplicates != DUP_IGNORE || if ((handle_duplicates != DUP_IGNORE &&
handle_duplicates != DUP_REPLACE) ||
(error != HA_ERR_FOUND_DUPP_KEY && (error != HA_ERR_FOUND_DUPP_KEY &&
error != HA_ERR_FOUND_DUPP_UNIQUE)) error != HA_ERR_FOUND_DUPP_UNIQUE))
{ {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment