Commit 58f55aef authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2952], move fix to main

git-svn-id: file:///svn/toku/tokudb@24952 c7de825b-a66e-492c-adef-691d508d4ae1
parent 70eb6eba
...@@ -308,6 +308,7 @@ static inline int db_opened(DB *db) { ...@@ -308,6 +308,7 @@ static inline int db_opened(DB *db) {
return db->i->opened != 0; return db->i->opened != 0;
} }
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags); static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor); static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor);
...@@ -2710,6 +2711,30 @@ grab_range_lock(RANGE_LOCK_REQUEST request) { ...@@ -2710,6 +2711,30 @@ grab_range_lock(RANGE_LOCK_REQUEST request) {
return r; return r;
} }
static int
toku_grab_read_lock_on_directory (DB* db, DB_TXN * txn) {
RANGE_LOCK_REQUEST_S request;
char * dname = db->i->dname;
DBT key_in_directory;
//
// bad hack because some environment dictionaries do not have a dname
//
if (!dname) {
return 0;
}
toku_fill_dbt(&key_in_directory, dname, strlen(dname)+1);
//Left end of range == right end of range (point lock)
read_lock_request_init(
&request,
txn,
db->dbenv->i->directory,
&key_in_directory,
&key_in_directory
);
int r = grab_range_lock(&request);
return r;
}
//This is the user level callback function given to ydb layer functions like //This is the user level callback function given to ydb layer functions like
//toku_c_getf_first //toku_c_getf_first
...@@ -3377,6 +3402,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { ...@@ -3377,6 +3402,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)); BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
int r = 0; int r = 0;
if (unchecked_flags!=0) r = EINVAL; if (unchecked_flags!=0) r = EINVAL;
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (r==0 && error_if_missing) { if (r==0 && error_if_missing) {
//Check if the key exists in the db. //Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE, key, ydb_getf_do_nothing, NULL); r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE, key, ydb_getf_do_nothing, NULL);
...@@ -3394,8 +3423,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { ...@@ -3394,8 +3423,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
//Do the actual deleting. //Do the actual deleting.
r = toku_brt_delete(db->i->brt, key, txn ? db_txn_struct_i(txn)->tokutxn : 0); r = toku_brt_delete(db->i->brt, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
} }
if (r) cleanup:
if (r) {
num_deletes_fail++; num_deletes_fail++;
}
return r; return r;
} }
...@@ -3435,6 +3466,10 @@ env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT ...@@ -3435,6 +3466,10 @@ env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
} }
//Do locking if necessary. //Do locking if necessary.
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) { if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking //Needs locking
RANGE_LOCK_REQUEST_S request; RANGE_LOCK_REQUEST_S request;
...@@ -3488,6 +3523,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int ...@@ -3488,6 +3523,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
HANDLE_PANICKED_DB(db); HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn); HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
DB_ENV* env = db->dbenv; DB_ENV* env = db->dbenv;
int r;
size_t result_size = sizeof(DBC)+sizeof(struct __toku_dbc_internal); // internal stuff stuck on the end size_t result_size = sizeof(DBC)+sizeof(struct __toku_dbc_internal); // internal stuff stuck on the end
if (!(flags == 0 || if (!(flags == 0 ||
flags == DB_SERIALIZABLE || flags == DB_SERIALIZABLE ||
...@@ -3500,6 +3536,11 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int ...@@ -3500,6 +3536,11 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
"Invalid isolation flags set for toku_db_cursor\n" "Invalid isolation flags set for toku_db_cursor\n"
); );
} }
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
return r;
}
DBC *result = toku_malloc(result_size); DBC *result = toku_malloc(result_size);
if (result == 0) if (result == 0)
return ENOMEM; return ENOMEM;
...@@ -3549,7 +3590,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int ...@@ -3549,7 +3590,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
is_snapshot_read = (dbc_struct_i(result)->iso == TOKU_ISO_READ_COMMITTED || is_snapshot_read = (dbc_struct_i(result)->iso == TOKU_ISO_READ_COMMITTED ||
dbc_struct_i(result)->iso == TOKU_ISO_SNAPSHOT); dbc_struct_i(result)->iso == TOKU_ISO_SNAPSHOT);
} }
int r = toku_brt_cursor( r = toku_brt_cursor(
db->i->brt, db->i->brt,
&dbc_struct_i(result)->c, &dbc_struct_i(result)->c,
txn ? db_txn_struct_i(txn)->tokutxn : NULL, txn ? db_txn_struct_i(txn)->tokutxn : NULL,
...@@ -3980,6 +4021,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) { ...@@ -3980,6 +4021,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
num_inserts++; num_inserts++;
u_int32_t lock_flags = get_prelocked_flags(flags); u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags; flags &= ~lock_flags;
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE)); BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
r = db_put_check_size_constraints(db, key, val); r = db_put_check_size_constraints(db, key, val);
...@@ -4004,8 +4049,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) { ...@@ -4004,8 +4049,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
type = BRT_INSERT_NO_OVERWRITE; type = BRT_INSERT_NO_OVERWRITE;
r = toku_brt_maybe_insert(db->i->brt, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type); r = toku_brt_maybe_insert(db->i->brt, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type);
} }
if (r) cleanup:
if (r) {
num_inserts_fail++; num_inserts_fail++;
}
return r; return r;
} }
...@@ -4043,6 +4090,10 @@ env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT ...@@ -4043,6 +4090,10 @@ env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
r = EINVAL; goto cleanup; r = EINVAL; goto cleanup;
} }
//Do locking if necessary. //Do locking if necessary.
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) { if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking //Needs locking
RANGE_LOCK_REQUEST_S request; RANGE_LOCK_REQUEST_S request;
...@@ -4400,6 +4451,29 @@ static int toku_c_pre_acquire_read_lock(DBC *dbc, const DBT *key_left, const DBT ...@@ -4400,6 +4451,29 @@ static int toku_c_pre_acquire_read_lock(DBC *dbc, const DBT *key_left, const DBT
return r; return r;
} }
static int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
RANGE_LOCK_REQUEST_S request;
char * dname = db->i->dname;
DBT key_in_directory;
//
// bad hack because some environment dictionaries do not have a dname
//
if (!dname) {
return 0;
}
toku_fill_dbt(&key_in_directory, dname, strlen(dname)+1);
//Left end of range == right end of range (point lock)
write_lock_request_init(
&request,
txn,
db->dbenv->i->directory,
&key_in_directory,
&key_in_directory
);
int r = grab_range_lock(&request);
return r;
}
//static int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { //static int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
// needed by loader.c // needed by loader.c
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
...@@ -4410,9 +4484,13 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) { ...@@ -4410,9 +4484,13 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
{ {
RANGE_LOCK_REQUEST_S request; RANGE_LOCK_REQUEST_S request;
write_lock_request_init(&request, txn, db, write_lock_request_init(
&request,
txn,
db,
toku_lt_neg_infinity, toku_lt_neg_infinity,
toku_lt_infinity); toku_lt_infinity
);
r = grab_range_lock(&request); r = grab_range_lock(&request);
} }
...@@ -4565,6 +4643,13 @@ static int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) { ...@@ -4565,6 +4643,13 @@ static int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
return r; return r;
} }
static int locked_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
toku_ydb_lock();
int r = toku_db_pre_acquire_fileops_lock(db, txn);
toku_ydb_unlock();
return r;
}
// truncate a database // truncate a database
// effect: remove all of the rows from a database // effect: remove all of the rows from a database
static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) { static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
...@@ -4588,10 +4673,15 @@ static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t ...@@ -4588,10 +4673,15 @@ static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t
// acquire a table lock // acquire a table lock
if (txn) { if (txn) {
r = toku_db_pre_acquire_fileops_lock(db, txn);
if (r != 0) {
return r;
}
r = toku_db_pre_acquire_table_lock(db, txn, TRUE); r = toku_db_pre_acquire_table_lock(db, txn, TRUE);
if (r != 0) if (r != 0) {
return r; return r;
} }
}
*row_count = 0; *row_count = 0;
...@@ -4797,6 +4887,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) { ...@@ -4797,6 +4887,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(stat64); SDB(stat64);
SDB(fd); SDB(fd);
SDB(pre_acquire_table_lock); SDB(pre_acquire_table_lock);
SDB(pre_acquire_fileops_lock);
SDB(truncate); SDB(truncate);
SDB(row_size_supported); SDB(row_size_supported);
SDB(getf_set); SDB(getf_set);
...@@ -5124,3 +5215,4 @@ toku_test_get_latest_lsn(DB_ENV *env) { ...@@ -5124,3 +5215,4 @@ toku_test_get_latest_lsn(DB_ENV *env) {
int toku_test_get_checkpointing_user_data_status (void) { int toku_test_get_checkpointing_user_data_status (void) {
return toku_cachetable_get_checkpointing_user_data_status(); return toku_cachetable_get_checkpointing_user_data_status();
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment