Commit 384b371b authored by tim@cane.mysql.fi's avatar tim@cane.mysql.fi

ha_berkeley.cc - Don't use subtransactions in update_row by default

ha_berkeley.cc	- Use the DB_RMW (read, modify, write) flag when removing a key
ha_berkeley.h	- Don't use subtransactions in update_row by default
sql_table.cc	- Don't use subtransactions in update_row by default
bdb.result	BitKeeper file /usr/home/tim/my/work/mysql-test/r/bdb.result
bdb.test	BitKeeper file /usr/home/tim/my/work/mysql-test/t/bdb.test
parent 142a7b36
id code name
1 1 Tim
2 1 Monty
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
id code name
2 1 Monty
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
8 1 Sinisa
id code name
3 2 David
4 2 Erik
5 3 Sasha
6 3 Jeremy
7 4 Matt
8 1 Sinisa
12 1 Ralph
drop table if exists t1;
create table t1 (id int unsigned not null auto_increment, code tinyint unsigned not null, name char(20) not null, primary key (id), key (code), unique (name)) type=bdb;
insert into t1 (code, name) values (1, 'Tim'), (1, 'Monty'), (2, 'David'), (2, 'Erik'), (3, 'Sasha'), (3, 'Jeremy'), (4, 'Matt');
select id, code, name from t1 order by id;
update ignore t1 set id = 8, name = 'Sinisa' where id < 3;
select id, code, name from t1 order by id;
update ignore t1 set id = id + 10, name = 'Ralph' where id < 4;
select id, code, name from t1 order by id;
drop table t1;
......@@ -889,13 +889,15 @@ int ha_berkeley::key_cmp(uint keynr, const byte * old_row,
/*
Update a row from one value to another.
Clobbers key_buff2
*/
int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
const byte * old_row,
const byte * new_row, DBT *prim_key)
const byte * old_row, DBT *old_key,
const byte * new_row, DBT *new_key,
ulong thd_options, bool local_using_ignore)
{
DBT row, old_key;
DBT row;
int error;
DBUG_ENTER("update_primary_key");
......@@ -903,37 +905,83 @@ int ha_berkeley::update_primary_key(DB_TXN *trans, bool primary_key_changed,
{
// Primary key changed or we are updating a key that can have duplicates.
// Delete the old row and add a new one
create_key(&old_key, primary_key, key_buff2, old_row);
if ((error=remove_key(trans, primary_key, old_row, (DBT *) 0, &old_key)))
DBUG_RETURN(error); // This should always succeed
if ((error=pack_row(&row, new_row, 0)))
{
// Out of memory (this shouldn't happen!)
(void) file->put(file, trans, &old_key, &row,
key_type[primary_key]);
DBUG_RETURN(error);
}
// Write new key
if ((error=file->put(file, trans, prim_key, &row, key_type[primary_key])))
if (!(error=remove_key(trans, primary_key, old_row, (DBT *) 0, old_key)))
{
// Probably a duplicated key; Return the error and let the caller
// abort.
last_dup_key=primary_key;
DBUG_RETURN(error);
if (!(error=pack_row(&row, new_row, 0)))
{
if ((error=file->put(file, trans, new_key, &row,
key_type[primary_key])))
{
// Probably a duplicated key; restore old key and row if needed
last_dup_key=primary_key;
if (local_using_ignore &&
!(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
int new_error;
if ((new_error=pack_row(&row, old_row, 0)) ||
(new_error=file->put(file, trans, old_key, &row,
key_type[primary_key])))
error=new_error; // fatal error
}
}
}
}
}
else
{
// Primary key didn't change; just update the row data
if ((error=pack_row(&row, new_row, 0)))
DBUG_RETURN(error);
error=file->put(file, trans, prim_key, &row, 0);
if (error)
DBUG_RETURN(error); // Fatal error
if (!(error=pack_row(&row, new_row, 0)))
error=file->put(file, trans, new_key, &row, 0);
}
DBUG_RETURN(0);
DBUG_RETURN(error);
}
/*
Restore changed keys, when a non-fatal error aborts the insert/update
of one row.
Clobbers keybuff2
*/
int ha_berkeley::restore_keys(DB_TXN *trans, key_map changed_keys,
uint primary_key,
const byte *old_row, DBT *old_key,
const byte *new_row, DBT *new_key,
ulong thd_options)
{
int error;
DBT tmp_key;
DBUG_ENTER("restore_keys");
/* Restore the old primary key, and the old row, but don't ignore
duplicate key failure */
if ((error=update_primary_key(trans, TRUE, new_row, new_key,
old_row, old_key, thd_options, FALSE)))
goto err;
/* Remove the new key, and put back the old key
changed_keys is a map of all non-primary keys that need to be
rolled back. The last key set in changed_keys is the one that
triggered the duplicate key error (it wasn't inserted), so for
that one just put back the old value. */
for (uint keynr=0; changed_keys; keynr++, changed_keys >>= 1)
{
if (changed_keys & 1)
{
if (changed_keys != 1 &&
(error = remove_key(trans, keynr, new_row, (DBT*) 0, new_key)))
break;
if ((error = key_file[keynr]->put(key_file[keynr], trans,
create_key(&tmp_key, keynr, key_buff2,
old_row),
old_key, key_type[keynr])))
break;
}
}
err:
dbug_assert(error != DB_KEYEXIST);
DBUG_RETURN(error);
}
int ha_berkeley::update_row(const byte * old_row, byte * new_row)
......@@ -941,6 +989,7 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
DBT prim_key, key, old_prim_key;
int error;
DB_TXN *sub_trans;
ulong thd_options = table->in_use->options;
bool primary_key_changed;
DBUG_ENTER("update_row");
......@@ -966,15 +1015,22 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
old_prim_key=prim_key;
}
sub_trans = transaction;
LINT_INIT(error);
for (uint retry=0 ; retry < berkeley_trans_retry ; retry++)
{
if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break;
DBUG_PRINT("trans",("starting subtransaction"));
key_map changed_keys = 0;
if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
if ((error=txn_begin(db_env, transaction, &sub_trans, 0)))
break;
DBUG_PRINT("trans",("starting subtransaction"));
}
/* Start by updating the primary key */
if (!(error=update_primary_key(sub_trans, primary_key_changed,
old_row, new_row, &prim_key)))
old_row, &old_prim_key,
new_row, &prim_key,
thd_options, using_ignore)))
{
// Update all other keys
for (uint keynr=0 ; keynr < table->keys ; keynr++)
......@@ -984,10 +1040,23 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
if (key_cmp(keynr, old_row, new_row) || primary_key_changed)
{
if ((error=remove_key(sub_trans, keynr, old_row, (DBT*) 0,
&old_prim_key)) ||
(error=key_file[keynr]->put(key_file[keynr], sub_trans,
&old_prim_key)))
{
if (using_ignore &&
(thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
int new_error;
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
if (new_error)
error = new_error;
}
DBUG_RETURN(error); // Fatal error
}
changed_keys |= (key_map)1 << keynr;
if ((error=key_file[keynr]->put(key_file[keynr], sub_trans,
create_key(&key, keynr, key_buff2,
new_row),
new_row),
&prim_key, key_type[keynr])))
{
last_dup_key=keynr;
......@@ -996,22 +1065,50 @@ int ha_berkeley::update_row(const byte * old_row, byte * new_row)
}
}
}
if (!error)
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
}
else
if (error)
{
/* Remove inserted row */
DBUG_PRINT("error",("Got error %d",error));
if (using_ignore)
{
int new_error = 0;
if (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS)
{
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
}
else if (changed_keys)
new_error=restore_keys(transaction, changed_keys, primary_key,
old_row, &old_prim_key, new_row, &prim_key,
thd_options);
if (new_error)
{
error=new_error; // This shouldn't happen
break;
}
}
#ifdef BROKEN_CODE_HERE
int new_error;
DBUG_PRINT("error",("Got error %d",error));
DBUG_PRINT("trans",("aborting subtransaction"));
if ((new_error=txn_abort(sub_trans)))
if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
DBUG_PRINT("trans",("aborting subtransaction"));
new_error=txn_abort(sub_trans);
}
else if (changed_keys)
new_error=restore_keys(changed_keys, primary_key,
old_row, old_prim_key, new_row, prim_key);
if (new_error)
{
error=new_error; // This shouldn't happen
break;
}
#endif
}
else if (using_ignore && (thd_options & OPTION_INTERNAL_SUBTRANSACTIONS))
{
DBUG_PRINT("trans",("committing subtransaction"));
error=txn_commit(sub_trans, 0);
}
if (error != DB_LOCK_DEADLOCK)
break;
......@@ -1065,7 +1162,7 @@ int ha_berkeley::remove_key(DB_TXN *sub_trans, uint keynr, const byte *record,
create_key(&key, keynr, key_buff2, record)),
(keynr == primary_key ?
packed_record : prim_key),
DB_GET_BOTH)))
DB_GET_BOTH | DB_RMW)))
{ // This shouldn't happen
error=tmp_cursor->c_del(tmp_cursor,0);
}
......@@ -1401,9 +1498,7 @@ void ha_berkeley::position(const byte *record)
{
DBT key;
if (hidden_primary_key)
{
memcpy_fixed(ref, (char*) current_ident, BDB_HIDDEN_PRIMARY_KEY_LENGTH);
}
else
create_key(&key, primary_key, ref, record);
}
......@@ -1438,6 +1533,7 @@ int ha_berkeley::extra(enum ha_extra_function operation)
case HA_EXTRA_RESET:
case HA_EXTRA_RESET_STATE:
key_read=0;
using_ignore=0;
break;
case HA_EXTRA_KEYREAD:
key_read=1; // Query satisfied with key
......@@ -1445,6 +1541,12 @@ int ha_berkeley::extra(enum ha_extra_function operation)
case HA_EXTRA_NO_KEYREAD:
key_read=0;
break;
case HA_EXTRA_IGNORE_DUP_KEY:
using_ignore=1;
break;
case HA_EXTRA_NO_IGNORE_DUP_KEY:
using_ignore=0;
break;
default:
break;
}
......@@ -1548,6 +1650,8 @@ THR_LOCK_DATA **ha_berkeley::store_lock(THD *thd, THR_LOCK_DATA **to,
!thd->in_lock_tables)
lock_type = TL_WRITE_ALLOW_WRITE;
lock.type=lock_type;
lock_on_read= ((table->reginfo.lock_type > TL_WRITE_ALLOW_READ) ? DB_RMW :
0);
}
*to++= &lock;
return to;
......@@ -1658,7 +1762,7 @@ int ha_berkeley::delete_table(const char *name)
double ha_berkeley::scan_time()
{
return records/3;
}
}
ha_rows ha_berkeley::records_in_range(int keynr,
const byte *start_key,uint start_key_len,
......@@ -1811,7 +1915,7 @@ int ha_berkeley::analyze(THD* thd, HA_CHECK_OPT* check_opt)
stat=0;
}
if (file->stat(file, (void*) &stat, 0, 0))
goto err;
goto err;
}
pthread_mutex_lock(&share->mutex);
share->rows=stat->bt_ndata;
......
......@@ -70,10 +70,15 @@ class ha_berkeley: public handler
DBT *packed_record, DBT *prim_key);
int remove_keys(DB_TXN *trans,const byte *record, DBT *new_record,
DBT *prim_key, key_map keys, int result);
int restore_keys(DB_TXN *trans, key_map changed_keys, uint primary_key,
const byte *old_row, DBT *old_key,
const byte *new_row, DBT *new_key,
ulong thd_options);
int key_cmp(uint keynr, const byte * old_row, const byte * new_row);
int update_primary_key(DB_TXN *trans, bool primary_key_changed,
const byte * old_row, const byte * new_row,
DBT *prim_key);
const byte * old_row, DBT *old_key,
const byte * new_row, DBT *prim_key,
ulong thd_options, bool local_using_ignore);
int read_row(int error, char *buf, uint keynr, DBT *row, DBT *key, bool);
DBT *get_pos(DBT *to, byte *pos);
......
......@@ -1658,7 +1658,8 @@ copy_data_between_tables(TABLE *from,TABLE *to,
};
init_read_record(&info, thd, from, (SQL_SELECT *) 0, 1,1);
if (handle_duplicates == DUP_IGNORE)
if (handle_duplicates == DUP_IGNORE ||
handle_duplicates == DUP_REPLACE)
to->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
next_field=to->next_number_field;
while (!(error=info.read_record(&info)))
......@@ -1675,7 +1676,8 @@ copy_data_between_tables(TABLE *from,TABLE *to,
copy_ptr->do_copy(copy_ptr);
if ((error=to->file->write_row((byte*) to->record[0])))
{
if (handle_duplicates != DUP_IGNORE ||
if ((handle_duplicates != DUP_IGNORE &&
handle_duplicates != DUP_REPLACE) ||
(error != HA_ERR_FOUND_DUPP_KEY &&
error != HA_ERR_FOUND_DUPP_UNIQUE))
{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment