Commit 507df175 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4472], merge fix to main

git-svn-id: file:///svn/toku/tokudb@39735 c7de825b-a66e-492c-adef-691d508d4ae1
parent da2862bf
......@@ -147,7 +147,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -316,7 +315,6 @@ struct __toku_db {
void *app_private; /* 32-bit offset=16 size=4, 64=bit offset=32 size=8 */
DB_ENV *dbenv; /* 32-bit offset=20 size=4, 64=bit offset=40 size=8 */
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......@@ -334,7 +332,7 @@ struct __toku_db {
int (*verify_with_progress)(DB *, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra, int verbose, int keep_going);
int (*update)(DB *, DB_TXN*, const DBT *key, const DBT *extra, u_int32_t flags);
int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, u_int32_t flags);
void* __toku_dummy0[10];
void* __toku_dummy0[11];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=236 size=4, 64=bit offset=376 size=8 */
void* __toku_dummy2[5];
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -326,7 +325,6 @@ struct __toku_db {
void *app_private; /* 32-bit offset=16 size=4, 64=bit offset=32 size=8 */
DB_ENV *dbenv; /* 32-bit offset=20 size=4, 64=bit offset=40 size=8 */
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......@@ -344,7 +342,7 @@ struct __toku_db {
int (*verify_with_progress)(DB *, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra, int verbose, int keep_going);
int (*update)(DB *, DB_TXN*, const DBT *key, const DBT *extra, u_int32_t flags);
int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, u_int32_t flags);
void* __toku_dummy0[13];
void* __toku_dummy0[14];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=248 size=4, 64=bit offset=400 size=8 */
void* __toku_dummy2[5];
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -328,7 +327,6 @@ struct __toku_db {
void *app_private; /* 32-bit offset=16 size=4, 64=bit offset=32 size=8 */
DB_ENV *dbenv; /* 32-bit offset=20 size=4, 64=bit offset=40 size=8 */
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......@@ -346,7 +344,7 @@ struct __toku_db {
int (*verify_with_progress)(DB *, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra, int verbose, int keep_going);
int (*update)(DB *, DB_TXN*, const DBT *key, const DBT *extra, u_int32_t flags);
int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, u_int32_t flags);
void* __toku_dummy0[15];
void* __toku_dummy0[16];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=256 size=4, 64=bit offset=416 size=8 */
void* __toku_dummy2[5];
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -328,7 +327,6 @@ struct __toku_db {
void *app_private; /* 32-bit offset=16 size=4, 64=bit offset=32 size=8 */
DB_ENV *dbenv; /* 32-bit offset=20 size=4, 64=bit offset=40 size=8 */
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......@@ -346,7 +344,7 @@ struct __toku_db {
int (*verify_with_progress)(DB *, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra, int verbose, int keep_going);
int (*update)(DB *, DB_TXN*, const DBT *key, const DBT *extra, u_int32_t flags);
int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, u_int32_t flags);
void* __toku_dummy0[18];
void* __toku_dummy0[19];
char __toku_dummy1[96];
void *api_internal; /* 32-bit offset=268 size=4, 64=bit offset=440 size=8 */
void* __toku_dummy2[5];
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -331,7 +330,6 @@ struct __toku_db {
DB_ENV *dbenv; /* 32-bit offset=24 size=4, 64=bit offset=40 size=8 */
int (*pre_acquire_table_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......@@ -349,7 +347,7 @@ struct __toku_db {
int (*verify_with_progress)(DB *, int (*progress_callback)(void *progress_extra, float progress), void *progress_extra, int verbose, int keep_going);
int (*update)(DB *, DB_TXN*, const DBT *key, const DBT *extra, u_int32_t flags);
int (*update_broadcast)(DB *, DB_TXN*, const DBT *extra, u_int32_t flags);
void* __toku_dummy1[22];
void* __toku_dummy1[23];
char __toku_dummy2[80];
void *api_internal; /* 32-bit offset=276 size=4, 64=bit offset=464 size=8 */
void* __toku_dummy3[5];
......
......@@ -158,7 +158,7 @@ static void print_defines (void) {
printf("#define DB_IS_RESETTING_OP 0x01000000\n"); // private tokudb
printf("#define DB_PRELOCKED 0x00800000\n"); // private tokudb
printf("#define DB_PRELOCKED_WRITE 0x00400000\n"); // private tokudb
printf("#define DB_PRELOCKED_FILE_READ 0x00200000\n"); // private tokudb
//printf("#define DB_PRELOCKED_FILE_READ 0x00200000\n"); // private tokudb. No longer supported in #4472
printf("#define DB_IS_HOT_INDEX 0x00100000\n"); // private tokudb
printf("#define DBC_DISABLE_PREFETCHING 0x20000000\n"); // private tokudb
......@@ -585,7 +585,6 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
"int (*stat64)(DB *, DB_TXN *, DB_BTREE_STAT64 *)",
"int (*pre_acquire_table_lock)(DB*, DB_TXN*)",
"int (*pre_acquire_fileops_lock)(DB*, DB_TXN*)",
"int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*)",
"const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/",
"const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/",
"void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size)",
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -300,7 +299,6 @@ struct __toku_db {
DB_ENV *dbenv;
int (*pre_acquire_table_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......
......@@ -148,7 +148,6 @@ typedef enum {
#define DB_IS_RESETTING_OP 0x01000000
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_PRELOCKED_FILE_READ 0x00200000
#define DB_IS_HOT_INDEX 0x00100000
#define DBC_DISABLE_PREFETCHING 0x20000000
#define DB_DBT_APPMALLOC 1
......@@ -300,7 +299,6 @@ struct __toku_db {
DB_ENV *dbenv;
int (*pre_acquire_table_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_lock)(DB*, DB_TXN*);
int (*pre_acquire_fileops_shared_lock)(DB*, DB_TXN*);
const DBT* (*dbt_pos_infty)(void) /* Return the special DBT that refers to positive infinity in the lock table.*/;
const DBT* (*dbt_neg_infty)(void)/* Return the special DBT that refers to negative infinity in the lock table.*/;
void (*get_max_row_size) (DB*, u_int32_t *max_key_size, u_int32_t *max_row_size);
......
......@@ -162,8 +162,6 @@ toku_indexer_create_indexer(DB_ENV *env,
*indexerp = NULL;
rval = toku_grab_read_lock_on_directory (src_db, txn);
if (rval == 0) {
XCALLOC(indexer); // init to all zeroes (thus initializing the error_callback and poll_func)
if ( !indexer ) { rval = ENOMEM; goto create_exit; }
XCALLOC(indexer->i); // init to all zeroes (thus initializing all pointers to NULL)
......@@ -203,11 +201,10 @@ toku_indexer_create_indexer(DB_ENV *env,
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
FILENUMS filenums = indexer->i->filenums;
rval = toku_brt_hot_index(NULL, ttxn, filenums, 1, &hot_index_lsn);
}
if (rval == 0)
if (rval == 0) {
rval = associate_indexer_with_hot_dbs(indexer, dest_dbs, N);
}
create_exit:
if ( rval == 0 ) {
......
......@@ -29,7 +29,6 @@ static void verify_shared_ops_fail(DB_ENV* env, DB* db) {
int r;
DB_TXN* txn = NULL;
u_int32_t flags = 0;
DBC* c = NULL;
DBT key,val;
DBT in_key,in_val;
uint32_t in_key_data, in_val_data = 0;
......@@ -48,14 +47,6 @@ static void verify_shared_ops_fail(DB_ENV* env, DB* db) {
dbt_init(&key, "a", 4);
dbt_init(&val, "a", 4);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->pre_acquire_fileops_shared_lock(db, txn); CKERR2(r,DB_LOCK_NOTGRANTED);
r = txn->commit(txn,0); CKERR(r);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->cursor(db, txn, &c, 0); CKERR2(r,DB_LOCK_NOTGRANTED);
r = txn->commit(txn,0); CKERR(r);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->put(
db,
......@@ -162,34 +153,23 @@ static void verify_shared_ops_fail(DB_ENV* env, DB* db) {
}
static void verify_excl_ops_fail(DB_ENV* env, DB* db) {
static void verify_excl_ops_fail(DB_ENV* env, const char* name) {
DB_TXN* txn = NULL;
int r;
DBT extra_up;
dbt_init(&extra_up, NULL, 0);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->pre_acquire_fileops_lock(db, txn);
CKERR2(r, DB_LOCK_NOTGRANTED);
r = txn->commit(txn,0); CKERR(r);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->update_broadcast(db, txn, &extra_up, DB_IS_RESETTING_OP);
CKERR2(r, DB_LOCK_NOTGRANTED);
r = env->dbrename(env, txn, name, NULL, "asdf.db", 0);
CKERR2(r, EINVAL);
r = txn->commit(txn,0); CKERR(r);
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = db->change_descriptor(db, txn, &extra_up, 0);
CKERR2(r, DB_LOCK_NOTGRANTED);
r = txn->commit(txn,0); CKERR(r);
}
int test_main (int argc, char * const argv[]) {
parse_args(argc, argv);
int r;
DBC* c1 = NULL;
DBC* c2 = NULL;
DBT in_key,in_val;
uint32_t in_key_data = 123456;
uint32_t in_val_data = 654321;
......@@ -232,7 +212,7 @@ int test_main (int argc, char * const argv[]) {
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db_create(&db2, env, 0); CKERR(r);
r = db2->open(db2, txna, "foo2.db", NULL, DB_BTREE, DB_CREATE|DB_IS_HOT_INDEX, 0666); CKERR(r);
verify_excl_ops_fail(env, db2);
verify_excl_ops_fail(env, "foo2.db");
r = txna->commit(txna, 0); CKERR(r);
......@@ -242,7 +222,6 @@ int test_main (int argc, char * const argv[]) {
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, txna, "foo.db", NULL, DB_BTREE, DB_CREATE, 0666); CKERR(r);
verify_shared_ops_fail(env, db);
r = txna->commit(txna, 0); CKERR(r);
//
......@@ -255,38 +234,6 @@ int test_main (int argc, char * const argv[]) {
loader=NULL;
r = txna->commit(txna, 0); CKERR(r);
//
// preacquire fileops lock
//
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db->pre_acquire_fileops_lock(db,txna); CKERR(r);
verify_shared_ops_fail(env,db);
r = txna->commit(txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db->pre_acquire_fileops_lock(db,txna); CKERR(r);
verify_shared_ops_fail(env,db);
r = txna->commit(txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txnb, 0); CKERR(r);
r = db->pre_acquire_fileops_shared_lock(db, txna); CKERR(r);
r = db->pre_acquire_fileops_shared_lock(db, txnb); CKERR(r);
verify_excl_ops_fail(env,db);
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txnb, 0); CKERR(r);
r = db->cursor(db, txna, &c1, 0); CKERR(r);
r = db->cursor(db, txnb, &c2, 0); CKERR(r);
verify_excl_ops_fail(env,db);
c1->c_close(c1);
c2->c_close(c2);
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txnb, 0); CKERR(r);
DBT key,val;
......@@ -296,7 +243,7 @@ int test_main (int argc, char * const argv[]) {
dbt_init(&key, "b", 4);
dbt_init(&val, "b", 4);
r = db->put(db, txnb, &key, &val, 0); CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
......@@ -306,7 +253,7 @@ int test_main (int argc, char * const argv[]) {
r = db->del(db, txna, &key, DB_DELETE_ANY); CKERR(r);
dbt_init(&key, "b", 4);
r = db->del(db, txnb, &key, DB_DELETE_ANY); CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
......@@ -317,30 +264,17 @@ int test_main (int argc, char * const argv[]) {
r = db->update(db, txna, &key, &val, 0); CKERR(r);
dbt_init(&key, "b", 4);
r = db->update(db, txnb, &key, &val, 0); CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db->update_broadcast(db, txna, &val, 0); CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db->update_broadcast(db, txna, &val, DB_IS_RESETTING_OP); CKERR(r);
verify_shared_ops_fail(env,db);
r = txna->abort(txna); CKERR(r);
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = db->change_descriptor(db, txna, &val, 0); CKERR(r);
verify_shared_ops_fail(env,db);
r = txna->abort(txna); CKERR(r);
u_int32_t flags = 0;
r = env->txn_begin(env, NULL, &txna, 0); CKERR(r);
r = env->txn_begin(env, NULL, &txnb, 0); CKERR(r);
dbt_init(&key, "a", 4);
......@@ -357,7 +291,7 @@ int test_main (int argc, char * const argv[]) {
&key, &val,
1, &db, &in_key, &in_val, &flags);
CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
......@@ -378,7 +312,7 @@ int test_main (int argc, char * const argv[]) {
&key, &val,
1, &db, &in_key, &flags);
CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
......@@ -407,7 +341,7 @@ int test_main (int argc, char * const argv[]) {
2, in_keys,
1, &in_val);
CKERR(r);
verify_excl_ops_fail(env,db);
verify_excl_ops_fail(env,"foo.db");
r = txna->abort(txna); CKERR(r);
r = txnb->abort(txnb); CKERR(r);
......
......@@ -245,8 +245,6 @@ int toku_db_pre_acquire_table_lock(DB *db, DB_TXN *txn, BOOL just_lock);
int toku_grab_write_lock(DB *db, DBT *key, TOKUTXN tokutxn);
int toku_grab_read_lock_on_directory(DB *db, DB_TXN *txn);
#if defined(__cplusplus)
}
#endif
......
......@@ -458,7 +458,7 @@ static int toku_db_put(DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t f
static int toku_db_update(DB *db, DB_TXN *txn, const DBT *key, const DBT *update_function_extra, u_int32_t flags);
static int toku_db_update_broadcast(DB *db, DB_TXN *txn, const DBT *update_function_extra, u_int32_t flags);
static int toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags);
static int toku_db_cursor_internal(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor, bool holds_ydb_lock);
static int toku_db_cursor_internal(DB *db, DB_TXN * txn, DBC **c, u_int32_t flags, int is_temporary_cursor);
/* lightweight cursor methods. */
static int toku_c_getf_first(DBC *c, u_int32_t flag, YDB_CALLBACK_FUNCTION f, void *extra);
......@@ -3322,13 +3322,13 @@ typedef struct {
static inline u_int32_t
get_prelocked_flags(u_int32_t flags) {
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE | DB_PRELOCKED_FILE_READ);
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
return lock_flags;
}
static inline u_int32_t
get_cursor_prelocked_flags(u_int32_t flags, DBC* dbc) {
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE | DB_PRELOCKED_FILE_READ);
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
//DB_READ_UNCOMMITTED and DB_READ_COMMITTED transactions 'own' all read locks for user-data dictionaries.
if (dbc_struct_i(dbc)->iso != TOKU_ISO_SERIALIZABLE) {
......@@ -3493,24 +3493,6 @@ get_point_write_lock(DB *db, DB_TXN *txn, const DBT *key) {
return r;
}
// assume ydb is locked
int
toku_grab_read_lock_on_directory (DB* db, DB_TXN * txn) {
// bad hack because some environment dictionaries do not have a dname
char *dname = db->i->dname;
if (!dname || (db->dbenv->i->directory->i->lt == NULL))
return 0;
//Left end of range == right end of range (point lock)
DBT key_in_directory = { .data = dname, .size = strlen(dname)+1 };
int r = get_range_lock(db->dbenv->i->directory, txn, &key_in_directory, &key_in_directory, LOCK_REQUEST_READ);
if (r == 0)
STATUS_VALUE(YDB_LAYER_DIRECTORY_READ_LOCKS)++;
else
STATUS_VALUE(YDB_LAYER_DIRECTORY_READ_LOCKS_FAIL)++;
return r;
}
//This is the user level callback function given to ydb layer functions like
//toku_c_getf_first
......@@ -4151,7 +4133,7 @@ toku_c_count(DBC *cursor, db_recno_t *count, u_int32_t flags) {
// lock_flags |= DB_PRELOCKED
//}
r = toku_db_cursor_internal(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, DBC_DISABLE_PREFETCHING, 0, true);
r = toku_db_cursor_internal(cursor->dbp, dbc_struct_i(cursor)->txn, &count_cursor, DBC_DISABLE_PREFETCHING, 0);
if (r != 0) goto finish;
r = toku_c_getf_set(count_cursor, lock_flags, &currentkey, ydb_getf_do_nothing, NULL);
......@@ -4180,7 +4162,7 @@ db_getf_set(DB *db, DB_TXN *txn, u_int32_t flags, DBT *key, YDB_CALLBACK_FUNCTIO
DBC *c;
uint32_t create_flags = flags & (DB_ISOLATION_FLAGS | DB_RMW);
flags &= ~DB_ISOLATION_FLAGS;
int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1, true);
int r = toku_db_cursor_internal(db, txn, &c, create_flags | DBC_DISABLE_PREFETCHING, 1);
if (r==0) {
r = toku_c_getf_set(c, flags, key, f, extra);
int r2 = toku_c_close(c);
......@@ -4201,15 +4183,12 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
u_int32_t lock_flags = get_prelocked_flags(flags);
unchecked_flags &= ~lock_flags;
BOOL do_locking = (BOOL)(db->i->lt && !(lock_flags&DB_PRELOCKED_WRITE));
BOOL do_dir_locking = !(lock_flags&DB_PRELOCKED_FILE_READ);
int r = 0;
if (unchecked_flags!=0)
if (unchecked_flags!=0) {
r = EINVAL;
if (r == 0 && do_dir_locking) {
r = toku_grab_read_lock_on_directory(db, txn);
}
if (r == 0 && error_if_missing) {
//Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags|DB_SERIALIZABLE|DB_RMW, key, ydb_getf_do_nothing, NULL);
......@@ -4343,11 +4322,6 @@ env_del_multiple(
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
//Do locking if necessary.
if (!(lock_flags[which_db] & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
}
if (db == src_db) {
del_keys[which_db] = *src_key;
}
......@@ -4415,7 +4389,7 @@ locked_c_del(DBC * c, u_int32_t flags) {
static int locked_c_pre_acquire_range_lock(DBC *dbc, const DBT *key_left, const DBT *key_right);
static int
toku_db_cursor_internal(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporary_cursor, bool holds_ydb_lock) {
toku_db_cursor_internal(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is_temporary_cursor) {
HANDLE_PANICKED_DB(db);
HANDLE_DB_ILLEGAL_WORKING_PARENT_TXN(db, txn);
DB_ENV* env = db->dbenv;
......@@ -4429,13 +4403,6 @@ toku_db_cursor_internal(DB * db, DB_TXN * txn, DBC ** c, u_int32_t flags, int is
}
int r = 0;
if (1) {
if (!holds_ydb_lock) toku_ydb_lock();
r = toku_grab_read_lock_on_directory(db, txn);
if (!holds_ydb_lock) toku_ydb_unlock();
if (r != 0)
return r;
}
struct __toku_dbc_external *XMALLOC(eresult); // so the internal stuff is stuck on the end
memset(eresult, 0, sizeof(*eresult));
......@@ -4522,7 +4489,7 @@ toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
if ((db->i->open_flags & DB_THREAD) && db_thread_need_flags(data))
return EINVAL;
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE | DB_PRELOCKED_FILE_READ);
u_int32_t lock_flags = flags & (DB_PRELOCKED | DB_PRELOCKED_WRITE);
flags &= ~lock_flags;
flags &= ~DB_ISOLATION_FLAGS;
// And DB_GET_BOTH is no longer supported. #2862.
......@@ -4530,7 +4497,7 @@ toku_db_get (DB * db, DB_TXN * txn, DBT * key, DBT * data, u_int32_t flags) {
DBC *dbc;
r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1, true);
r = toku_db_cursor_internal(db, txn, &dbc, iso_flags | DBC_DISABLE_PREFETCHING, 1);
if (r!=0) return r;
u_int32_t c_get_flags = DB_SET;
r = toku_c_get(dbc, key, data, c_get_flags | lock_flags);
......@@ -4925,11 +4892,6 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (!(lock_flags & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
}
if (r == 0)
r = db_put_check_size_constraints(db, key, val);
if (r == 0) {
//Do any checking required by the flags.
......@@ -4985,11 +4947,6 @@ toku_db_update(DB *db, DB_TXN *txn,
u_int32_t lock_flags = get_prelocked_flags(flags);
flags &= ~lock_flags;
if (!(lock_flags & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) { goto cleanup; }
}
r = db_put_check_size_constraints(db, key, update_function_extra);
if (r != 0) { goto cleanup; }
......@@ -5039,11 +4996,6 @@ toku_db_update_broadcast(DB *db, DB_TXN *txn,
r = toku_db_pre_acquire_fileops_lock(db, txn);
if (r != 0) { goto cleanup; }
}
else if (!(lock_flags & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) { goto cleanup; }
}
{
DBT null_key;
toku_init_dbt(&null_key);
......@@ -5159,12 +5111,6 @@ env_put_multiple(
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
//Do locking if necessary.
if (!(lock_flags[which_db] & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
}
//Generate the row
if (db == src_db) {
put_keys[which_db] = *src_key;
......@@ -5263,10 +5209,6 @@ env_update_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn,
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db]);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
if (!(lock_flags[which_db] & DB_PRELOCKED_FILE_READ)) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) goto cleanup;
}
// keys[0..num_dbs-1] are the new keys
// keys[num_dbs..2*num_dbs-1] are the old keys
// vals[0..num_dbs-1] are the new vals
......@@ -5632,17 +5574,7 @@ toku_db_change_descriptor(DB *db, DB_TXN* txn, const DBT* descriptor, u_int32_t
r = EINVAL; // cannot have a parent if you are a resetting op
goto cleanup;
}
//
// If the DB is created for the purpose of being a hot index,
// then do not grab a write lock on the directory when setting the
// descriptor, because the hot index DB must not have a write
// lock grabbed in order to work
//
if (is_db_hot_index) {
r = toku_grab_read_lock_on_directory(db, txn);
if (r != 0) { goto cleanup; }
}
else {
if (!is_db_hot_index) {
r = toku_db_pre_acquire_fileops_lock(db, txn);
if (r != 0) { goto cleanup; }
}
......@@ -5876,7 +5808,7 @@ autotxn_db_cursor(DB *db, DB_TXN *txn, DBC **c, u_int32_t flags) {
return toku_ydb_do_error(db->dbenv, EINVAL,
"Cursors in a transaction environment must have transactions.\n");
}
return toku_db_cursor_internal(db, txn, c, flags, 0, false);
return toku_db_cursor_internal(db, txn, c, flags, 0);
}
// Create a cursor on a db.
......@@ -5952,13 +5884,6 @@ static int locked_db_pre_acquire_fileops_lock(DB *db, DB_TXN *txn) {
return r;
}
static int locked_db_pre_acquire_fileops_shared_lock(DB *db, DB_TXN *txn) {
toku_ydb_lock();
int r = toku_grab_read_lock_on_directory(db, txn);
toku_ydb_unlock();
return r;
}
// truncate a database
// effect: remove all of the rows from a database
static int
......@@ -6203,32 +6128,11 @@ toku_db_hot_optimize(DB *db,
HANDLE_PANICKED_DB(db);
int r = 0;
// #4356 Take directory read lock around hot optimize to prevent
// race condition of another thread deleting the dictionary during
// the hot optimize. Create a long-lived transaction to hold the
// lock, but the transaction does nothing else so the rollback log
// is tiny and the txnid does not appear in any dictionary.
int using_txns = db->dbenv->i->open_flags & DB_INIT_TXN;
DB_TXN *txn;
if (using_txns) {
toku_ydb_lock();
int rx = toku_txn_begin(db->dbenv, NULL, &txn, DB_TXN_NOSYNC, 1);
invariant_zero(rx);
r = toku_grab_read_lock_on_directory(db, txn);
toku_ydb_unlock();
}
// If we areunable to get a directory read lock, do nothing.
if (r == 0) {
r = toku_brt_hot_optimize(db->i->brt,
progress_callback,
progress_extra);
}
if (using_txns) {
int rx = locked_txn_commit(txn, 0);
invariant_zero(rx);
}
return r;
}
......@@ -6382,7 +6286,6 @@ toku_db_create(DB ** db, DB_ENV * env, u_int32_t flags) {
SDB(fd);
SDB(pre_acquire_table_lock);
SDB(pre_acquire_fileops_lock);
SDB(pre_acquire_fileops_shared_lock);
SDB(truncate);
SDB(get_max_row_size);
SDB(getf_set);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment