Commit 838e0e5b authored by Yoni Fogel's avatar Yoni Fogel

[t:2318] Merge #2318 back to main up to r17166

git-svn-id: file:///svn/toku/tokudb@17178 c7de825b-a66e-492c-adef-691d508d4ae1
parent ea77448b
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 4
#define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -164,6 +160,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_TXN_WRITE_NOSYNC 524288
#define DB_TXN_NOWAIT 2048
#define DB_TXN_SYNC 4096
......@@ -196,14 +197,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[21];
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
char __toku_dummy1[64];
void *api1_internal; /* 32-bit offset=212 size=4, 64=bit offset=360 size=8 */
void* __toku_dummy2[7];
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 4
#define DB_VERB_WAITSFOR 8
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 65536
#define DB_TXN_WRITE_NOSYNC 268435456
#define DB_TXN_NOWAIT 4096
......@@ -198,14 +199,25 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[21];
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
char __toku_dummy1[96];
void *api1_internal; /* 32-bit offset=244 size=4, 64=bit offset=392 size=8 */
void* __toku_dummy2[7];
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 262144
#define DB_TXN_WRITE_NOSYNC 1024
#define DB_TXN_NOWAIT 8192
......@@ -199,14 +200,25 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[36];
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[7];
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 64
#define DB_DBT_DUPOK 128
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 128
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 64
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 2048
#define DB_TXN_NOWAIT 16384
......@@ -198,15 +199,26 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[36];
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[8];
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024
......@@ -200,15 +201,26 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
void* __toku_dummy0[37];
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[36];
char __toku_dummy1[144];
void *api1_internal; /* 32-bit offset=356 size=4, 64=bit offset=568 size=8 */
void* __toku_dummy2[8];
......
......@@ -60,11 +60,6 @@ void print_defines (void) {
dodefine(DB_VERB_REPLICATION);
dodefine(DB_VERB_WAITSFOR);
dodefine(DB_DBT_MALLOC);
dodefine(DB_DBT_REALLOC);
dodefine(DB_DBT_USERMEM);
dodefine(DB_DBT_DUPOK);
dodefine(DB_ARCH_ABS);
dodefine(DB_ARCH_LOG);
......@@ -136,10 +131,19 @@ void print_defines (void) {
printf("#define DB_PRELOCKED 0x00800000\n"); // private tokudb
printf("#define DB_PRELOCKED_WRITE 0x00400000\n"); // private tokudb
dodefine(DB_DBT_APPMALLOC);
{
//dbt flags
uint32_t dbt_flags = 0;
dodefine_track(dbt_flags, DB_DBT_APPMALLOC);
dodefine_track(dbt_flags, DB_DBT_DUPOK);
dodefine_track(dbt_flags, DB_DBT_MALLOC);
#ifdef DB_DBT_MULTIPLE
dodefine(DB_DBT_MULTIPLE);
dodefine_track(dbt_flags, DB_DBT_MULTIPLE);
#endif
dodefine_track(dbt_flags, DB_DBT_REALLOC);
dodefine_track(dbt_flags, DB_DBT_USERMEM);
dodefine_from_track(dbt_flags, DB_DBT_TEMPMEMORY);
}
// flags for the env->set_flags function
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
......@@ -447,13 +451,24 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */",
"int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */",
"int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */",
"int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */",
"int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */",
"int (*set_multiple_callbacks) (DB_ENV *env,\n"
" int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n"
" int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n"
" int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),\n"
" int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */",
"int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,\n"
" void *extra) /* Insert into multiple dbs */",
"int (*set_generate_row_callback_for_put) (DB_ENV *env, \n"
" int (*generate_row_for_put)(DB *dest_db, DB *src_db,\n"
" DBT *dest_key, DBT *dest_val,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
"int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,\n"
" void *extra) /* Insert into multiple dbs */",
"int (*set_generate_row_callback_for_del) (DB_ENV *env, \n"
" int (*generate_row_for_del)(DB *dest_db, DB *src_db,\n"
" DBT *dest_key,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
}
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024
......@@ -200,14 +201,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -100,10 +100,15 @@ check-no-rolltmp: $(TARGET_TDB)
./$(TARGET_TDB) $(VERBVERBOSE) --env no-rolltmp.dir --singlex --nolog --check_small_rolltmp $(SUMMARIZE_CMD)
# Check to make sure that if we make a file that's bigger than 4GB that we can read the file back out and get all the rows.
ifeq ($(SKIP_4G),1)
check-4G:
@echo SKIPPED SLOW TEST $@
else
check-4G: $(TARGET_TDB) $(SCANSCAN_TDB)
( ./$(TARGET_TDB) $(VERBVERBOSE) --env 4g.dir --norandom --compressibility 1 --valsize 10000 1 && \
./$(SCANSCAN_TDB) --env 4g.dir --lwc --prelock --prelockflag --nox > 4g.out && \
fgrep "(1048576 rows)" 4g.out > /dev/null ) $(SUMMARIZE_CMD)
endif
clean:
rm -f $(TARGETS) 4g.out
......
......@@ -126,33 +126,20 @@ DB *dbs[MAX_DBS];
uint32_t put_flagss[MAX_DBS];
DB_TXN *parenttid=0;
DB_TXN *tid=0;
DBT dest_keys[MAX_DBS];
DBT dest_vals[MAX_DBS];
#if defined(TOKUDB)
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **dbs_in, DBT *keys, DBT *vals, void *extra) {
assert((int)num_dbs_in == num_dbs);
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
assert(src_db == NULL);
assert(dest_db != NULL);
assert(extra == &put_flags); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t row_keysize = *(int32_t*)row->data;
assert(row_keysize == keysize);
assert((int)row->size >= 4+keysize);
int32_t row_valsize = row->size - 4 - keysize;
assert(row_valsize == valsize);
void *key = ((uint8_t*)row->data)+4;
void *val = ((uint8_t*)row->data)+4 + keysize;
for (which = 0; which < num_dbs; which++) {
assert(dbs_in[which] == dbs[which]);
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
assert(extra == &put_flags); //Verifying extra gets set right.
dest_key->data = src_key->data;
dest_key->size = src_key->size;
dest_val->data = src_val->data;
dest_val->size = src_val->size;
return 0;
}
#endif
......@@ -200,9 +187,7 @@ static void benchmark_setup (void) {
}
#if defined(TOKUDB)
if (insert_multiple) {
r = dbenv->set_multiple_callbacks(dbenv,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = dbenv->set_generate_row_callback_for_put(dbenv, put_multiple_generate);
CKERR(r);
}
#endif
......@@ -369,19 +354,18 @@ static void fill_array (unsigned char *data, int size) {
static void insert (long long v) {
int r;
unsigned char data[keysize+valsize+4];
unsigned char *kc = data+4, *vc = data+keysize+4;
unsigned char kc[keysize];
unsigned char vc[valsize];;
DBT kt, vt;
fill_array(kc, keysize);
long_long_to_array(kc, keysize, v); // Fill in the array first, then write the long long in.
fill_array(vc, valsize);
long_long_to_array(vc, valsize, v);
*(uint32_t*)(data) = keysize;
fill_dbt(&kt, kc, keysize);
fill_dbt(&vt, vc, valsize);
if (insert_multiple) {
DBT row;
fill_dbt(&row, data, sizeof(data));
#if defined(TOKUDB)
r = dbenv->put_multiple(dbenv, tid, &row, num_dbs, dbs, put_flagss, &put_flags); //Extra used just to verify its passed right
r = dbenv->put_multiple(dbenv, NULL, tid, &kt, &vt, num_dbs, dbs, dest_keys, dest_vals, put_flagss, &put_flags); //Extra used just to verify its passed right
#else
r = EINVAL;
#endif
......@@ -390,7 +374,7 @@ static void insert (long long v) {
else {
for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which];
r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags);
r = db->put(db, tid, &kt, &vt, put_flags);
CKERR(r);
}
}
......@@ -711,6 +695,8 @@ int main (int argc, const char *argv[]) {
}
#endif
if (insert_multiple) {
memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals));
for (which = 0; which < num_dbs; which++) {
put_flagss[i] = put_flags;
}
......
......@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4
#define DB_CREATE 1
......@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024
......@@ -200,14 +201,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */;
int (*set_multiple_callbacks) (DB_ENV *env,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */;
int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_row_for_put)(DB *dest_db, DB *src_db,
DBT *dest_key, DBT *dest_val,
const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *api1_internal;
int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
......@@ -2639,7 +2639,7 @@ txn_note_doing_work(TOKUTXN txn) {
}
int
toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val) {
int r = 0;
assert(txn);
assert(num_brts > 0);
......@@ -2651,9 +2651,11 @@ toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
}
BYTESTRING rowbs = {.len=row->size, .data=row->data};
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
return r;
}
......@@ -2705,7 +2707,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
}
int
toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val) {
int r = 0;
assert(txn);
assert(num_brts > 0);
......@@ -2717,9 +2719,11 @@ toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf);
}
BYTESTRING rowbs = {.len=row->size, .data=row->data};
BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs);
FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
}
return r;
}
......
......@@ -59,8 +59,8 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
// Effect: Delete a key from a brt
// Returns 0 if successful
......
......@@ -47,6 +47,7 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN;
/* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
#define FILENUM_NONE ((FILENUM){UINT32_MAX})
typedef struct {
u_int32_t num;
......@@ -106,10 +107,8 @@ typedef struct brt_msg BRT_MSG_S, *BRT_MSG;
typedef int (*brt_compare_func)(DB *, const DBT *, const DBT *);
typedef int (*generate_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
typedef int (*cleanup_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra);
typedef int (*generate_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
typedef int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra);
typedef int (*generate_row_for_del_func)(DB *dest_db, DB *src_db, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra);
#define UU(x) x __attribute__((__unused__))
......
......@@ -386,6 +386,7 @@ is_filenum_reserved(CACHETABLE ct, FILENUM filenum) {
static void
reserve_filenum(CACHETABLE ct, FILENUM filenum) {
int r;
assert(filenum.fileid != FILENUM_NONE.fileid);
uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL);
......@@ -477,6 +478,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
CACHEFILE extant;
struct fileid fileid;
if (with_filenum) assert(filenum.fileid != FILENUM_NONE.fileid);
if (reserved) assert(with_filenum);
r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) {
......@@ -530,6 +532,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
} else {
// find an unused fileid and use it
try_again:
assert(next_filenum_to_use.fileid != FILENUM_NONE.fileid);
for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) {
next_filenum_to_use.fileid++;
......
......@@ -150,13 +150,17 @@ const struct logtype logtypes[] = {
{"TXNID", "xid", 0},
{"BYTESTRING", "key", 0},
NULLFIELD}},
{"enq_insert_multiple", 'm', FA{{"FILENUMS", "filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "row", 0},
NULLFIELD}},
{"enq_delete_multiple", 'M', FA{{"FILENUMS", "filenums", 0},
{"enq_insert_multiple", 'm', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}},
{"enq_delete_multiple", 'M', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0},
{"BYTESTRING", "row", 0},
{"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0},
......
......@@ -166,10 +166,8 @@ struct recover_env {
TOKULOGGER logger;
brt_compare_func bt_compare;
brt_compare_func dup_compare;
generate_keys_vals_for_put_func generate_keys_vals_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
generate_row_for_put_func generate_row_for_put;
generate_row_for_del_func generate_row_for_del;
struct scan_state ss;
struct file_map fmap;
BOOL goforward;
......@@ -177,10 +175,8 @@ struct recover_env {
typedef struct recover_env *RECOVER_ENV;
static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
generate_row_for_put_func generate_row_for_put,
generate_row_for_del_func generate_row_for_del,
size_t cachetable_size) {
int r;
......@@ -192,10 +188,8 @@ static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_
toku_logger_set_cachetable(renv->logger, renv->ct);
renv->bt_compare = bt_compare;
renv->dup_compare = dup_compare;
renv->generate_keys_vals_for_put = generate_keys_vals_for_put;
renv->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put;
renv->generate_keys_for_del = generate_keys_for_del;
renv->cleanup_keys_for_del = cleanup_keys_for_del;
renv->generate_row_for_put = generate_row_for_put;
renv->generate_row_for_del = generate_row_for_del;
file_map_init(&renv->fmap);
renv->goforward = FALSE;
......@@ -497,54 +491,51 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
#if 0
int (*generate_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra),
int (*cleanup_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra),
int (*generate_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra),
int (*cleanup_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra));
#endif
DB* dbs[l->filenums.num];
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
DB *src_db = NULL;
{
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (l->src_filenum.fileid == FILENUM_NONE.fileid)
assert(r!=0);
else {
assert(r==0); //How do we continue if src_db is specified but missing?
src_db = tuple->brt->db;
}
dbs[num_dbs++] = tuple->brt->db;
}
if (num_dbs == 0) //All files are closed/deleted. We're done.
return 0;
DBT keydbts[num_dbs], valdbts[num_dbs], rowdbt;
memset(keydbts, 0, sizeof(keydbts));
memset(valdbts, 0, sizeof(valdbts));
//Generate all the DBTs
toku_fill_dbt(&rowdbt, l->row.data, l->row.len);
r = renv->generate_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL);
assert(r==0);
uint32_t which_db = 0;
for (file = 0; file < l->filenums.num; file++) {
uint32_t file;
DBT src_key, src_val, dest_key, dest_val;
toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
toku_init_dbt(&dest_key);
toku_init_dbt(&dest_val);
dest_key.flags = DB_DBT_REALLOC;
dest_val.flags = DB_DBT_REALLOC;
for (file = 0; file < l->dest_filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
assert(tuple->brt->db == dbs[which_db]);
r = toku_brt_maybe_insert(tuple->brt, &keydbts[which_db], &valdbts[which_db], txn, TRUE, l->lsn, FALSE);
DB *db = tuple->brt->db;
r = renv->generate_row_for_put(db, src_db, &dest_key, &dest_val, &src_key, &src_val, NULL);
assert(r==0);
r = toku_brt_maybe_insert(tuple->brt, &dest_key, &dest_val, txn, TRUE, l->lsn, FALSE);
assert(r == 0);
which_db++;
//DB_DBT_TEMPMEMORY indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.
if (dest_key.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
}
if (dest_val.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_val);
dest_val.flags = DB_DBT_REALLOC;
}
}
assert(which_db == num_dbs);
//Do cleanup of all dbts.
r = renv->cleanup_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL);
assert(r==0);
if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
if (dest_val.flags & DB_DBT_REALLOC && dest_val.data) toku_free(dest_val.data); //TODO: #2321 May need windows hack
return 0;
}
......@@ -564,49 +555,44 @@ static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0;
}
DB* dbs[l->filenums.num];
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
DB *src_db = NULL;
{
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (l->src_filenum.fileid == FILENUM_NONE.fileid)
assert(r!=0);
else {
assert(r==0); //How do we continue if src_db is specified but missing?
src_db = tuple->brt->db;
}
dbs[num_dbs++] = tuple->brt->db;
}
if (num_dbs == 0) //All files are closed/deleted. We're done.
return 0;
DBT keydbts[num_dbs], rowdbt;
memset(keydbts, 0, sizeof(keydbts));
//Generate all the DBTs
toku_fill_dbt(&rowdbt, l->row.data, l->row.len);
r = renv->generate_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
uint32_t file;
DBT src_key, src_val, dest_key;
toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
uint32_t which_db = 0;
for (file = 0; file < l->filenums.num; file++) {
for (file = 0; file < l->dest_filenums.num; file++) {
struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple);
r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file.
continue;
}
assert(tuple->brt->db == dbs[which_db]);
r = toku_brt_maybe_delete(tuple->brt, &keydbts[which_db], txn, TRUE, l->lsn, FALSE);
DB *db = tuple->brt->db;
r = renv->generate_row_for_del(db, src_db, &dest_key, &src_key, &src_val, NULL);
assert(r==0);
r = toku_brt_maybe_delete(tuple->brt, &dest_key, txn, TRUE, l->lsn, FALSE);
assert(r == 0);
which_db++;
//DB_DBT_TEMPMEMORY indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.
if (dest_key.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
}
}
assert(which_db == num_dbs);
//Do cleanup of all dbts.
r = renv->cleanup_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
return 0;
}
......@@ -1331,10 +1317,8 @@ int tokudb_recover_delete_rolltmp_files(const char *UU(data_dir), const char *lo
int tokudb_recover(const char *env_dir, const char *log_dir,
brt_compare_func bt_compare,
brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
generate_row_for_put_func generate_row_for_put,
generate_row_for_del_func generate_row_for_del,
size_t cachetable_size) {
int r;
int lockfd = -1;
......@@ -1353,10 +1337,8 @@ int tokudb_recover(const char *env_dir, const char *log_dir,
if (tokudb_needs_recovery(log_dir, FALSE)) {
struct recover_env renv;
r = recover_env_init(&renv, bt_compare, dup_compare,
generate_keys_vals_for_put,
cleanup_keys_vals_for_put,
generate_keys_for_del,
cleanup_keys_for_del,
generate_row_for_put,
generate_row_for_del,
cachetable_size);
assert(r == 0);
......
......@@ -19,10 +19,8 @@
int tokudb_recover (const char *env_dir, const char *log_dir,
brt_compare_func bt_compare,
brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
generate_row_for_put_func generate_row_for_put,
generate_row_for_del_func generate_row_for_del,
size_t cachetable_size);
// Effect: Check the tokudb logs to determine whether or not we need to run recovery.
......
......@@ -35,7 +35,7 @@ int recovery_main (int argc, const char *argv[]) {
return(1);
}
int r = tokudb_recover(data_dir, log_dir, NULL, NULL, NULL, NULL, NULL, NULL, 0);
int r = tokudb_recover(data_dir, log_dir, NULL, NULL, NULL, NULL, 0);
if (r!=0) {
fprintf(stderr, "Recovery failed\n");
return(1);
......
......@@ -40,7 +40,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0);
return 0;
......
......@@ -25,7 +25,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -31,7 +31,7 @@ run_test(void) {
r = close(devnul);
assert(r==0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -20,7 +20,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == DB_RUNRECOVERY);
return 0;
......
......@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0);
return 0;
}
......
......@@ -28,7 +28,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover("/junk", TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover("/junk", TESTDIR, 0, 0, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -20,7 +20,7 @@ run_test(void) {
r = close(devnul); assert(r==0);
// run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -14,7 +14,7 @@ run_test(void) {
r = toku_os_mkdir(TESTDIR, S_IRWXU); assert(r == 0);
// run recovery
r = tokudb_recover(NULL, NULL, 0, 0, NULL, NULL, NULL, NULL, 0);
r = tokudb_recover(NULL, NULL, 0, 0, NULL, NULL, 0);
assert(r != 0);
return 0;
}
......
......@@ -28,6 +28,7 @@
toku_free;
toku_malloc;
toku_xmemdup;
toku_os_get_file_size;
toku_os_getpid;
toku_os_gettid;
......
......@@ -13,35 +13,37 @@ char names_single[MAX_DBS][sizeof("dbs_0xFFF")];
char names_multiple[MAX_DBS][sizeof("dbm_0xFFF")];
uint32_t num_dbs;
uint32_t flags[MAX_DBS];
uint32_t ids[MAX_DBS];
uint32_t kbuf[MAX_DBS][MAX_KEY/4];
uint32_t vbuf[MAX_DBS][MAX_VAL/4];
DBT dest_keys[MAX_DBS];
DBT dest_vals[MAX_DBS];
#define CKERRIFNOT0(r) do { if (num_dbs>0) { CKERR(r); } else { CKERR2(r, EINVAL); } } while (0)
#define CKERR2IFNOT0(r, rexpect) do { if (num_dbs>0) { CKERR2(r, rexpect); } else { CKERR2(r, EINVAL); } } while (0)
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
assert(num_dbs_in == num_dbs);
assert(extra==&num_dbs); //Verifying extra gets set right.
assert(row->size == 4);
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
kbuf[which][0] = *(uint32_t*)row->data;
kbuf[which][1] = which;
vbuf[which][0] = which;
vbuf[which][1] = *(uint32_t*)row->data;
keys[which].data = kbuf[which];
keys[which].size = sizeof(kbuf[which]);
vals[which].data = vbuf[which];
vals[which].size = sizeof(vbuf[which]);
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (extra == NULL) {
assert(src_db == dbs_multiple[0]);
}
return 0;
}
else {
assert(src_db == NULL);
assert(extra==&num_dbs); //Verifying extra gets set right.
}
uint32_t which = *(uint32_t*)dest_db->app_private;
assert(which < MAX_DBS);
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
assert(extra==&num_dbs); //Verifying extra gets set right.
assert(src_key->size == 4);
assert(src_val->size == 4);
kbuf[which][0] = *(uint32_t*)src_key->data;
kbuf[which][1] = which;
vbuf[which][0] = which;
vbuf[which][1] = *(uint32_t*)src_val->data;
dest_key->data = kbuf[which];
dest_key->size = sizeof(kbuf[which]);
dest_val->data = vbuf[which];
dest_val->size = sizeof(vbuf[which]);
return 0;
}
......@@ -55,9 +57,7 @@ static void run_test (void) {
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -69,11 +69,13 @@ static void run_test (void) {
CKERR(r);
DB *db;
for (which = 0; which < num_dbs; which++) {
ids[which] = which;
r = db_create(&dbs_multiple[which], env, 0);
CKERR(r);
db = dbs_multiple[which];
r = db->open(db, txn, names_multiple[which], NULL, DB_BTREE, DB_CREATE, 0666);
CKERR(r);
db->app_private = &ids[which];
r = db_create(&dbs_single[which], env, 0);
CKERR(r);
db = dbs_single[which];
......@@ -95,8 +97,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
uint32_t magic2 = ~magic;
DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, dbs_multiple[0], txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, NULL);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -116,8 +120,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
uint32_t magic2 = ~magic;
DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -137,8 +143,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
uint32_t magic2 = ~magic;
DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERR2IFNOT0(r, DB_KEYEXIST);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -161,8 +169,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txna, 0);
CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)};
r = env->put_multiple(env, txna, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
uint32_t magic2 = ~magic;
DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txna, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -177,7 +187,7 @@ static void run_test (void) {
CKERR(r);
//Lock should fail
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
r = env->put_multiple(env, NULL, txnb, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERR2IFNOT0(r, DB_LOCK_NOTGRANTED);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -189,7 +199,7 @@ static void run_test (void) {
r = txna->commit(txna, 0);
//Should succeed this time.
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs);
r = env->put_multiple(env, NULL, txnb, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
......@@ -259,6 +269,8 @@ int test_main (int argc, char *argv[]) {
for (which = 0; which < MAX_DBS; which++) {
sprintf(names_multiple[which], "dbm_0x%02X", which);
sprintf(names_single[which], "dbs_0x%02X", which);
dbt_init(&dest_keys[which], NULL, 0);
dbt_init(&dest_vals[which], NULL, 0);
}
for (num_dbs = 0; num_dbs < 4; num_dbs++) {
run_test();
......
......@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert((int)num_dbs_in == num_dbs);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = ((uint8_t*)row->data)+4;
void *val = ((uint8_t*)row->data)+4 + keysize;
int which;
for (which = 0; which < num_dbs; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
crash_on_upgrade(DB* db,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
db = db;
old_version = old_version;
old_descriptor = old_descriptor;
old_key = old_key;
old_val = old_val;
new_version = new_version;
new_descriptor = new_descriptor;
new_key = new_key;
new_val = new_val;
assert(FALSE);
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (extra == NULL) {
if (src_db) {
assert(src_db->descriptor);
assert(src_db->descriptor->size == 4);
assert((*(uint32_t*)src_db->descriptor->data) == 0);
}
}
else {
assert(src_db == NULL);
assert(extra==&namea); //Verifying extra gets set right.
}
assert(dest_db->descriptor->size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0;
}
......@@ -52,9 +64,7 @@ static void run_test (void) {
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -64,10 +74,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
......@@ -79,18 +100,13 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, dba, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, NULL);
CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
......@@ -99,13 +115,8 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
}
// checkpoint
......@@ -125,9 +136,7 @@ static void run_recover (void) {
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......
......@@ -9,38 +9,41 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = ((uint8_t*)row->data)+4;
void *val = ((uint8_t*)row->data)+4 + keysize;
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
crash_on_upgrade(DB* db,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
db = db;
old_version = old_version;
old_descriptor = old_descriptor;
old_key = old_key;
old_val = old_val;
new_version = new_version;
new_descriptor = new_descriptor;
new_key = new_key;
new_val = new_val;
assert(FALSE);
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
assert(src_db == NULL);
assert(extra==&namea || extra==NULL); //Verifying extra gets set right.
assert(dest_db->descriptor->size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0;
}
......@@ -52,9 +55,7 @@ static void run_test (void) {
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -64,10 +65,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
......@@ -79,18 +91,14 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
......@@ -99,13 +107,9 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
{
......@@ -133,9 +137,7 @@ static void run_recover (void) {
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......
......@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db";
char *nameb="b.db";
enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE;
static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) {
assert(num_dbs_in > 0);
if (do_recover)
assert(extra==NULL);
else
assert(extra==&namea); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t keysize = *(int32_t*)row->data;
assert((int)row->size >= 4+keysize);
int32_t valsize = row->size - 4 - keysize;
void *key = ((uint8_t*)row->data)+4;
void *val = ((uint8_t*)row->data)+4 + keysize;
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
crash_on_upgrade(DB* db,
u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
db = db;
old_version = old_version;
old_descriptor = old_descriptor;
old_key = old_key;
old_val = old_val;
new_version = new_version;
new_descriptor = new_descriptor;
new_key = new_key;
new_val = new_val;
assert(FALSE);
}
static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) {
if (do_recover)
assert(extra==NULL);
else
put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (extra == NULL) {
if (src_db) {
assert(src_db->descriptor);
assert(src_db->descriptor->size == 4);
assert((*(uint32_t*)src_db->descriptor->data) == 0);
}
}
else {
assert(src_db == NULL);
assert(extra==&namea); //Verifying extra gets set right.
}
assert(dest_db->descriptor->size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0;
}
......@@ -52,9 +64,7 @@ static void run_test (void) {
DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -64,10 +74,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
}
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba;
DB *dbb;
r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
......@@ -79,18 +100,14 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, dba, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, NULL);
CKERR(r);
r = txn->abort(txn); CKERR(r);
}
r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb;
// txn_begin; insert <a,b>;
......@@ -99,20 +116,16 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
{
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r);
r = env->dbremove(env, txn, namea, NULL, 0); CKERR(r);
r = dba->close(dbb, 0); CKERR(r);
r = env->dbremove(env, txn, nameb, NULL, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
}
......@@ -131,9 +144,7 @@ static void run_recover (void) {
// run recovery
r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env,
put_multiple_generate, put_multiple_clean,
NULL, NULL);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......@@ -141,13 +152,13 @@ static void run_recover (void) {
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r);
}
{
DB *db;
r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
......
......@@ -55,10 +55,8 @@ struct __toku_db_env_internal {
char *data_dir;
int (*bt_compare) (DB *, const DBT *, const DBT *);
int (*dup_compare) (DB *, const DBT *, const DBT *);
generate_keys_vals_for_put_func generate_keys_vals_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
generate_row_for_put_func generate_row_for_put;
generate_row_for_del_func generate_row_for_del;
//void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size;
CACHETABLE cachetable;
......
......@@ -338,8 +338,7 @@ ydb_do_recovery (DB_ENV *env) {
}
toku_ydb_unlock();
int r = tokudb_recover(envdir, logdir, env->i->bt_compare, env->i->dup_compare,
env->i->generate_keys_vals_for_put, env->i->cleanup_keys_vals_for_put,
env->i->generate_keys_for_del, env->i->cleanup_keys_for_del,
env->i->generate_row_for_put, env->i->generate_row_for_del,
env->i->cachetable_size);
toku_ydb_lock();
toku_free(logdir);
......@@ -1228,54 +1227,58 @@ locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const D
}
static int
env_set_multiple_callbacks(DB_ENV *env,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del) {
env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (env_opened(env)) r = EINVAL;
else {
env->i->generate_keys_vals_for_put = generate_keys_vals_for_put;
env->i->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put;
env->i->generate_keys_for_del = generate_keys_for_del;
env->i->cleanup_keys_for_del = cleanup_keys_for_del;
env->i->generate_row_for_put = generate_row_for_put;
}
return r;
}
static int
locked_env_set_multiple_callbacks(DB_ENV *env,
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del) {
env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
HANDLE_PANICKED_ENV(env);
int r = 0;
if (env_opened(env)) r = EINVAL;
else {
env->i->generate_row_for_del = generate_row_for_del;
}
return r;
}
static int
locked_env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
toku_ydb_lock();
int r = env_set_multiple_callbacks(env,
generate_keys_vals_for_put,
cleanup_keys_vals_for_put,
generate_keys_for_del,
cleanup_keys_for_del);
int r = env_set_generate_row_callback_for_put(env, generate_row_for_put);
toku_ydb_unlock();
return r;
}
static int env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra);
static int env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra);
static int
locked_env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
toku_ydb_lock();
int r = env_set_generate_row_callback_for_del(env, generate_row_for_del);
toku_ydb_unlock();
return r;
}
static int env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra);
static int env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra);
static int
locked_env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
locked_env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra) {
toku_ydb_lock();
int r = env_put_multiple(env, txn, row, num_dbs, dbs, flags, extra);
int r = env_put_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, vals, flags_array, extra);
toku_ydb_unlock();
return r;
}
static int
locked_env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
locked_env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra) {
toku_ydb_lock();
int r = env_del_multiple(env, txn, row, num_dbs, dbs, flags, extra);
int r = env_del_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, flags_array, extra);
toku_ydb_unlock();
return r;
}
......@@ -1472,7 +1475,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(dbrename);
SENV(set_default_bt_compare);
SENV(set_default_dup_compare);
SENV(set_multiple_callbacks);
SENV(set_generate_row_callback_for_put);
SENV(set_generate_row_callback_for_del);
SENV(put_multiple);
SENV(del_multiple);
SENV(checkpointing_set_period);
......@@ -3639,10 +3643,8 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
}
static int
env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra) {
int r;
int generated_dbts = 0;
DBT keydbts[num_dbs];
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs];
......@@ -3650,22 +3652,19 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
r = EINVAL;
goto cleanup;
}
if (!env->i->generate_keys_for_del || !env->i->cleanup_keys_for_del) {
if (!env->i->generate_row_for_del) {
r = EINVAL;
goto cleanup;
}
memset(keydbts, 0, sizeof(keydbts));
//Generate all the DBTs
r = env->i->generate_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db);
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db];
DB *db = db_array[which_db];
//Generate the row
r = env->i->generate_row_for_del(db, src_db, &keys[which_db], key, val, extra);
if (r!=0) goto cleanup;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db], txn, db);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
if (remaining_flags[which_db] & ~DB_DELETE_ANY) {
r = EINVAL;
......@@ -3674,40 +3673,35 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
BOOL error_if_missing = (BOOL)(!(remaining_flags[which_db]&DB_DELETE_ANY));
if (error_if_missing) {
//Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags[which_db], &keydbts[which_db], ydb_getf_do_nothing, NULL);
r = db_getf_set(db, txn, lock_flags[which_db], &keys[which_db], ydb_getf_do_nothing, NULL);
if (r!=0) goto cleanup;
}
//Check overwrite constraints
if (r!=0) goto cleanup;
//Do locking if necessary.
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking
RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db,
&keydbts[which_db], toku_lt_neg_infinity,
&keydbts[which_db], toku_lt_infinity);
&keys[which_db], toku_lt_neg_infinity,
&keys[which_db], toku_lt_infinity);
r = grab_range_lock(&request);
if (r!=0) goto cleanup;
}
brts[which_db] = db->i->brt;
}
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_del_multiple(ttxn, brts, num_dbs, row);
BRT src_brt = src_db ? src_db->i->brt : NULL;
r = toku_brt_log_del_multiple(ttxn, src_brt, brts, num_dbs, key, val);
if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
num_deletes++; // accountability
r = toku_brt_maybe_delete(db->i->brt, &keydbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
DB *db = db_array[which_db];
num_deletes++;
r = toku_brt_maybe_delete(db->i->brt, &keys[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup;
}
cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r==0) r = r2;
}
return r;
}
......@@ -4294,10 +4288,8 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
}
static int
env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) {
env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra) {
int r;
int generated_dbts = 0;
DBT keydbts[num_dbs], valdbts[num_dbs];
uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs];
......@@ -4305,26 +4297,22 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
r = EINVAL;
goto cleanup;
}
if (!env->i->generate_keys_vals_for_put || !env->i->cleanup_keys_vals_for_put) {
if (!env->i->generate_row_for_put) {
r = EINVAL;
goto cleanup;
}
memset(keydbts, 0, sizeof(keydbts));
memset(valdbts, 0, sizeof(valdbts));
//Generate all the DBTs
r = env->i->generate_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db);
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db];
DB *db = db_array[which_db];
//Generate the row
r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], key, val, extra);
if (r!=0) goto cleanup;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db], txn, db);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
//Check overwrite constraints
r = db_put_check_overwrite_constraint(db, txn,
&keydbts[which_db], &valdbts[which_db],
&keys[which_db], &vals[which_db],
lock_flags[which_db], remaining_flags[which_db]);
if (r!=0) goto cleanup;
//Do locking if necessary.
......@@ -4333,28 +4321,25 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db,
&keydbts[which_db], &valdbts[which_db],
&keydbts[which_db], &valdbts[which_db]);
&keys[which_db], &vals[which_db],
&keys[which_db], &vals[which_db]);
r = grab_range_lock(&request);
if (r!=0) goto cleanup;
}
brts[which_db] = db->i->brt;
}
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_put_multiple(ttxn, brts, num_dbs, row);
BRT src_brt = src_db ? src_db->i->brt : NULL;
r = toku_brt_log_put_multiple(ttxn, src_brt, brts, num_dbs, key, val);
if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db];
DB *db = db_array[which_db];
num_inserts++;
r = toku_brt_maybe_insert(db->i->brt, &keydbts[which_db], &valdbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
r = toku_brt_maybe_insert(db->i->brt, &keys[which_db], &vals[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup;
}
cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r==0) r = r2;
}
return r;
}
......
......@@ -94,7 +94,7 @@ void *toku_memdup (const void *v, size_t len);
char *toku_strdup (const char *s) __attribute__((__visibility__("default")));
/* Copy memory. Analogous to strdup() Crashes instead of returning NULL */
void *toku_xmemdup (const void *v, size_t len);
void *toku_xmemdup (const void *v, size_t len) __attribute__((__visibility__("default")));
/* Toku-version of strdup. Use this so that it calls toku_xmalloc() Crashes instead of returning NULL */
char *toku_xstrdup (const char *s) __attribute__((__visibility__("default")));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment