Commit bfac7da4 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge in the 2462 branch that has a rudimentary implementation of polling. Refs #2462. [t:2462]

{{{
svn merge -c 18768 https://svn.tokutek.com/tokudb/toku/tokudb.2462
}}}
.


git-svn-id: file:///svn/toku/tokudb@18771 c7de825b-a66e-492c-adef-691d508d4ae1
parent 42140abd
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -209,7 +214,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -211,7 +216,7 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -212,7 +217,7 @@ struct __toku_db_env {
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
void *app_private; /* 32-bit offset=44 size=4, 64=bit offset=88 size=8 */
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -211,7 +216,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -213,7 +218,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
void *app_private; /* 32-bit offset=52 size=4, 64=bit offset=104 size=8 */
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
......
......@@ -405,7 +405,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf("struct __toku_loader {\n");
printf(" struct __toku_loader_internal *i;\n");
printf(" int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */\n");
printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */\n");
printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */\n");
printf(" int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */\n");
printf(" int (*close)(DB_LOADER *loader); /* finish loading, free memory */\n");
printf(" int (*abort)(DB_LOADER *loader); /* abort loading, free memory */\n");
......@@ -491,7 +491,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
"int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */",
"int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */",
"int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */",
"int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra)",
"int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags)",
"int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,\n"
" const DBT *key, const DBT *val,\n"
" uint32_t num_dbs, DB **db_array, DBT *keys, DBT *vals, uint32_t *flags_array,\n"
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -104,6 +103,12 @@ typedef struct __toku_engine_status {
u_int64_t sequential_queries; /* ydb sequential queries */
u_int64_t fsync_count; /* number of times fsync performed */
u_int64_t fsync_time; /* total time required to fsync */
u_int64_t logger_ilock_ctr; /* how many times has logger input lock been taken or released */
u_int64_t logger_olock_ctr; /* how many times has logger output condition lock been taken or released */
u_int64_t logger_swap_ctr; /* how many times have logger buffers been swapped */
char enospc_most_recent[26]; /* time of most recent ENOSPC error return from disk write */
u_int64_t enospc_threads_blocked; /* how many threads are currently blocked by ENOSPC */
u_int64_t enospc_total; /* how many times has ENOSPC been returned by disk write */
} ENGINE_STATUS;
typedef enum {
DB_BTREE=1,
......@@ -213,7 +218,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
......
......@@ -51,7 +51,7 @@ struct __toku_loader_internal;
struct __toku_loader {
struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress), void *poll_extra); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */
int (*abort)(DB_LOADER *loader); /* abort loading, free memory */
......@@ -71,7 +71,6 @@ typedef struct __toku_engine_status {
u_int64_t time_ydb_lock_held_unavailable; /* number of times a thread migrated and theld is unavailable */
u_int64_t max_time_ydb_lock_held; /* max time a client thread held the ydb lock */
u_int64_t total_time_ydb_lock_held;/* total time client threads held the ydb lock */
u_int64_t logger_lock_ctr; /* how many times has logger lock been taken/released */
u_int32_t checkpoint_period; /* delay between automatic checkpoints */
u_int32_t checkpoint_footprint; /* state of checkpoint procedure */
char checkpoint_time_begin[26]; /* time of last checkpoint begin */
......@@ -219,7 +218,7 @@ struct __toku_db_env {
int (*get_engine_status) (DB_ENV*, ENGINE_STATUS*) /* Fill in status struct */;
int (*get_engine_status_text) (DB_ENV*, char*, int) /* Fill in status text */;
int (*get_iname) (DB_ENV* env, DBT* dname_dbt, DBT* iname_dbt) /* FOR TEST ONLY: lookup existing iname */;
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, void *extra);
int (*create_loader) (DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
void *app_private;
int (*put_multiple) (DB_ENV *env, DB *src_db, DB_TXN *txn,
const DBT *key, const DBT *val,
......
......@@ -852,7 +852,8 @@ static int loader_do_i (BRTLOADER bl,
}
int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int i, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra
void (*error_callback)(DB *, int i, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
int (*poll_function)(void *extra, float progress), void *poll_extra
)
/* Effect: Close the bulk loader.
* Return all the file descriptors in the array fds. */
......@@ -865,6 +866,9 @@ int toku_brt_loader_close (BRTLOADER bl,
if (result!=0) goto error;
toku_free((void*)bl->new_fnames_in_env[i]);
bl->new_fnames_in_env[i] = NULL;
if (poll_function && poll_function(poll_extra, (float)i/(float)bl->N)) {
goto error;
}
}
result = brtloader_fi_close (&bl->file_infos, bl->fprimary_rows); if (result) goto error;
result = brtloader_fi_unlink(&bl->file_infos, bl->fprimary_rows); if (result) goto error;
......
......@@ -17,8 +17,8 @@ int toku_brt_loader_open (BRTLOADER *bl,
const char *temp_file_template);
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra),
void *extra);
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra), void *error_callback_extra,
int (*poll_callback)(void *extra, float progress), void *poll_callback_extra);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
......
......@@ -37,10 +37,10 @@ struct __toku_loader_internal {
uint32_t *db_flags;
uint32_t *dbt_flags;
uint32_t loader_flags;
void *extra;
void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *extra_extra);
void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra);
void *error_extra;
int (*poll_func)(void *extra, float progress);
int (*poll_func)(void *poll_extra, float progress);
void *poll_extra;
char *temp_file_template;
DBT *ekeys;
......@@ -81,11 +81,10 @@ int toku_loader_create_loader(DB_ENV *env,
DB *dbs[],
uint32_t db_flags[N],
uint32_t dbt_flags[N],
uint32_t loader_flags,
void *extra UU())
uint32_t loader_flags)
{
DB_LOADER *loader;
XCALLOC(loader); // init to all zeroes
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i);
loader->i->env = env;
......@@ -96,7 +95,6 @@ int toku_loader_create_loader(DB_ENV *env,
loader->i->db_flags = db_flags;
loader->i->dbt_flags = dbt_flags;
loader->i->loader_flags = loader_flags;
loader->i->extra = extra;
loader->i->temp_file_template = (char *)toku_malloc(MAX_FILE_SIZE);
int n = snprintf(loader->i->temp_file_template, MAX_FILE_SIZE, "%s/tempXXXXXX", env->i->dir);
......@@ -169,9 +167,11 @@ int toku_loader_create_loader(DB_ENV *env,
}
int toku_loader_set_poll_function(DB_LOADER *loader,
int (*poll_func)(void *extra, float progress))
int (*poll_func)(void *extra, float progress),
void *poll_extra)
{
loader->i->poll_func = poll_func;
loader->i->poll_extra = poll_extra;
return 0;
}
......@@ -243,7 +243,9 @@ int toku_loader_close(DB_LOADER *loader)
}
if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra);
r = toku_brt_loader_close(loader->i->brt_loader,
loader->i->error_callback, loader->i->error_extra,
loader->i->poll_func, loader->i->poll_extra);
if (r!=0) goto cleanup_and_return_r;
for (int i=0; i<loader->i->N; i++) {
......
......@@ -9,9 +9,9 @@
* Serial No. 11/760379 and to the patents and/or patent applications resulting from it.
*/
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t db_flags[N], uint32_t dbt_flags[N], uint32_t loader_flags, void *extra);
int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra), void *extra);
int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress));
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t db_flags[N], uint32_t dbt_flags[N], uint32_t loader_flags);
int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra);
int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *poll_extra, float progress), void *poll_extra);
int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val);
int toku_loader_close(DB_LOADER *loader);
int toku_loader_abort(DB_LOADER *loader);
......
......@@ -212,12 +212,12 @@ static void test_loader(DB **dbs)
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
struct error_extra error_extra = {.error_count=0};
r = loader->set_error_callback(loader, error_callback, (void*)&error_extra);
CKERR(r);
r = loader->set_poll_function(loader, NULL);
r = loader->set_poll_function(loader, NULL, NULL);
CKERR(r);
// using loader->put, put values into DB
......
......@@ -52,11 +52,11 @@ static void test_loader(DB **dbs)
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
r = loader->set_error_callback(loader, NULL, NULL);
CKERR(r);
r = loader->set_poll_function(loader, NULL);
r = loader->set_poll_function(loader, NULL, NULL);
CKERR(r);
// using loader->put, put values into DB
......
......@@ -177,6 +177,16 @@ static void check_results(DB **dbs)
printf("\nCheck OK\n");
}
static void *expect_poll_void = &expect_poll_void;
static int poll_count=0;
static int poll_function (void *extra, float progress) {
//printf("progress: %5.1f%%\n", progress*100);
assert(extra==expect_poll_void);
assert(0.0<=progress && progress<1.0);
poll_count++;
return 0;
}
static void test_loader(DB **dbs)
{
int r;
......@@ -193,11 +203,11 @@ static void test_loader(DB **dbs)
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
r = loader->set_error_callback(loader, NULL, NULL);
CKERR(r);
r = loader->set_poll_function(loader, NULL);
r = loader->set_poll_function(loader, poll_function, expect_poll_void);
CKERR(r);
// using loader->put, put values into DB
......@@ -214,12 +224,16 @@ static void test_loader(DB **dbs)
}
if( CHECK_RESULTS || verbose ) {printf("\n"); fflush(stdout);}
poll_count=0;
// close the loader
printf("closing"); fflush(stdout);
r = loader->close(loader);
printf(" done\n");
CKERR(r);
assert(poll_count>0);
r = txn->commit(txn, 0);
CKERR(r);
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment