Commit f3e458a6 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:2952], move fix to main

git-svn-id: file:///svn/toku/tokudb@24952 c7de825b-a66e-492c-adef-691d508d4ae1
parent 3059834e
......@@ -308,6 +308,7 @@ static inline int db_opened(DB *db) {
return db->i->opened != 0;
}
static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_cursor(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor);
......@@ -2710,6 +2711,30 @@ grab_range_lock(RANGE_LOCK_REQUEST request) {
return r;
}
static int
toku_grab_read_lock_on_directory (DB* db, DB_TXN * txn) {
RANGE_LOCK_REQUEST_S request;
char * dname = db->i->dname;
DBT key_in_directory;
//
// bad hack because some environment dictionaries do not have a dname
//
if (!dname) {
return 0;
}
toku_fill_dbt(&key_in_directory, dname, strlen(dname)+1);
//Left end of range == right end of range (point lock)
read_lock_request_init(
&request,
txn,
db->dbenv->i->directory,
&key_in_directory,
&key_in_directory
);
int r = grab_range_lock(&request);
return r;
}
//This is the user level callback function given to ydb layer functions like
//toku_c_getf_first
......@@ -3377,6 +3402,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
int r = 0;
if (unchecked_flags!=0) r = EINVAL;
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (r==0 && error_if_missing) {
//Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE, key, ydb_getf_do_nothing, NULL);
......@@ -3394,8 +3423,10 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
//Do the actual deleting.
r = toku_brt_delete(db->i->brt, key, txn ? db_txn_struct_i(txn)->tokutxn : 0);
}
if (r)
num_deletes_fail++;
cleanup:
if (r) {
num_deletes_fail++;
}
return r;
}
......@@ -3435,6 +3466,10 @@ env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
}
//Do locking if necessary.
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
RANGE_LOCK_REQUEST_S request;
......@@ -3488,6 +3523,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
DB_ENV* env = db->dbenv;
int r;
size_t result_size = sizeof(DBC)+sizeof(struct __toku_dbc_internal); // internal stuff stuck on the end
if (!(flags == 0 ||
flags == DB_SERIALIZABLE ||
......@@ -3500,6 +3536,11 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
"Invalid isolation flags set for toku_db_cursor\n"
);
}
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
return r;
}
DBC *result = toku_malloc(result_size);
if (result == 0)
return ENOMEM;
......@@ -3549,7 +3590,7 @@ static int toku_db_cursor(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int
is_snapshot_read = (dbc_struct_i(result)->iso == TOKU_ISO_READ_COMMITTED ||
dbc_struct_i(result)->iso == TOKU_ISO_SNAPSHOT);
}
int r = toku_brt_cursor(
r = toku_brt_cursor(
db->i->brt,
&dbc_struct_i(result)->c,
txn ? db_txn_struct_i(txn)->tokutxn : NULL,
......@@ -3980,6 +4021,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
num_inserts++;
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
r = db_put_check_size_constraints(db, key, val);
......@@ -4004,8 +4049,10 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
type = BRT_INSERT_NO_OVERWRITE;
r = toku_brt_maybe_insert(db->i->brt, key, val, ttxn, FALSE, ZERO_LSN, TRUE, type);
}
if (r)
num_inserts_fail++;
cleanup:
if (r) {
num_inserts_fail++;
}
return r;
}
......@@ -4043,6 +4090,10 @@ env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT
r = EINVAL; goto cleanup;
}
//Do locking if necessary.
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) {
goto cleanup;
}
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
RANGE_LOCK_REQUEST_S request;
......@@ -4400,6 +4451,29 @@ static int toku_c_pre_acquire_read_lock(DBC *dbc, const DBT *key_left, const DBT
return r;
}
static int toku_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
RANGE_LOCK_REQUEST_S request;
char * dname = db->i->dname;
DBT key_in_directory;
//
// bad hack because some environment dictionaries do not have a dname
//
if (!dname) {
return 0;
}
toku_fill_dbt(&key_in_directory, dname, strlen(dname)+1);
//Left end of range == right end of range (point lock)
write_lock_request_init(
&request,
txn,
db->dbenv->i->directory,
&key_in_directory,
&key_in_directory
);
int r = grab_range_lock(&request);
return r;
}
//static int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
// needed by loader.c
int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
......@@ -4409,10 +4483,14 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock) {
int r;
{
RANGE_LOCK_REQUEST_S request;
write_lock_request_init(&request, txn, db,
toku_lt_neg_infinity,
toku_lt_infinity);
RANGE_LOCK_REQUEST_S request;
write_lock_request_init(
&request,
txn,
db,
toku_lt_neg_infinity,
toku_lt_infinity
);
r = grab_range_lock(&request);
}
......@@ -4565,6 +4643,13 @@ static int locked_db_pre_acquire_table_lock(DB *db, DB_TXN *txn) {
return r;
}
static int locked_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
toku_ydb_lock();
int r = toku_db_pre_acquire_fileops_lock(db, txn);
toku_ydb_unlock();
return r;
}
// truncate a database
// effect: remove all of the rows from a database
static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t flags) {
......@@ -4588,9 +4673,14 @@ static int toku_db_truncate(DB *db, DB_TXN *txn, u_int32_t *row_count, u_int32_t
// acquire a table lock
if (txn) {
r = toku_db_pre_acquire_fileops_lock(db, txn);
if (r != 0) {
return r;
}
r = toku_db_pre_acquire_table_lock(db, txn, TRUE);
if (r != 0)
if (r != 0) {
return r;
}
}
*row_count = 0;
......@@ -4797,6 +4887,7 @@ static int toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(stat64);
SDB(fd);
SDB(pre_acquire_table_lock);
SDB(pre_acquire_fileops_lock);
SDB(truncate);
SDB(row_size_supported);
SDB(getf_set);
......@@ -5124,3 +5215,4 @@ toku_test_get_latest_lsn(DB_ENV *env) {
int toku_test_get_checkpointing_user_data_status (void) {
return toku_cachetable_get_checkpointing_user_data_status();
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment