Commit 91944fe9 authored by Yoni Fogel

[t:2318] Merge #2318 back to main up to r17166

git-svn-id: file:///svn/toku/tokudb@17178 c7de825b-a66e-492c-adef-691d508d4ae1
parent 18ff965a
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 4 #define DB_VERB_RECOVERY 4
#define DB_VERB_REPLICATION 8 #define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16 #define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -164,6 +160,11 @@ typedef enum { ...@@ -164,6 +160,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_TXN_WRITE_NOSYNC 524288 #define DB_TXN_WRITE_NOSYNC 524288
#define DB_TXN_NOWAIT 2048 #define DB_TXN_NOWAIT 2048
#define DB_TXN_SYNC 4096 #define DB_TXN_SYNC 4096
...@@ -196,14 +197,25 @@ struct __toku_db_env { ...@@ -196,14 +197,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */; int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; const DBT *key, const DBT *val,
int (*set_multiple_callbacks) (DB_ENV *env, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), void *extra) /* Insert into multiple dbs */;
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; DBT *dest_key, DBT *dest_val,
void* __toku_dummy0[21]; const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
char __toku_dummy1[64]; char __toku_dummy1[64];
void *api1_internal; /* 32-bit offset=212 size=4, 64=bit offset=360 size=8 */ void *api1_internal; /* 32-bit offset=212 size=4, 64=bit offset=360 size=8 */
void* __toku_dummy2[7]; void* __toku_dummy2[7];
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2 #define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 4 #define DB_VERB_REPLICATION 4
#define DB_VERB_WAITSFOR 8 #define DB_VERB_WAITSFOR 8
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -165,6 +161,11 @@ typedef enum { ...@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 65536 #define DB_LOG_AUTOREMOVE 65536
#define DB_TXN_WRITE_NOSYNC 268435456 #define DB_TXN_WRITE_NOSYNC 268435456
#define DB_TXN_NOWAIT 4096 #define DB_TXN_NOWAIT 4096
...@@ -198,14 +199,25 @@ struct __toku_db_env { ...@@ -198,14 +199,25 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */ void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; const DBT *key, const DBT *val,
int (*set_multiple_callbacks) (DB_ENV *env, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), void *extra) /* Insert into multiple dbs */;
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; DBT *dest_key, DBT *dest_val,
void* __toku_dummy0[21]; const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[20];
char __toku_dummy1[96]; char __toku_dummy1[96];
void *api1_internal; /* 32-bit offset=244 size=4, 64=bit offset=392 size=8 */ void *api1_internal; /* 32-bit offset=244 size=4, 64=bit offset=392 size=8 */
void* __toku_dummy2[7]; void* __toku_dummy2[7];
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2 #define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 8 #define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16 #define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_DUPOK 64
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -165,6 +161,11 @@ typedef enum { ...@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 64
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 32
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 262144 #define DB_LOG_AUTOREMOVE 262144
#define DB_TXN_WRITE_NOSYNC 1024 #define DB_TXN_WRITE_NOSYNC 1024
#define DB_TXN_NOWAIT 8192 #define DB_TXN_NOWAIT 8192
...@@ -199,14 +200,25 @@ struct __toku_db_env { ...@@ -199,14 +200,25 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */ void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; const DBT *key, const DBT *val,
int (*set_multiple_callbacks) (DB_ENV *env, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), void *extra) /* Insert into multiple dbs */;
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; DBT *dest_key, DBT *dest_val,
void* __toku_dummy0[36]; const DBT *src_key, const DBT *src_val,
void *extra));;
int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
char __toku_dummy1[128]; char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */ void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[7]; void* __toku_dummy2[7];
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 2 #define DB_VERB_RECOVERY 2
#define DB_VERB_REPLICATION 8 #define DB_VERB_REPLICATION 8
#define DB_VERB_WAITSFOR 16 #define DB_VERB_WAITSFOR 16
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 64
#define DB_DBT_DUPOK 128
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -165,6 +161,11 @@ typedef enum { ...@@ -165,6 +161,11 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 128
#define DB_DBT_MALLOC 4
#define DB_DBT_REALLOC 16
#define DB_DBT_USERMEM 64
#define DB_DBT_TEMPMEMORY 2
#define DB_LOG_AUTOREMOVE 524288 #define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 2048 #define DB_TXN_WRITE_NOSYNC 2048
#define DB_TXN_NOWAIT 16384 #define DB_TXN_NOWAIT 16384
...@@ -198,15 +199,26 @@ struct __toku_db_env { ...@@ -198,15 +199,26 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */; int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */ void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*set_multiple_callbacks) (DB_ENV *env, int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), DBT *dest_key, DBT *dest_val,
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), const DBT *src_key, const DBT *src_val,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), void *extra));;
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
void* __toku_dummy0[36]; const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[35];
char __toku_dummy1[128]; char __toku_dummy1[128];
void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */ void *api1_internal; /* 32-bit offset=336 size=4, 64=bit offset=544 size=8 */
void* __toku_dummy2[8]; void* __toku_dummy2[8];
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8 #define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32 #define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64 #define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -166,7 +162,12 @@ typedef enum { ...@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16 #define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288 #define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096 #define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024 #define DB_TXN_NOWAIT 1024
...@@ -200,15 +201,26 @@ struct __toku_db_env { ...@@ -200,15 +201,26 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */; int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */ void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*set_multiple_callbacks) (DB_ENV *env, int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), DBT *dest_key, DBT *dest_val,
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), const DBT *src_key, const DBT *src_val,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), void *extra));;
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
void* __toku_dummy0[37]; const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void* __toku_dummy0[36];
char __toku_dummy1[144]; char __toku_dummy1[144];
void *api1_internal; /* 32-bit offset=356 size=4, 64=bit offset=568 size=8 */ void *api1_internal; /* 32-bit offset=356 size=4, 64=bit offset=568 size=8 */
void* __toku_dummy2[8]; void* __toku_dummy2[8];
......
...@@ -60,11 +60,6 @@ void print_defines (void) { ...@@ -60,11 +60,6 @@ void print_defines (void) {
dodefine(DB_VERB_REPLICATION); dodefine(DB_VERB_REPLICATION);
dodefine(DB_VERB_WAITSFOR); dodefine(DB_VERB_WAITSFOR);
dodefine(DB_DBT_MALLOC);
dodefine(DB_DBT_REALLOC);
dodefine(DB_DBT_USERMEM);
dodefine(DB_DBT_DUPOK);
dodefine(DB_ARCH_ABS); dodefine(DB_ARCH_ABS);
dodefine(DB_ARCH_LOG); dodefine(DB_ARCH_LOG);
...@@ -136,10 +131,19 @@ void print_defines (void) { ...@@ -136,10 +131,19 @@ void print_defines (void) {
printf("#define DB_PRELOCKED 0x00800000\n"); // private tokudb printf("#define DB_PRELOCKED 0x00800000\n"); // private tokudb
printf("#define DB_PRELOCKED_WRITE 0x00400000\n"); // private tokudb printf("#define DB_PRELOCKED_WRITE 0x00400000\n"); // private tokudb
dodefine(DB_DBT_APPMALLOC); {
//dbt flags
uint32_t dbt_flags = 0;
dodefine_track(dbt_flags, DB_DBT_APPMALLOC);
dodefine_track(dbt_flags, DB_DBT_DUPOK);
dodefine_track(dbt_flags, DB_DBT_MALLOC);
#ifdef DB_DBT_MULTIPLE #ifdef DB_DBT_MULTIPLE
dodefine(DB_DBT_MULTIPLE); dodefine_track(dbt_flags, DB_DBT_MULTIPLE);
#endif #endif
dodefine_track(dbt_flags, DB_DBT_REALLOC);
dodefine_track(dbt_flags, DB_DBT_USERMEM);
dodefine_from_track(dbt_flags, DB_DBT_TEMPMEMORY);
}
// flags for the env->set_flags function // flags for the env->set_flags function
#if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3 #if DB_VERSION_MAJOR == 4 && DB_VERSION_MINOR >= 3
...@@ -447,13 +451,24 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -447,13 +451,24 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
"int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */", "int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */",
"int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */", "int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */",
"int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */", "int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */",
"int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */", "int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
"int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */", " const DBT *key, const DBT *val,\n"
"int (*set_multiple_callbacks) (DB_ENV *env,\n" " uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,\n"
" int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n" " void *extra) /* Insert into multiple dbs */",
" int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra),\n" "int (*set_generate_row_callback_for_put) (DB_ENV *env, \n"
" int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra),\n" " int (*generate_row_for_put)(DB *dest_db, DB *src_db,\n"
" int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */", " DBT *dest_key, DBT *dest_val,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
"int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,\n"
" void *extra) /* Insert into multiple dbs */",
"int (*set_generate_row_callback_for_del) (DB_ENV *env, \n"
" int (*generate_row_for_del)(DB *dest_db, DB *src_db,\n"
" DBT *dest_key,\n"
" const DBT *src_key, const DBT *src_val,\n"
" void *extra));",
NULL}; NULL};
print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra); print_struct("db_env", 1, db_env_fields32, db_env_fields64, sizeof(db_env_fields32)/sizeof(db_env_fields32[0]), extra);
} }
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8 #define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32 #define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64 #define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -166,7 +162,12 @@ typedef enum { ...@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16 #define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288 #define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096 #define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024 #define DB_TXN_NOWAIT 1024
...@@ -200,14 +201,25 @@ struct __toku_db_env { ...@@ -200,14 +201,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */; int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*set_multiple_callbacks) (DB_ENV *env, int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), DBT *dest_key, DBT *dest_val,
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), const DBT *src_key, const DBT *src_val,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), void *extra));;
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *api1_internal; void *api1_internal;
int (*close) (DB_ENV *, u_int32_t); int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t); int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
...@@ -100,10 +100,15 @@ check-no-rolltmp: $(TARGET_TDB) ...@@ -100,10 +100,15 @@ check-no-rolltmp: $(TARGET_TDB)
./$(TARGET_TDB) $(VERBVERBOSE) --env no-rolltmp.dir --singlex --nolog --check_small_rolltmp $(SUMMARIZE_CMD) ./$(TARGET_TDB) $(VERBVERBOSE) --env no-rolltmp.dir --singlex --nolog --check_small_rolltmp $(SUMMARIZE_CMD)
# Check to make sure that if we make a file that's bigger than 4GB that we can read the file back out and get all the rows. # Check to make sure that if we make a file that's bigger than 4GB that we can read the file back out and get all the rows.
ifeq ($(SKIP_4G),1)
check-4G:
@echo SKIPPED SLOW TEST $@
else
check-4G: $(TARGET_TDB) $(SCANSCAN_TDB) check-4G: $(TARGET_TDB) $(SCANSCAN_TDB)
( ./$(TARGET_TDB) $(VERBVERBOSE) --env 4g.dir --norandom --compressibility 1 --valsize 10000 1 && \ ( ./$(TARGET_TDB) $(VERBVERBOSE) --env 4g.dir --norandom --compressibility 1 --valsize 10000 1 && \
./$(SCANSCAN_TDB) --env 4g.dir --lwc --prelock --prelockflag --nox > 4g.out && \ ./$(SCANSCAN_TDB) --env 4g.dir --lwc --prelock --prelockflag --nox > 4g.out && \
fgrep "(1048576 rows)" 4g.out > /dev/null ) $(SUMMARIZE_CMD) fgrep "(1048576 rows)" 4g.out > /dev/null ) $(SUMMARIZE_CMD)
endif
clean: clean:
rm -f $(TARGETS) 4g.out rm -f $(TARGETS) 4g.out
......
...@@ -126,33 +126,20 @@ DB *dbs[MAX_DBS]; ...@@ -126,33 +126,20 @@ DB *dbs[MAX_DBS];
uint32_t put_flagss[MAX_DBS]; uint32_t put_flagss[MAX_DBS];
DB_TXN *parenttid=0; DB_TXN *parenttid=0;
DB_TXN *tid=0; DB_TXN *tid=0;
DBT dest_keys[MAX_DBS];
DBT dest_vals[MAX_DBS];
#if defined(TOKUDB) #if defined(TOKUDB)
static int static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **dbs_in, DBT *keys, DBT *vals, void *extra) { put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
assert((int)num_dbs_in == num_dbs); assert(src_db == NULL);
assert(dest_db != NULL);
assert(extra == &put_flags); //Verifying extra gets set right. assert(extra == &put_flags); //Verifying extra gets set right.
assert(row->size >= 4);
int32_t row_keysize = *(int32_t*)row->data;
assert(row_keysize == keysize);
assert((int)row->size >= 4+keysize);
int32_t row_valsize = row->size - 4 - keysize;
assert(row_valsize == valsize);
void *key = ((uint8_t*)row->data)+4;
void *val = ((uint8_t*)row->data)+4 + keysize;
for (which = 0; which < num_dbs; which++) {
assert(dbs_in[which] == dbs[which]);
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
}
static int dest_key->data = src_key->data;
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) { dest_key->size = src_key->size;
assert(extra == &put_flags); //Verifying extra gets set right. dest_val->data = src_val->data;
dest_val->size = src_val->size;
return 0; return 0;
} }
#endif #endif
...@@ -200,9 +187,7 @@ static void benchmark_setup (void) { ...@@ -200,9 +187,7 @@ static void benchmark_setup (void) {
} }
#if defined(TOKUDB) #if defined(TOKUDB)
if (insert_multiple) { if (insert_multiple) {
r = dbenv->set_multiple_callbacks(dbenv, r = dbenv->set_generate_row_callback_for_put(dbenv, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
} }
#endif #endif
...@@ -369,19 +354,18 @@ static void fill_array (unsigned char *data, int size) { ...@@ -369,19 +354,18 @@ static void fill_array (unsigned char *data, int size) {
static void insert (long long v) { static void insert (long long v) {
int r; int r;
unsigned char data[keysize+valsize+4]; unsigned char kc[keysize];
unsigned char *kc = data+4, *vc = data+keysize+4; unsigned char vc[valsize];;
DBT kt, vt; DBT kt, vt;
fill_array(kc, keysize); fill_array(kc, keysize);
long_long_to_array(kc, keysize, v); // Fill in the array first, then write the long long in. long_long_to_array(kc, keysize, v); // Fill in the array first, then write the long long in.
fill_array(vc, valsize); fill_array(vc, valsize);
long_long_to_array(vc, valsize, v); long_long_to_array(vc, valsize, v);
*(uint32_t*)(data) = keysize; fill_dbt(&kt, kc, keysize);
fill_dbt(&vt, vc, valsize);
if (insert_multiple) { if (insert_multiple) {
DBT row;
fill_dbt(&row, data, sizeof(data));
#if defined(TOKUDB) #if defined(TOKUDB)
r = dbenv->put_multiple(dbenv, tid, &row, num_dbs, dbs, put_flagss, &put_flags); //Extra used just to verify its passed right r = dbenv->put_multiple(dbenv, NULL, tid, &kt, &vt, num_dbs, dbs, dest_keys, dest_vals, put_flagss, &put_flags); //Extra used just to verify its passed right
#else #else
r = EINVAL; r = EINVAL;
#endif #endif
...@@ -390,7 +374,7 @@ static void insert (long long v) { ...@@ -390,7 +374,7 @@ static void insert (long long v) {
else { else {
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DB *db = dbs[which]; DB *db = dbs[which];
r = db->put(db, tid, fill_dbt(&kt, kc, keysize), fill_dbt(&vt, vc, valsize), put_flags); r = db->put(db, tid, &kt, &vt, put_flags);
CKERR(r); CKERR(r);
} }
} }
...@@ -711,6 +695,8 @@ int main (int argc, const char *argv[]) { ...@@ -711,6 +695,8 @@ int main (int argc, const char *argv[]) {
} }
#endif #endif
if (insert_multiple) { if (insert_multiple) {
memset(dest_keys, 0, sizeof(dest_keys));
memset(dest_vals, 0, sizeof(dest_vals));
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
put_flagss[i] = put_flags; put_flagss[i] = put_flags;
} }
......
...@@ -104,10 +104,6 @@ typedef enum { ...@@ -104,10 +104,6 @@ typedef enum {
#define DB_VERB_RECOVERY 8 #define DB_VERB_RECOVERY 8
#define DB_VERB_REPLICATION 32 #define DB_VERB_REPLICATION 32
#define DB_VERB_WAITSFOR 64 #define DB_VERB_WAITSFOR 64
#define DB_DBT_MALLOC 8
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_DUPOK 2
#define DB_ARCH_ABS 1 #define DB_ARCH_ABS 1
#define DB_ARCH_LOG 4 #define DB_ARCH_LOG 4
#define DB_CREATE 1 #define DB_CREATE 1
...@@ -166,7 +162,12 @@ typedef enum { ...@@ -166,7 +162,12 @@ typedef enum {
#define DB_PRELOCKED 0x00800000 #define DB_PRELOCKED 0x00800000
#define DB_PRELOCKED_WRITE 0x00400000 #define DB_PRELOCKED_WRITE 0x00400000
#define DB_DBT_APPMALLOC 1 #define DB_DBT_APPMALLOC 1
#define DB_DBT_DUPOK 2
#define DB_DBT_MALLOC 8
#define DB_DBT_MULTIPLE 16 #define DB_DBT_MULTIPLE 16
#define DB_DBT_REALLOC 64
#define DB_DBT_USERMEM 256
#define DB_DBT_TEMPMEMORY 4
#define DB_LOG_AUTOREMOVE 524288 #define DB_LOG_AUTOREMOVE 524288
#define DB_TXN_WRITE_NOSYNC 4096 #define DB_TXN_WRITE_NOSYNC 4096
#define DB_TXN_NOWAIT 1024 #define DB_TXN_NOWAIT 1024
...@@ -200,14 +201,25 @@ struct __toku_db_env { ...@@ -200,14 +201,25 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */; int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */; int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */; int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* lookup existing iname */;
int (*put_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Insert into multiple dbs */; int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
void *extra) /* Insert into multiple dbs */;
void *app_private; void *app_private;
int (*del_multiple) (DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) /* Delete from multiple dbs */; int (*set_generate_row_callback_for_put) (DB_ENV *env,
int (*set_multiple_callbacks) (DB_ENV *env, int (*generate_row_for_put)(DB *dest_db, DB *src_db,
int (*generate_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), DBT *dest_key, DBT *dest_val,
int (*cleanup_keys_vals_for_put)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra), const DBT *src_key, const DBT *src_val,
int (*generate_keys_for_del)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra), void *extra));;
int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra)) /* set callbacks for env_(put|del)_multiple */; int (*del_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array,
void *extra) /* Delete from multiple dbs */;
int (*set_generate_row_callback_for_del) (DB_ENV *env,
int (*generate_row_for_del)(DB *dest_db, DB *src_db,
DBT *dest_key,
const DBT *src_key, const DBT *src_val,
void *extra));;
void *api1_internal; void *api1_internal;
int (*close) (DB_ENV *, u_int32_t); int (*close) (DB_ENV *, u_int32_t);
int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t); int (*dbremove) (DB_ENV *, DB_TXN *, const char *, const char *, u_int32_t);
......
...@@ -2639,7 +2639,7 @@ txn_note_doing_work(TOKUTXN txn) { ...@@ -2639,7 +2639,7 @@ txn_note_doing_work(TOKUTXN txn) {
} }
int int
toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) { toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val) {
int r = 0; int r = 0;
assert(txn); assert(txn);
assert(num_brts > 0); assert(num_brts > 0);
...@@ -2651,9 +2651,11 @@ toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) { ...@@ -2651,9 +2651,11 @@ toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
for (i = 0; i < num_brts; i++) { for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf); fnums[i] = toku_cachefile_filenum(brts[i]->cf);
} }
BYTESTRING rowbs = {.len=row->size, .data=row->data}; BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn); TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs); FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_insert_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
} }
return r; return r;
} }
...@@ -2705,7 +2707,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) { ...@@ -2705,7 +2707,7 @@ int toku_brt_delete(BRT brt, DBT *key, TOKUTXN txn) {
} }
int int
toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) { toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val) {
int r = 0; int r = 0;
assert(txn); assert(txn);
assert(num_brts > 0); assert(num_brts > 0);
...@@ -2717,9 +2719,11 @@ toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) { ...@@ -2717,9 +2719,11 @@ toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row) {
for (i = 0; i < num_brts; i++) { for (i = 0; i < num_brts; i++) {
fnums[i] = toku_cachefile_filenum(brts[i]->cf); fnums[i] = toku_cachefile_filenum(brts[i]->cf);
} }
BYTESTRING rowbs = {.len=row->size, .data=row->data}; BYTESTRING keybs = {.len=key->size, .data=key->data};
BYTESTRING valbs = {.len=val->size, .data=val->data};
TXNID xid = toku_txn_get_txnid(txn); TXNID xid = toku_txn_get_txnid(txn);
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, filenums, xid, rowbs); FILENUM src_filenum = src_brt ? toku_cachefile_filenum(src_brt->cf) : FILENUM_NONE;
r = toku_log_enq_delete_multiple(logger, (LSN*)0, 0, src_filenum, filenums, xid, keybs, valbs);
} }
return r; return r;
} }
......
...@@ -59,8 +59,8 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn); ...@@ -59,8 +59,8 @@ int toku_brt_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn);
// Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery. // Effect: Insert a key and data pair into a brt if the oplsn is newer than the brt lsn. This function is called during recovery.
// Returns 0 if successful // Returns 0 if successful
int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging); int toku_brt_maybe_insert (BRT brt, DBT *k, DBT *v, TOKUTXN txn, BOOL oplsn_valid, LSN oplsn, int do_logging);
int toku_brt_log_put_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row); int toku_brt_log_put_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
int toku_brt_log_del_multiple (TOKUTXN txn, BRT *brts, int num_brts, DBT *row); int toku_brt_log_del_multiple (TOKUTXN txn, BRT src_brt, BRT *brts, int num_brts, const DBT *key, const DBT *val);
// Effect: Delete a key from a brt // Effect: Delete a key from a brt
// Returns 0 if successful // Returns 0 if successful
......
...@@ -47,6 +47,7 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN; ...@@ -47,6 +47,7 @@ typedef struct __toku_lsn { u_int64_t lsn; } LSN;
/* Make the FILEID a struct for the same reason. */ /* Make the FILEID a struct for the same reason. */
typedef struct __toku_fileid { u_int32_t fileid; } FILENUM; typedef struct __toku_fileid { u_int32_t fileid; } FILENUM;
#define FILENUM_NONE ((FILENUM){UINT32_MAX})
typedef struct { typedef struct {
u_int32_t num; u_int32_t num;
...@@ -106,10 +107,8 @@ typedef struct brt_msg BRT_MSG_S, *BRT_MSG; ...@@ -106,10 +107,8 @@ typedef struct brt_msg BRT_MSG_S, *BRT_MSG;
typedef int (*brt_compare_func)(DB *, const DBT *, const DBT *); typedef int (*brt_compare_func)(DB *, const DBT *, const DBT *);
typedef int (*generate_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra); typedef int (*generate_row_for_put_func)(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra);
typedef int (*cleanup_keys_vals_for_put_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, DBT *vals, void *extra); typedef int (*generate_row_for_del_func)(DB *dest_db, DB *src_db, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra);
typedef int (*generate_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
typedef int (*cleanup_keys_for_del_func)(DBT *row, uint32_t num_dbs, DB **dbs, DBT *keys, void *extra);
#define UU(x) x __attribute__((__unused__)) #define UU(x) x __attribute__((__unused__))
......
...@@ -386,6 +386,7 @@ is_filenum_reserved(CACHETABLE ct, FILENUM filenum) { ...@@ -386,6 +386,7 @@ is_filenum_reserved(CACHETABLE ct, FILENUM filenum) {
static void static void
reserve_filenum(CACHETABLE ct, FILENUM filenum) { reserve_filenum(CACHETABLE ct, FILENUM filenum) {
int r; int r;
assert(filenum.fileid != FILENUM_NONE.fileid);
uint32_t index; uint32_t index;
r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL); r = toku_omt_find_zero(ct->reserved_filenums, find_by_filenum, &filenum, NULL, &index, NULL);
...@@ -477,6 +478,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ...@@ -477,6 +478,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
CACHEFILE extant; CACHEFILE extant;
struct fileid fileid; struct fileid fileid;
if (with_filenum) assert(filenum.fileid != FILENUM_NONE.fileid);
if (reserved) assert(with_filenum); if (reserved) assert(with_filenum);
r = toku_os_get_unique_file_id(fd, &fileid); r = toku_os_get_unique_file_id(fd, &fileid);
if (r != 0) { if (r != 0) {
...@@ -530,6 +532,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd ...@@ -530,6 +532,7 @@ int toku_cachetable_openfd_with_filenum (CACHEFILE *cfptr, CACHETABLE ct, int fd
} else { } else {
// find an unused fileid and use it // find an unused fileid and use it
try_again: try_again:
assert(next_filenum_to_use.fileid != FILENUM_NONE.fileid);
for (extant = ct->cachefiles; extant; extant=extant->next) { for (extant = ct->cachefiles; extant; extant=extant->next) {
if (next_filenum_to_use.fileid==extant->filenum.fileid) { if (next_filenum_to_use.fileid==extant->filenum.fileid) {
next_filenum_to_use.fileid++; next_filenum_to_use.fileid++;
......
...@@ -150,13 +150,17 @@ const struct logtype logtypes[] = { ...@@ -150,13 +150,17 @@ const struct logtype logtypes[] = {
{"TXNID", "xid", 0}, {"TXNID", "xid", 0},
{"BYTESTRING", "key", 0}, {"BYTESTRING", "key", 0},
NULLFIELD}}, NULLFIELD}},
{"enq_insert_multiple", 'm', FA{{"FILENUMS", "filenums", 0}, {"enq_insert_multiple", 'm', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0}, {"TXNID", "xid", 0},
{"BYTESTRING", "row", 0}, {"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}}, NULLFIELD}},
{"enq_delete_multiple", 'M', FA{{"FILENUMS", "filenums", 0}, {"enq_delete_multiple", 'M', FA{{"FILENUM", "src_filenum", 0},
{"FILENUMS", "dest_filenums", 0},
{"TXNID", "xid", 0}, {"TXNID", "xid", 0},
{"BYTESTRING", "row", 0}, {"BYTESTRING", "src_key", 0},
{"BYTESTRING", "src_val", 0},
NULLFIELD}}, NULLFIELD}},
{"comment", 'T', FA{{"u_int64_t", "timestamp", 0}, {"comment", 'T', FA{{"u_int64_t", "timestamp", 0},
{"BYTESTRING", "comment", 0}, {"BYTESTRING", "comment", 0},
......
...@@ -166,10 +166,8 @@ struct recover_env { ...@@ -166,10 +166,8 @@ struct recover_env {
TOKULOGGER logger; TOKULOGGER logger;
brt_compare_func bt_compare; brt_compare_func bt_compare;
brt_compare_func dup_compare; brt_compare_func dup_compare;
generate_keys_vals_for_put_func generate_keys_vals_for_put; generate_row_for_put_func generate_row_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put; generate_row_for_del_func generate_row_for_del;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
struct scan_state ss; struct scan_state ss;
struct file_map fmap; struct file_map fmap;
BOOL goforward; BOOL goforward;
...@@ -177,10 +175,8 @@ struct recover_env { ...@@ -177,10 +175,8 @@ struct recover_env {
typedef struct recover_env *RECOVER_ENV; typedef struct recover_env *RECOVER_ENV;
static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare, static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put, generate_row_for_put_func generate_row_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put, generate_row_for_del_func generate_row_for_del,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size) { size_t cachetable_size) {
int r; int r;
...@@ -192,10 +188,8 @@ static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_ ...@@ -192,10 +188,8 @@ static int recover_env_init (RECOVER_ENV renv, brt_compare_func bt_compare, brt_
toku_logger_set_cachetable(renv->logger, renv->ct); toku_logger_set_cachetable(renv->logger, renv->ct);
renv->bt_compare = bt_compare; renv->bt_compare = bt_compare;
renv->dup_compare = dup_compare; renv->dup_compare = dup_compare;
renv->generate_keys_vals_for_put = generate_keys_vals_for_put; renv->generate_row_for_put = generate_row_for_put;
renv->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put; renv->generate_row_for_del = generate_row_for_del;
renv->generate_keys_for_del = generate_keys_for_del;
renv->cleanup_keys_for_del = cleanup_keys_for_del;
file_map_init(&renv->fmap); file_map_init(&renv->fmap);
renv->goforward = FALSE; renv->goforward = FALSE;
...@@ -497,54 +491,51 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple ...@@ -497,54 +491,51 @@ static int toku_recover_enq_insert_multiple (struct logtype_enq_insert_multiple
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0; return 0;
} }
#if 0 DB *src_db = NULL;
int (*generate_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra), {
int (*cleanup_keys_vals_for_put) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], DBT vals[num_dbs], void *extra),
int (*generate_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra),
int (*cleanup_keys_for_del) (DBT *row, uint32_t num_dbs, DB *dbs[num_dbs], DBT keys[num_dbs], void *extra));
#endif
DB* dbs[l->filenums.num];
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple); r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (r!=0) { if (l->src_filenum.fileid == FILENUM_NONE.fileid)
// if we didn't find a cachefile, then we don't have to do anything for this file. assert(r!=0);
continue; else {
assert(r==0); //How do we continue if src_db is specified but missing?
src_db = tuple->brt->db;
} }
dbs[num_dbs++] = tuple->brt->db;
} }
if (num_dbs == 0) //All files are closed/deleted. We're done. uint32_t file;
return 0; DBT src_key, src_val, dest_key, dest_val;
DBT keydbts[num_dbs], valdbts[num_dbs], rowdbt; toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
memset(keydbts, 0, sizeof(keydbts)); toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
memset(valdbts, 0, sizeof(valdbts)); toku_init_dbt(&dest_key);
//Generate all the DBTs toku_init_dbt(&dest_val);
toku_fill_dbt(&rowdbt, l->row.data, l->row.len); dest_key.flags = DB_DBT_REALLOC;
r = renv->generate_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL); dest_val.flags = DB_DBT_REALLOC;
assert(r==0);
for (file = 0; file < l->dest_filenums.num; file++) {
uint32_t which_db = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple); r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file. // if we didn't find a cachefile, then we don't have to do anything for this file.
continue; continue;
} }
assert(tuple->brt->db == dbs[which_db]); DB *db = tuple->brt->db;
r = renv->generate_row_for_put(db, src_db, &dest_key, &dest_val, &src_key, &src_val, NULL);
r = toku_brt_maybe_insert(tuple->brt, &keydbts[which_db], &valdbts[which_db], txn, TRUE, l->lsn, FALSE); assert(r==0);
r = toku_brt_maybe_insert(tuple->brt, &dest_key, &dest_val, txn, TRUE, l->lsn, FALSE);
assert(r == 0); assert(r == 0);
which_db++; //DB_DBT_TEMPMEMORY indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.
if (dest_key.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
} }
assert(which_db == num_dbs); if (dest_val.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_val);
//Do cleanup of all dbts. dest_val.flags = DB_DBT_REALLOC;
r = renv->cleanup_keys_vals_for_put(&rowdbt, num_dbs, dbs, keydbts, valdbts, NULL); }
assert(r==0); }
if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
if (dest_val.flags & DB_DBT_REALLOC && dest_val.data) toku_free(dest_val.data); //TODO: #2321 May need windows hack
return 0; return 0;
} }
...@@ -564,49 +555,44 @@ static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple ...@@ -564,49 +555,44 @@ static int toku_recover_enq_delete_multiple (struct logtype_enq_delete_multiple
assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin assert(renv->ss.ss == FORWARD_OLDER_CHECKPOINT_BEGIN); //cannot happen after checkpoint begin
return 0; return 0;
} }
DB *src_db = NULL;
DB* dbs[l->filenums.num]; {
memset(dbs, 0, sizeof(dbs));
uint32_t file;
uint32_t num_dbs = 0;
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple); r = file_map_find(&renv->fmap, l->src_filenum, &tuple);
if (r!=0) { if (l->src_filenum.fileid == FILENUM_NONE.fileid)
// if we didn't find a cachefile, then we don't have to do anything for this file. assert(r!=0);
continue; else {
assert(r==0); //How do we continue if src_db is specified but missing?
src_db = tuple->brt->db;
} }
dbs[num_dbs++] = tuple->brt->db;
} }
if (num_dbs == 0) //All files are closed/deleted. We're done. uint32_t file;
return 0; DBT src_key, src_val, dest_key;
DBT keydbts[num_dbs], rowdbt; toku_fill_dbt(&src_key, l->src_key.data, l->src_key.len);
memset(keydbts, 0, sizeof(keydbts)); toku_fill_dbt(&src_val, l->src_val.data, l->src_val.len);
toku_init_dbt(&dest_key);
//Generate all the DBTs dest_key.flags = DB_DBT_REALLOC;
toku_fill_dbt(&rowdbt, l->row.data, l->row.len);
r = renv->generate_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
uint32_t which_db = 0; for (file = 0; file < l->dest_filenums.num; file++) {
for (file = 0; file < l->filenums.num; file++) {
struct file_map_tuple *tuple = NULL; struct file_map_tuple *tuple = NULL;
r = file_map_find(&renv->fmap, l->filenums.filenums[file], &tuple); r = file_map_find(&renv->fmap, l->dest_filenums.filenums[file], &tuple);
if (r!=0) { if (r!=0) {
// if we didn't find a cachefile, then we don't have to do anything for this file. // if we didn't find a cachefile, then we don't have to do anything for this file.
continue; continue;
} }
assert(tuple->brt->db == dbs[which_db]); DB *db = tuple->brt->db;
r = renv->generate_row_for_del(db, src_db, &dest_key, &src_key, &src_val, NULL);
r = toku_brt_maybe_delete(tuple->brt, &keydbts[which_db], txn, TRUE, l->lsn, FALSE); assert(r==0);
r = toku_brt_maybe_delete(tuple->brt, &dest_key, txn, TRUE, l->lsn, FALSE);
assert(r == 0); assert(r == 0);
which_db++; //DB_DBT_TEMPMEMORY indicates the return values are stored in temporary memory that does
//not need to be freed. We need to continue using DB_DBT_REALLOC however.
if (dest_key.flags & DB_DBT_TEMPMEMORY) {
toku_init_dbt(&dest_key);
dest_key.flags = DB_DBT_REALLOC;
} }
assert(which_db == num_dbs); }
if (dest_key.flags & DB_DBT_REALLOC && dest_key.data) toku_free(dest_key.data); //TODO: #2321 May need windows hack
//Do cleanup of all dbts.
r = renv->cleanup_keys_for_del(&rowdbt, num_dbs, dbs, keydbts, NULL);
assert(r==0);
return 0; return 0;
} }
...@@ -1331,10 +1317,8 @@ int tokudb_recover_delete_rolltmp_files(const char *UU(data_dir), const char *lo ...@@ -1331,10 +1317,8 @@ int tokudb_recover_delete_rolltmp_files(const char *UU(data_dir), const char *lo
int tokudb_recover(const char *env_dir, const char *log_dir, int tokudb_recover(const char *env_dir, const char *log_dir,
brt_compare_func bt_compare, brt_compare_func bt_compare,
brt_compare_func dup_compare, brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put, generate_row_for_put_func generate_row_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put, generate_row_for_del_func generate_row_for_del,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size) { size_t cachetable_size) {
int r; int r;
int lockfd = -1; int lockfd = -1;
...@@ -1353,10 +1337,8 @@ int tokudb_recover(const char *env_dir, const char *log_dir, ...@@ -1353,10 +1337,8 @@ int tokudb_recover(const char *env_dir, const char *log_dir,
if (tokudb_needs_recovery(log_dir, FALSE)) { if (tokudb_needs_recovery(log_dir, FALSE)) {
struct recover_env renv; struct recover_env renv;
r = recover_env_init(&renv, bt_compare, dup_compare, r = recover_env_init(&renv, bt_compare, dup_compare,
generate_keys_vals_for_put, generate_row_for_put,
cleanup_keys_vals_for_put, generate_row_for_del,
generate_keys_for_del,
cleanup_keys_for_del,
cachetable_size); cachetable_size);
assert(r == 0); assert(r == 0);
......
...@@ -19,10 +19,8 @@ ...@@ -19,10 +19,8 @@
int tokudb_recover (const char *env_dir, const char *log_dir, int tokudb_recover (const char *env_dir, const char *log_dir,
brt_compare_func bt_compare, brt_compare_func bt_compare,
brt_compare_func dup_compare, brt_compare_func dup_compare,
generate_keys_vals_for_put_func generate_keys_vals_for_put, generate_row_for_put_func generate_row_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put, generate_row_for_del_func generate_row_for_del,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del,
size_t cachetable_size); size_t cachetable_size);
// Effect: Check the tokudb logs to determine whether or not we need to run recovery. // Effect: Check the tokudb logs to determine whether or not we need to run recovery.
......
...@@ -35,7 +35,7 @@ int recovery_main (int argc, const char *argv[]) { ...@@ -35,7 +35,7 @@ int recovery_main (int argc, const char *argv[]) {
return(1); return(1);
} }
int r = tokudb_recover(data_dir, log_dir, NULL, NULL, NULL, NULL, NULL, NULL, 0); int r = tokudb_recover(data_dir, log_dir, NULL, NULL, NULL, NULL, 0);
if (r!=0) { if (r!=0) {
fprintf(stderr, "Recovery failed\n"); fprintf(stderr, "Recovery failed\n");
return(1); return(1);
......
...@@ -40,7 +40,7 @@ run_test(void) { ...@@ -40,7 +40,7 @@ run_test(void) {
r = close(devnul); assert(r==0); r = close(devnul); assert(r==0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0); assert(r == 0);
return 0; return 0;
......
...@@ -25,7 +25,7 @@ run_test(void) { ...@@ -25,7 +25,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0); r = toku_logger_close(&logger); assert(r == 0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
......
...@@ -31,7 +31,7 @@ run_test(void) { ...@@ -31,7 +31,7 @@ run_test(void) {
r = close(devnul); r = close(devnul);
assert(r==0); assert(r==0);
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
......
...@@ -20,7 +20,7 @@ run_test(void) { ...@@ -20,7 +20,7 @@ run_test(void) {
r = toku_logger_close(&logger); assert(r == 0); r = toku_logger_close(&logger); assert(r == 0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == DB_RUNRECOVERY); assert(r == DB_RUNRECOVERY);
return 0; return 0;
......
...@@ -34,7 +34,7 @@ run_test(void) { ...@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0); r = close(devnul); assert(r==0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
......
...@@ -34,7 +34,7 @@ run_test(void) { ...@@ -34,7 +34,7 @@ run_test(void) {
r = close(devnul); assert(r==0); r = close(devnul); assert(r==0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r == 0); assert(r == 0);
return 0; return 0;
} }
......
...@@ -28,7 +28,7 @@ run_test(void) { ...@@ -28,7 +28,7 @@ run_test(void) {
r = close(devnul); assert(r==0); r = close(devnul); assert(r==0);
// run recovery // run recovery
r = tokudb_recover("/junk", TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover("/junk", TESTDIR, 0, 0, NULL, NULL, 0);
assert(r != 0); assert(r != 0);
return 0; return 0;
} }
......
...@@ -20,7 +20,7 @@ run_test(void) { ...@@ -20,7 +20,7 @@ run_test(void) {
r = close(devnul); assert(r==0); r = close(devnul); assert(r==0);
// run recovery // run recovery
r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(TESTDIR, TESTDIR, 0, 0, NULL, NULL, 0);
assert(r != 0); assert(r != 0);
return 0; return 0;
} }
......
...@@ -14,7 +14,7 @@ run_test(void) { ...@@ -14,7 +14,7 @@ run_test(void) {
r = toku_os_mkdir(TESTDIR, S_IRWXU); assert(r == 0); r = toku_os_mkdir(TESTDIR, S_IRWXU); assert(r == 0);
// run recovery // run recovery
r = tokudb_recover(NULL, NULL, 0, 0, NULL, NULL, NULL, NULL, 0); r = tokudb_recover(NULL, NULL, 0, 0, NULL, NULL, 0);
assert(r != 0); assert(r != 0);
return 0; return 0;
} }
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
toku_free; toku_free;
toku_malloc; toku_malloc;
toku_xmemdup;
toku_os_get_file_size; toku_os_get_file_size;
toku_os_getpid; toku_os_getpid;
toku_os_gettid; toku_os_gettid;
......
...@@ -13,35 +13,37 @@ char names_single[MAX_DBS][sizeof("dbs_0xFFF")]; ...@@ -13,35 +13,37 @@ char names_single[MAX_DBS][sizeof("dbs_0xFFF")];
char names_multiple[MAX_DBS][sizeof("dbm_0xFFF")]; char names_multiple[MAX_DBS][sizeof("dbm_0xFFF")];
uint32_t num_dbs; uint32_t num_dbs;
uint32_t flags[MAX_DBS]; uint32_t flags[MAX_DBS];
uint32_t ids[MAX_DBS];
uint32_t kbuf[MAX_DBS][MAX_KEY/4]; uint32_t kbuf[MAX_DBS][MAX_KEY/4];
uint32_t vbuf[MAX_DBS][MAX_VAL/4]; uint32_t vbuf[MAX_DBS][MAX_VAL/4];
DBT dest_keys[MAX_DBS];
DBT dest_vals[MAX_DBS];
#define CKERRIFNOT0(r) do { if (num_dbs>0) { CKERR(r); } else { CKERR2(r, EINVAL); } } while (0) #define CKERRIFNOT0(r) do { if (num_dbs>0) { CKERR(r); } else { CKERR2(r, EINVAL); } } while (0)
#define CKERR2IFNOT0(r, rexpect) do { if (num_dbs>0) { CKERR2(r, rexpect); } else { CKERR2(r, EINVAL); } } while (0) #define CKERR2IFNOT0(r, rexpect) do { if (num_dbs>0) { CKERR2(r, rexpect); } else { CKERR2(r, EINVAL); } } while (0)
static int static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) { put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
assert(num_dbs_in > 0); if (extra == NULL) {
assert(num_dbs_in == num_dbs); assert(src_db == dbs_multiple[0]);
}
else {
assert(src_db == NULL);
assert(extra==&num_dbs); //Verifying extra gets set right. assert(extra==&num_dbs); //Verifying extra gets set right.
assert(row->size == 4);
uint32_t which;
for (which = 0; which < num_dbs_in; which++) {
kbuf[which][0] = *(uint32_t*)row->data;
kbuf[which][1] = which;
vbuf[which][0] = which;
vbuf[which][1] = *(uint32_t*)row->data;
keys[which].data = kbuf[which];
keys[which].size = sizeof(kbuf[which]);
vals[which].data = vbuf[which];
vals[which].size = sizeof(vbuf[which]);
} }
return 0; uint32_t which = *(uint32_t*)dest_db->app_private;
} assert(which < MAX_DBS);
static int assert(src_key->size == 4);
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) { assert(src_val->size == 4);
assert(extra==&num_dbs); //Verifying extra gets set right. kbuf[which][0] = *(uint32_t*)src_key->data;
kbuf[which][1] = which;
vbuf[which][0] = which;
vbuf[which][1] = *(uint32_t*)src_val->data;
dest_key->data = kbuf[which];
dest_key->size = sizeof(kbuf[which]);
dest_val->data = vbuf[which];
dest_val->size = sizeof(vbuf[which]);
return 0; return 0;
} }
...@@ -55,9 +57,7 @@ static void run_test (void) { ...@@ -55,9 +57,7 @@ static void run_test (void) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -69,11 +69,13 @@ static void run_test (void) { ...@@ -69,11 +69,13 @@ static void run_test (void) {
CKERR(r); CKERR(r);
DB *db; DB *db;
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
ids[which] = which;
r = db_create(&dbs_multiple[which], env, 0); r = db_create(&dbs_multiple[which], env, 0);
CKERR(r); CKERR(r);
db = dbs_multiple[which]; db = dbs_multiple[which];
r = db->open(db, txn, names_multiple[which], NULL, DB_BTREE, DB_CREATE, 0666); r = db->open(db, txn, names_multiple[which], NULL, DB_BTREE, DB_CREATE, 0666);
CKERR(r); CKERR(r);
db->app_private = &ids[which];
r = db_create(&dbs_single[which], env, 0); r = db_create(&dbs_single[which], env, 0);
CKERR(r); CKERR(r);
db = dbs_single[which]; db = dbs_single[which];
...@@ -95,8 +97,10 @@ static void run_test (void) { ...@@ -95,8 +97,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)}; uint32_t magic2 = ~magic;
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, dbs_multiple[0], txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, NULL);
CKERRIFNOT0(r); CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -116,8 +120,10 @@ static void run_test (void) { ...@@ -116,8 +120,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)}; uint32_t magic2 = ~magic;
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r); CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -137,8 +143,10 @@ static void run_test (void) { ...@@ -137,8 +143,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r); CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)}; uint32_t magic2 = ~magic;
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txn, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERR2IFNOT0(r, DB_KEYEXIST); CKERR2IFNOT0(r, DB_KEYEXIST);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -161,8 +169,10 @@ static void run_test (void) { ...@@ -161,8 +169,10 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txna, 0); r = env->txn_begin(env, NULL, &txna, 0);
CKERR(r); CKERR(r);
DBT rowdbt={.data=&magic, .size=sizeof(magic)}; uint32_t magic2 = ~magic;
r = env->put_multiple(env, txna, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); DBT keydbt = {.data=&magic, .size=sizeof(magic)};
DBT valdbt = {.data=&magic2, .size=sizeof(magic2)};
r = env->put_multiple(env, NULL, txna, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r); CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -177,7 +187,7 @@ static void run_test (void) { ...@@ -177,7 +187,7 @@ static void run_test (void) {
CKERR(r); CKERR(r);
//Lock should fail //Lock should fail
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); r = env->put_multiple(env, NULL, txnb, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERR2IFNOT0(r, DB_LOCK_NOTGRANTED); CKERR2IFNOT0(r, DB_LOCK_NOTGRANTED);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -189,7 +199,7 @@ static void run_test (void) { ...@@ -189,7 +199,7 @@ static void run_test (void) {
r = txna->commit(txna, 0); r = txna->commit(txna, 0);
//Should succeed this time. //Should succeed this time.
r = env->put_multiple(env, txnb, &rowdbt, num_dbs, dbs_multiple, flags, &num_dbs); r = env->put_multiple(env, NULL, txnb, &keydbt, &valdbt, num_dbs, dbs_multiple, dest_keys, dest_vals, flags, &num_dbs);
CKERRIFNOT0(r); CKERRIFNOT0(r);
for (which = 0; which < num_dbs; which++) { for (which = 0; which < num_dbs; which++) {
DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])}; DBT key={.data = kbuf[which], .size = sizeof(kbuf[which])};
...@@ -259,6 +269,8 @@ int test_main (int argc, char *argv[]) { ...@@ -259,6 +269,8 @@ int test_main (int argc, char *argv[]) {
for (which = 0; which < MAX_DBS; which++) { for (which = 0; which < MAX_DBS; which++) {
sprintf(names_multiple[which], "dbm_0x%02X", which); sprintf(names_multiple[which], "dbm_0x%02X", which);
sprintf(names_single[which], "dbs_0x%02X", which); sprintf(names_single[which], "dbs_0x%02X", which);
dbt_init(&dest_keys[which], NULL, 0);
dbt_init(&dest_vals[which], NULL, 0);
} }
for (num_dbs = 0; num_dbs < 4; num_dbs++) { for (num_dbs = 0; num_dbs < 4; num_dbs++) {
run_test(); run_test();
......
...@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG ...@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db"; char *namea="a.db";
char *nameb="b.db"; char *nameb="b.db";
enum {num_dbs = 2}; enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE; BOOL do_test=FALSE, do_recover=FALSE;
static int static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) { crash_on_upgrade(DB* db,
assert((int)num_dbs_in == num_dbs); u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
if (do_recover) u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
assert(extra==NULL); db = db;
else old_version = old_version;
assert(extra==&namea); //Verifying extra gets set right. old_descriptor = old_descriptor;
assert(row->size >= 4); old_key = old_key;
int32_t keysize = *(int32_t*)row->data; old_val = old_val;
assert((int)row->size >= 4+keysize); new_version = new_version;
int32_t valsize = row->size - 4 - keysize; new_descriptor = new_descriptor;
void *key = ((uint8_t*)row->data)+4; new_key = new_key;
void *val = ((uint8_t*)row->data)+4 + keysize; new_val = new_val;
int which; assert(FALSE);
for (which = 0; which < num_dbs; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
} }
static int static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) { put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (do_recover) if (extra == NULL) {
assert(extra==NULL); if (src_db) {
else assert(src_db->descriptor);
assert(src_db->descriptor->size == 4);
assert((*(uint32_t*)src_db->descriptor->data) == 0);
}
}
else {
assert(src_db == NULL);
assert(extra==&namea); //Verifying extra gets set right. assert(extra==&namea); //Verifying extra gets set right.
}
assert(dest_db->descriptor->size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0; return 0;
} }
...@@ -52,9 +64,7 @@ static void run_test (void) { ...@@ -52,9 +64,7 @@ static void run_test (void) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -64,10 +74,21 @@ static void run_test (void) { ...@@ -64,10 +74,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
} }
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba; DB *dba;
DB *dbb; DB *dbb;
r = db_create(&dba, env, 0); CKERR(r); r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
...@@ -79,18 +100,13 @@ static void run_test (void) { ...@@ -79,18 +100,13 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2}; DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size]; r = env->put_multiple(env, dba, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, NULL);
*(uint32_t*)&row[0] = k.size; CKERR(r);
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
r = txn->abort(txn); CKERR(r); r = txn->abort(txn); CKERR(r);
} }
r = dbb->close(dbb, 0); CKERR(r); r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb; dbs[1] = dbb;
// txn_begin; insert <a,b>; // txn_begin; insert <a,b>;
...@@ -99,13 +115,8 @@ static void run_test (void) { ...@@ -99,13 +115,8 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2}; DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size]; r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
*(uint32_t*)&row[0] = k.size; CKERR(r);
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r);
} }
// checkpoint // checkpoint
...@@ -125,9 +136,7 @@ static void run_recover (void) { ...@@ -125,9 +136,7 @@ static void run_recover (void) {
// run recovery // run recovery
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......
...@@ -9,38 +9,41 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG ...@@ -9,38 +9,41 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db"; char *namea="a.db";
char *nameb="b.db"; char *nameb="b.db";
enum {num_dbs = 2}; enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE; BOOL do_test=FALSE, do_recover=FALSE;
static int static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) { crash_on_upgrade(DB* db,
assert(num_dbs_in > 0); u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
if (do_recover) u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
assert(extra==NULL); db = db;
else old_version = old_version;
assert(extra==&namea); //Verifying extra gets set right. old_descriptor = old_descriptor;
assert(row->size >= 4); old_key = old_key;
int32_t keysize = *(int32_t*)row->data; old_val = old_val;
assert((int)row->size >= 4+keysize); new_version = new_version;
int32_t valsize = row->size - 4 - keysize; new_descriptor = new_descriptor;
void *key = ((uint8_t*)row->data)+4; new_key = new_key;
void *val = ((uint8_t*)row->data)+4 + keysize; new_val = new_val;
uint32_t which; assert(FALSE);
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
} }
static int static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) { put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (do_recover) assert(src_db == NULL);
assert(extra==NULL); assert(extra==&namea || extra==NULL); //Verifying extra gets set right.
else assert(dest_db->descriptor->size == 4);
assert(extra==&namea); //Verifying extra gets set right. uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0; return 0;
} }
...@@ -52,9 +55,7 @@ static void run_test (void) { ...@@ -52,9 +55,7 @@ static void run_test (void) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -64,10 +65,21 @@ static void run_test (void) { ...@@ -64,10 +65,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
} }
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba; DB *dba;
DB *dbb; DB *dbb;
r = db_create(&dba, env, 0); CKERR(r); r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
...@@ -79,18 +91,14 @@ static void run_test (void) { ...@@ -79,18 +91,14 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2}; DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r); r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->abort(txn); CKERR(r); r = txn->abort(txn); CKERR(r);
} }
r = dbb->close(dbb, 0); CKERR(r); r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb; dbs[1] = dbb;
// txn_begin; insert <a,b>; // txn_begin; insert <a,b>;
...@@ -99,13 +107,9 @@ static void run_test (void) { ...@@ -99,13 +107,9 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2}; DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r); r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
} }
{ {
...@@ -133,9 +137,7 @@ static void run_recover (void) { ...@@ -133,9 +137,7 @@ static void run_recover (void) {
// run recovery // run recovery
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
......
...@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG ...@@ -9,38 +9,50 @@ const int envflags = DB_INIT_MPOOL|DB_CREATE|DB_THREAD |DB_INIT_LOCK|DB_INIT_LOG
char *namea="a.db"; char *namea="a.db";
char *nameb="b.db"; char *nameb="b.db";
enum {num_dbs = 2}; enum {num_dbs = 2};
static DBT dest_keys[num_dbs];
static DBT dest_vals[num_dbs];
BOOL do_test=FALSE, do_recover=FALSE; BOOL do_test=FALSE, do_recover=FALSE;
static int static int
put_multiple_generate(DBT *row, uint32_t num_dbs_in, DB **UU(dbs_in), DBT *keys, DBT *vals, void *extra) { crash_on_upgrade(DB* db,
assert(num_dbs_in > 0); u_int32_t old_version, const DBT *old_descriptor, const DBT *old_key, const DBT *old_val,
if (do_recover) u_int32_t new_version, const DBT *new_descriptor, const DBT *new_key, const DBT *new_val) {
assert(extra==NULL); db = db;
else old_version = old_version;
assert(extra==&namea); //Verifying extra gets set right. old_descriptor = old_descriptor;
assert(row->size >= 4); old_key = old_key;
int32_t keysize = *(int32_t*)row->data; old_val = old_val;
assert((int)row->size >= 4+keysize); new_version = new_version;
int32_t valsize = row->size - 4 - keysize; new_descriptor = new_descriptor;
void *key = ((uint8_t*)row->data)+4; new_key = new_key;
void *val = ((uint8_t*)row->data)+4 + keysize; new_val = new_val;
uint32_t which; assert(FALSE);
for (which = 0; which < num_dbs_in; which++) {
keys[which].size = keysize;
keys[which].data = key;
vals[which].size = valsize;
vals[which].data = val;
}
return 0;
} }
static int static int
put_multiple_clean(DBT *UU(row), uint32_t UU(num_dbs_in), DB **UU(dbs_in), DBT *UU(keys), DBT *UU(vals), void *extra) { put_multiple_generate(DB *dest_db, DB *src_db, DBT *dest_key, DBT *dest_val, const DBT *src_key, const DBT *src_val, void *extra) {
if (do_recover) if (extra == NULL) {
assert(extra==NULL); if (src_db) {
else assert(src_db->descriptor);
assert(src_db->descriptor->size == 4);
assert((*(uint32_t*)src_db->descriptor->data) == 0);
}
}
else {
assert(src_db == NULL);
assert(extra==&namea); //Verifying extra gets set right. assert(extra==&namea); //Verifying extra gets set right.
}
assert(dest_db->descriptor->size == 4);
uint32_t which = *(uint32_t*)dest_db->descriptor->data;
assert(which < num_dbs);
if (dest_key->data) toku_free(dest_key->data);
if (dest_val->data) toku_free(dest_val->data);
dest_key->data = toku_xmemdup (src_key->data, src_key->size);
dest_key->size = src_key->size;
dest_val->data = toku_xmemdup (src_val->data, src_val->size);
dest_val->size = src_val->size;
return 0; return 0;
} }
...@@ -52,9 +64,7 @@ static void run_test (void) { ...@@ -52,9 +64,7 @@ static void run_test (void) {
DB_ENV *env; DB_ENV *env;
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -64,10 +74,21 @@ static void run_test (void) { ...@@ -64,10 +74,21 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &oldest_living_txn, 0); CKERR(r);
} }
DBT descriptor;
uint32_t which;
for (which = 0; which < num_dbs; which++) {
dbt_init_realloc(&dest_keys[which]);
dbt_init_realloc(&dest_vals[which]);
}
dbt_init(&descriptor, &which, sizeof(which));
DB *dba; DB *dba;
DB *dbb; DB *dbb;
r = db_create(&dba, env, 0); CKERR(r); r = db_create(&dba, env, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
which = 0;
r = dba->set_descriptor(dba, 1, &descriptor, crash_on_upgrade); CKERR(r);
which = 1;
r = dbb->set_descriptor(dbb, 1, &descriptor, crash_on_upgrade); CKERR(r);
r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dba->open(dba, NULL, namea, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r);
...@@ -79,18 +100,14 @@ static void run_test (void) { ...@@ -79,18 +100,14 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="a", .size=2}; DBT v={.data="a", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r); r = env->put_multiple(env, dba, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, NULL);
CKERR(r);
r = txn->abort(txn); CKERR(r); r = txn->abort(txn); CKERR(r);
} }
r = dbb->close(dbb, 0); CKERR(r); r = dbb->close(dbb, 0); CKERR(r);
r = db_create(&dbb, env, 0); CKERR(r); r = db_create(&dbb, env, 0); CKERR(r);
r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT|DB_CREATE, 0666); CKERR(r); r = dbb->open(dbb, NULL, nameb, NULL, DB_BTREE, DB_AUTO_COMMIT, 0666); CKERR(r);
dbs[1] = dbb; dbs[1] = dbb;
// txn_begin; insert <a,b>; // txn_begin; insert <a,b>;
...@@ -99,20 +116,16 @@ static void run_test (void) { ...@@ -99,20 +116,16 @@ static void run_test (void) {
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
DBT k={.data="a", .size=2}; DBT k={.data="a", .size=2};
DBT v={.data="b", .size=2}; DBT v={.data="b", .size=2};
uint8_t row[4+k.size+v.size];
*(uint32_t*)&row[0] = k.size;
memcpy(row+4, k.data, k.size);
memcpy(row+4+k.size, v.data, v.size);
DBT rowdbt = {.data = row, .size = sizeof(row)};
r = env->put_multiple(env, txn, &rowdbt, num_dbs, dbs, flags, &namea); CKERR(r); r = env->put_multiple(env, NULL, txn, &k, &v, num_dbs, dbs, dest_keys, dest_vals, flags, &namea);
CKERR(r);
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
} }
{ {
DB_TXN *txn; DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
r = dba->close(dba, 0); CKERR(r); r = dba->close(dbb, 0); CKERR(r);
r = env->dbremove(env, txn, namea, NULL, 0); CKERR(r); r = env->dbremove(env, txn, nameb, NULL, 0); CKERR(r);
r = txn->commit(txn, 0); CKERR(r); r = txn->commit(txn, 0); CKERR(r);
} }
...@@ -131,9 +144,7 @@ static void run_recover (void) { ...@@ -131,9 +144,7 @@ static void run_recover (void) {
// run recovery // run recovery
r = db_env_create(&env, 0); CKERR(r); r = db_env_create(&env, 0); CKERR(r);
r = env->set_multiple_callbacks(env, r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
put_multiple_generate, put_multiple_clean,
NULL, NULL);
CKERR(r); CKERR(r);
r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags + DB_RECOVER, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
...@@ -141,13 +152,13 @@ static void run_recover (void) { ...@@ -141,13 +152,13 @@ static void run_recover (void) {
{ {
DB *db; DB *db;
r = db_create(&db, env, 0); CKERR(r); r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT); r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR2(r, ENOENT);
r = db->close(db, 0); CKERR(r); r = db->close(db, 0); CKERR(r);
} }
{ {
DB *db; DB *db;
r = db_create(&db, env, 0); CKERR(r); r = db_create(&db, env, 0); CKERR(r);
r = db->open(db, NULL, nameb, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r); r = db->open(db, NULL, namea, NULL, DB_UNKNOWN, DB_AUTO_COMMIT, 0666); CKERR(r);
DB_TXN *txn; DB_TXN *txn;
r = env->txn_begin(env, NULL, &txn, 0); CKERR(r); r = env->txn_begin(env, NULL, &txn, 0); CKERR(r);
......
...@@ -55,10 +55,8 @@ struct __toku_db_env_internal { ...@@ -55,10 +55,8 @@ struct __toku_db_env_internal {
char *data_dir; char *data_dir;
int (*bt_compare) (DB *, const DBT *, const DBT *); int (*bt_compare) (DB *, const DBT *, const DBT *);
int (*dup_compare) (DB *, const DBT *, const DBT *); int (*dup_compare) (DB *, const DBT *, const DBT *);
generate_keys_vals_for_put_func generate_keys_vals_for_put; generate_row_for_put_func generate_row_for_put;
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put; generate_row_for_del_func generate_row_for_del;
generate_keys_for_del_func generate_keys_for_del;
cleanup_keys_for_del_func cleanup_keys_for_del;
//void (*noticecall)(DB_ENV *, db_notices); //void (*noticecall)(DB_ENV *, db_notices);
unsigned long cachetable_size; unsigned long cachetable_size;
CACHETABLE cachetable; CACHETABLE cachetable;
......
...@@ -338,8 +338,7 @@ ydb_do_recovery (DB_ENV *env) { ...@@ -338,8 +338,7 @@ ydb_do_recovery (DB_ENV *env) {
} }
toku_ydb_unlock(); toku_ydb_unlock();
int r = tokudb_recover(envdir, logdir, env->i->bt_compare, env->i->dup_compare, int r = tokudb_recover(envdir, logdir, env->i->bt_compare, env->i->dup_compare,
env->i->generate_keys_vals_for_put, env->i->cleanup_keys_vals_for_put, env->i->generate_row_for_put, env->i->generate_row_for_del,
env->i->generate_keys_for_del, env->i->cleanup_keys_for_del,
env->i->cachetable_size); env->i->cachetable_size);
toku_ydb_lock(); toku_ydb_lock();
toku_free(logdir); toku_free(logdir);
...@@ -1228,54 +1227,58 @@ locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const D ...@@ -1228,54 +1227,58 @@ locked_env_set_default_bt_compare(DB_ENV * env, int (*bt_compare) (DB *, const D
} }
static int static int
env_set_multiple_callbacks(DB_ENV *env, env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
generate_keys_vals_for_put_func generate_keys_vals_for_put,
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put,
generate_keys_for_del_func generate_keys_for_del,
cleanup_keys_for_del_func cleanup_keys_for_del) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
int r = 0; int r = 0;
if (env_opened(env)) r = EINVAL; if (env_opened(env)) r = EINVAL;
else { else {
env->i->generate_keys_vals_for_put = generate_keys_vals_for_put; env->i->generate_row_for_put = generate_row_for_put;
env->i->cleanup_keys_vals_for_put = cleanup_keys_vals_for_put;
env->i->generate_keys_for_del = generate_keys_for_del;
env->i->cleanup_keys_for_del = cleanup_keys_for_del;
} }
return r; return r;
} }
static int static int
locked_env_set_multiple_callbacks(DB_ENV *env, env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
generate_keys_vals_for_put_func generate_keys_vals_for_put, HANDLE_PANICKED_ENV(env);
cleanup_keys_vals_for_put_func cleanup_keys_vals_for_put, int r = 0;
generate_keys_for_del_func generate_keys_for_del, if (env_opened(env)) r = EINVAL;
cleanup_keys_for_del_func cleanup_keys_for_del) { else {
env->i->generate_row_for_del = generate_row_for_del;
}
return r;
}
static int
locked_env_set_generate_row_callback_for_put(DB_ENV *env, generate_row_for_put_func generate_row_for_put) {
toku_ydb_lock();
int r = env_set_generate_row_callback_for_put(env, generate_row_for_put);
toku_ydb_unlock();
return r;
}
static int
locked_env_set_generate_row_callback_for_del(DB_ENV *env, generate_row_for_del_func generate_row_for_del) {
toku_ydb_lock(); toku_ydb_lock();
int r = env_set_multiple_callbacks(env, int r = env_set_generate_row_callback_for_del(env, generate_row_for_del);
generate_keys_vals_for_put,
cleanup_keys_vals_for_put,
generate_keys_for_del,
cleanup_keys_for_del);
toku_ydb_unlock(); toku_ydb_unlock();
return r; return r;
} }
static int env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra); static int env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra);
static int env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra); static int env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra);
static int static int
locked_env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) { locked_env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra) {
toku_ydb_lock(); toku_ydb_lock();
int r = env_put_multiple(env, txn, row, num_dbs, dbs, flags, extra); int r = env_put_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, vals, flags_array, extra);
toku_ydb_unlock(); toku_ydb_unlock();
return r; return r;
} }
static int static int
locked_env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) { locked_env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra) {
toku_ydb_lock(); toku_ydb_lock();
int r = env_del_multiple(env, txn, row, num_dbs, dbs, flags, extra); int r = env_del_multiple(env, src_db, txn, key, val, num_dbs, db_array, keys, flags_array, extra);
toku_ydb_unlock(); toku_ydb_unlock();
return r; return r;
} }
...@@ -1472,7 +1475,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) { ...@@ -1472,7 +1475,8 @@ static int toku_env_create(DB_ENV ** envp, u_int32_t flags) {
SENV(dbrename); SENV(dbrename);
SENV(set_default_bt_compare); SENV(set_default_bt_compare);
SENV(set_default_dup_compare); SENV(set_default_dup_compare);
SENV(set_multiple_callbacks); SENV(set_generate_row_callback_for_put);
SENV(set_generate_row_callback_for_del);
SENV(put_multiple); SENV(put_multiple);
SENV(del_multiple); SENV(del_multiple);
SENV(checkpointing_set_period); SENV(checkpointing_set_period);
...@@ -3639,10 +3643,8 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) { ...@@ -3639,10 +3643,8 @@ toku_db_del(DB *db, DB_TXN *txn, DBT *key, u_int32_t flags) {
} }
static int static int
env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) { env_del_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, uint32_t *flags_array, void *extra) {
int r; int r;
int generated_dbts = 0;
DBT keydbts[num_dbs];
uint32_t lock_flags[num_dbs]; uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs]; uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs]; BRT brts[num_dbs];
...@@ -3650,22 +3652,19 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, ...@@ -3650,22 +3652,19 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
r = EINVAL; r = EINVAL;
goto cleanup; goto cleanup;
} }
if (!env->i->generate_keys_for_del || !env->i->cleanup_keys_for_del) { if (!env->i->generate_row_for_del) {
r = EINVAL; r = EINVAL;
goto cleanup; goto cleanup;
} }
memset(keydbts, 0, sizeof(keydbts));
//Generate all the DBTs
r = env->i->generate_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db; uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) { for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db]; DB *db = db_array[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db); //Generate the row
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db]; r = env->i->generate_row_for_del(db, src_db, &keys[which_db], key, val, extra);
if (r!=0) goto cleanup;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db], txn, db);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
if (remaining_flags[which_db] & ~DB_DELETE_ANY) { if (remaining_flags[which_db] & ~DB_DELETE_ANY) {
r = EINVAL; r = EINVAL;
...@@ -3674,40 +3673,35 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, ...@@ -3674,40 +3673,35 @@ env_del_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
BOOL error_if_missing = (BOOL)(!(remaining_flags[which_db]&DB_DELETE_ANY)); BOOL error_if_missing = (BOOL)(!(remaining_flags[which_db]&DB_DELETE_ANY));
if (error_if_missing) { if (error_if_missing) {
//Check if the key exists in the db. //Check if the key exists in the db.
r = db_getf_set(db, txn, lock_flags[which_db], &keydbts[which_db], ydb_getf_do_nothing, NULL); r = db_getf_set(db, txn, lock_flags[which_db], &keys[which_db], ydb_getf_do_nothing, NULL);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
} }
//Check overwrite constraints
if (r!=0) goto cleanup;
//Do locking if necessary. //Do locking if necessary.
if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) { if (db->i->lt && !(lock_flags[which_db] & DB_PRELOCKED_WRITE)) {
//Needs locking //Needs locking
RANGE_LOCK_REQUEST_S request; RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock) //Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db, write_lock_request_init(&request, txn, db,
&keydbts[which_db], toku_lt_neg_infinity, &keys[which_db], toku_lt_neg_infinity,
&keydbts[which_db], toku_lt_infinity); &keys[which_db], toku_lt_infinity);
r = grab_range_lock(&request); r = grab_range_lock(&request);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
} }
brts[which_db] = db->i->brt; brts[which_db] = db->i->brt;
} }
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_del_multiple(ttxn, brts, num_dbs, row); BRT src_brt = src_db ? src_db->i->brt : NULL;
r = toku_brt_log_del_multiple(ttxn, src_brt, brts, num_dbs, key, val);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) { for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db]; DB *db = db_array[which_db];
num_deletes++; // accountability num_deletes++;
r = toku_brt_maybe_delete(db->i->brt, &keydbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE); r = toku_brt_maybe_delete(db->i->brt, &keys[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
} }
cleanup: cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_for_del(row, num_dbs, dbs, keydbts, extra);
if (r==0) r = r2;
}
return r; return r;
} }
...@@ -4294,10 +4288,8 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) { ...@@ -4294,10 +4288,8 @@ toku_db_put(DB *db, DB_TXN *txn, DBT *key, DBT *val, u_int32_t flags) {
} }
static int static int
env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, uint32_t *flags, void *extra) { env_put_multiple(DB_ENV *env, DB *src_db, DB_TXN *txn, const DBT *key, const DBT *val, uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array, void *extra) {
int r; int r;
int generated_dbts = 0;
DBT keydbts[num_dbs], valdbts[num_dbs];
uint32_t lock_flags[num_dbs]; uint32_t lock_flags[num_dbs];
uint32_t remaining_flags[num_dbs]; uint32_t remaining_flags[num_dbs];
BRT brts[num_dbs]; BRT brts[num_dbs];
...@@ -4305,26 +4297,22 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, ...@@ -4305,26 +4297,22 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
r = EINVAL; r = EINVAL;
goto cleanup; goto cleanup;
} }
if (!env->i->generate_keys_vals_for_put || !env->i->cleanup_keys_vals_for_put) { if (!env->i->generate_row_for_put) {
r = EINVAL; r = EINVAL;
goto cleanup; goto cleanup;
} }
memset(keydbts, 0, sizeof(keydbts));
memset(valdbts, 0, sizeof(valdbts));
//Generate all the DBTs
r = env->i->generate_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r!=0) goto cleanup;
generated_dbts = 1;
uint32_t which_db; uint32_t which_db;
for (which_db = 0; which_db < num_dbs; which_db++) { for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db]; DB *db = db_array[which_db];
lock_flags[which_db] = get_prelocked_flags(flags[which_db], txn, db); //Generate the row
remaining_flags[which_db] = flags[which_db] & ~lock_flags[which_db]; r = env->i->generate_row_for_put(db, src_db, &keys[which_db], &vals[which_db], key, val, extra);
if (r!=0) goto cleanup;
lock_flags[which_db] = get_prelocked_flags(flags_array[which_db], txn, db);
remaining_flags[which_db] = flags_array[which_db] & ~lock_flags[which_db];
//Check overwrite constraints //Check overwrite constraints
r = db_put_check_overwrite_constraint(db, txn, r = db_put_check_overwrite_constraint(db, txn,
&keydbts[which_db], &valdbts[which_db], &keys[which_db], &vals[which_db],
lock_flags[which_db], remaining_flags[which_db]); lock_flags[which_db], remaining_flags[which_db]);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
//Do locking if necessary. //Do locking if necessary.
...@@ -4333,28 +4321,25 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs, ...@@ -4333,28 +4321,25 @@ env_put_multiple(DB_ENV *env, DB_TXN *txn, DBT *row, uint32_t num_dbs, DB **dbs,
RANGE_LOCK_REQUEST_S request; RANGE_LOCK_REQUEST_S request;
//Left end of range == right end of range (point lock) //Left end of range == right end of range (point lock)
write_lock_request_init(&request, txn, db, write_lock_request_init(&request, txn, db,
&keydbts[which_db], &valdbts[which_db], &keys[which_db], &vals[which_db],
&keydbts[which_db], &valdbts[which_db]); &keys[which_db], &vals[which_db]);
r = grab_range_lock(&request); r = grab_range_lock(&request);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
} }
brts[which_db] = db->i->brt; brts[which_db] = db->i->brt;
} }
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn; TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
r = toku_brt_log_put_multiple(ttxn, brts, num_dbs, row); BRT src_brt = src_db ? src_db->i->brt : NULL;
r = toku_brt_log_put_multiple(ttxn, src_brt, brts, num_dbs, key, val);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
for (which_db = 0; which_db < num_dbs; which_db++) { for (which_db = 0; which_db < num_dbs; which_db++) {
DB *db = dbs[which_db]; DB *db = db_array[which_db];
num_inserts++; num_inserts++;
r = toku_brt_maybe_insert(db->i->brt, &keydbts[which_db], &valdbts[which_db], ttxn, FALSE, ZERO_LSN, FALSE); r = toku_brt_maybe_insert(db->i->brt, &keys[which_db], &vals[which_db], ttxn, FALSE, ZERO_LSN, FALSE);
if (r!=0) goto cleanup; if (r!=0) goto cleanup;
} }
cleanup: cleanup:
if (generated_dbts) {
int r2 = env->i->cleanup_keys_vals_for_put(row, num_dbs, dbs, keydbts, valdbts, extra);
if (r==0) r = r2;
}
return r; return r;
} }
......
...@@ -94,7 +94,7 @@ void *toku_memdup (const void *v, size_t len); ...@@ -94,7 +94,7 @@ void *toku_memdup (const void *v, size_t len);
char *toku_strdup (const char *s) __attribute__((__visibility__("default"))); char *toku_strdup (const char *s) __attribute__((__visibility__("default")));
/* Copy memory. Analogous to strdup() Crashes instead of returning NULL */ /* Copy memory. Analogous to strdup() Crashes instead of returning NULL */
void *toku_xmemdup (const void *v, size_t len); void *toku_xmemdup (const void *v, size_t len) __attribute__((__visibility__("default")));
/* Toku-version of strdup. Use this so that it calls toku_xmalloc() Crashes instead of returning NULL */ /* Toku-version of strdup. Use this so that it calls toku_xmalloc() Crashes instead of returning NULL */
char *toku_xstrdup (const char *s) __attribute__((__visibility__("default"))); char *toku_xstrdup (const char *s) __attribute__((__visibility__("default")));
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment