Commit ac739077 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

fix the loader error callback and the pqueue-test closes[t:2576]

git-svn-id: file:///svn/toku/tokudb@19913 c7de825b-a66e-492c-adef-691d508d4ae1
parent ff1c1428
......@@ -71,14 +71,15 @@ struct poll_callback_s {
brt_loader_poll_func poll_function;
void *poll_extra;
};
typedef struct poll_callback_s *brtloader_poll_callback;
int brt_loader_init_poll_callback(BRTLOADER);
int brt_loader_init_poll_callback(brtloader_poll_callback);
void brt_loader_destroy_poll_callback(BRTLOADER);
void brt_loader_destroy_poll_callback(brtloader_poll_callback);
void brt_loader_set_poll_function(BRTLOADER, brt_loader_poll_func poll_function, void *poll_extra);
void brt_loader_set_poll_function(brtloader_poll_callback, brt_loader_poll_func poll_function, void *poll_extra);
int brt_loader_call_poll_function(BRTLOADER, float progress);
int brt_loader_call_poll_function(brtloader_poll_callback, float progress);
struct error_callback_s {
brt_loader_error_func error_callback;
......@@ -90,24 +91,22 @@ struct error_callback_s {
DBT key;
DBT val;
toku_pthread_mutex_t mutex;
int (*set_error_and_callback)(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
BRTLOADER bl;
};
typedef struct error_callback_s *brtloader_error_callback;
int brt_loader_init_error_callback(BRTLOADER);
int brt_loader_init_error_callback(brtloader_error_callback);
void brt_loader_destroy_error_callback(BRTLOADER);
void brt_loader_destroy_error_callback(brtloader_error_callback);
int brt_loader_get_error(BRTLOADER);
int brt_loader_get_error(brtloader_error_callback);
void brt_loader_set_error_function(BRTLOADER, brt_loader_error_func error_function, void *extra);
void brt_loader_set_error_function(brtloader_error_callback, brt_loader_error_func error_function, void *extra);
int brt_loader_set_error(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
int brt_loader_set_error(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
int brt_loader_call_error_function(BRTLOADER);
int brt_loader_call_error_function(brtloader_error_callback);
int brt_loader_set_error_and_callback(BRTLOADER, int error, DB *db, int which_db, DBT *key, DBT *val);
int brt_loader_set_error_and_callback(brtloader_error_callback, int error, DB *db, int which_db, DBT *key, DBT *val);
struct brtloader_s {
int panic;
......
......@@ -246,8 +246,8 @@ static void brtloader_destroy (BRTLOADER bl, BOOL is_error) {
toku_free(bl->fractal_queues);
toku_free(bl->fractal_threads_live);
brt_loader_destroy_error_callback(bl);
brt_loader_destroy_poll_callback(bl);
brt_loader_destroy_error_callback(&bl->error_callback);
brt_loader_destroy_poll_callback(&bl->poll_callback);
}
static void *extractor_thread (void*);
......@@ -326,8 +326,8 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
init_merge_fileset(&bl->fs[i]);
}
brt_loader_init_error_callback(bl);
brt_loader_init_poll_callback(bl);
brt_loader_init_error_callback(&bl->error_callback);
brt_loader_init_poll_callback(&bl->poll_callback);
{ int r = init_rowset(&bl->primary_rowset); if (r!=0) return r; }
{ int r = queue_create(&bl->primary_rowset_queue, 1); if (r!=0) return r; }
......@@ -809,7 +809,7 @@ int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
* Return value: 0 on success, an error number otherwise.
*/
{
if (bl->panic || brt_loader_get_error(bl))
if (bl->panic || brt_loader_get_error(&bl->error_callback))
return EINVAL; // previous panic
bl->n_rows++;
// return loader_write_row(key, val, bl->fprimary_rows, &bl->fprimary_offset, bl);
......@@ -848,7 +848,7 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
if (compare_result==0) {
if (bl->error_callback.error_callback) {
DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
brt_loader_set_error(bl, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
brt_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
}
return DB_KEYEXIST;
} else if (compare_result<0) {
......@@ -894,7 +894,7 @@ static int binary_search (int *location,
if (compare_result==0) {
if (bl->error_callback.error_callback) {
DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen, a[a2].vlen);
brt_loader_set_error(bl, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
brt_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
}
return DB_KEYEXIST;
} else if (compare_result<0) {
......@@ -1082,7 +1082,7 @@ static int update_progress (int N,
bl->progress+=N;
//printf(" %20s: %d ", message, bl->progress);
int r = brt_loader_call_poll_function(bl, (float)bl->progress/(float)PROGRESS_MAX);
int r = brt_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
toku_pthread_mutex_unlock(&update_progress_lock);
return r;
}
......@@ -1956,15 +1956,15 @@ int toku_brt_loader_close (BRTLOADER bl,
//printf("Closing\n");
brt_loader_set_error_function(bl, error_function, error_extra);
brt_loader_set_error_function(&bl->error_callback, error_function, error_extra);
brt_loader_set_poll_function(bl, poll_function, poll_extra);
brt_loader_set_poll_function(&bl->poll_callback, poll_function, poll_extra);
r = finish_extractor(bl);
assert(r==0); // !!! should check this error code and cleanup if needed.
// check for an error during extraction
r = brt_loader_call_error_function(bl);
r = brt_loader_call_error_function(&bl->error_callback);
if (r) {
brtloader_destroy(bl, TRUE);
return r;
......@@ -2398,36 +2398,34 @@ int brt_loader_write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, con
}
#endif
int brt_loader_init_error_callback(BRTLOADER bl) {
memset(&bl->error_callback, 0, sizeof bl->error_callback);
bl->error_callback.set_error_and_callback = brt_loader_set_error_and_callback;
bl->error_callback.bl = bl;
int r = toku_pthread_mutex_init(&bl->error_callback.mutex, NULL); assert(r == 0);
int brt_loader_init_error_callback(brtloader_error_callback loader_error) {
memset(loader_error, 0, sizeof *loader_error);
int r = toku_pthread_mutex_init(&loader_error->mutex, NULL); assert(r == 0);
return r;
}
void brt_loader_destroy_error_callback(BRTLOADER bl) {
int r = toku_pthread_mutex_destroy(&bl->error_callback.mutex); assert(r == 0);
toku_free(bl->error_callback.key.data);
toku_free(bl->error_callback.val.data);
memset(&bl->error_callback, 0, sizeof bl->error_callback);
void brt_loader_destroy_error_callback(brtloader_error_callback loader_error) {
int r = toku_pthread_mutex_destroy(&loader_error->mutex); assert(r == 0);
toku_free(loader_error->key.data);
toku_free(loader_error->val.data);
memset(loader_error, 0, sizeof *loader_error);
}
int brt_loader_get_error(BRTLOADER bl) {
return bl->error_callback.error;
int brt_loader_get_error(brtloader_error_callback loader_error) {
return loader_error->error;
}
void brt_loader_set_error_function(BRTLOADER bl, brt_loader_error_func error_function, void *error_extra) {
bl->error_callback.error_callback = error_function;
bl->error_callback.extra = error_extra;
void brt_loader_set_error_function(brtloader_error_callback loader_error, brt_loader_error_func error_function, void *error_extra) {
loader_error->error_callback = error_function;
loader_error->extra = error_extra;
}
static void error_callback_lock(BRTLOADER bl) {
int r = toku_pthread_mutex_lock(&bl->error_callback.mutex); assert(r == 0);
static void error_callback_lock(brtloader_error_callback loader_error) {
int r = toku_pthread_mutex_lock(&loader_error->mutex); assert(r == 0);
}
static void error_callback_unlock(BRTLOADER bl) {
int r = toku_pthread_mutex_unlock(&bl->error_callback.mutex); assert(r == 0);
static void error_callback_unlock(brtloader_error_callback loader_error) {
int r = toku_pthread_mutex_unlock(&loader_error->mutex); assert(r == 0);
}
static void copy_dbt(DBT *dest, DBT *src) {
......@@ -2438,63 +2436,63 @@ static void copy_dbt(DBT *dest, DBT *src) {
}
}
int brt_loader_set_error(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
int brt_loader_set_error(brtloader_error_callback loader_error, int error, DB *db, int which_db, DBT *key, DBT *val) {
int r = 0;
error_callback_lock(bl);
if (bl->error_callback.error) { // there can be only one
error_callback_lock(loader_error);
if (loader_error->error) { // there can be only one
r = EEXIST;
} else {
bl->error_callback.error = error; // set the error
bl->error_callback.db = db;
bl->error_callback.which_db = which_db;
copy_dbt(&bl->error_callback.key, key); // copy the data
copy_dbt(&bl->error_callback.val, val);
loader_error->error = error; // set the error
loader_error->db = db;
loader_error->which_db = which_db;
copy_dbt(&loader_error->key, key); // copy the data
copy_dbt(&loader_error->val, val);
}
error_callback_unlock(bl);
error_callback_unlock(loader_error);
return r;
}
int brt_loader_call_error_function(BRTLOADER bl) {
int brt_loader_call_error_function(brtloader_error_callback loader_error) {
int r;
error_callback_lock(bl);
r = bl->error_callback.error;
if (r && !bl->error_callback.did_callback) {
bl->error_callback.did_callback = 1;
bl->error_callback.error_callback(bl->error_callback.db,
bl->error_callback.which_db,
bl->error_callback.error,
&bl->error_callback.key,
&bl->error_callback.val,
bl->error_callback.extra);
}
error_callback_unlock(bl); return r;
}
int brt_loader_set_error_and_callback(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
int r = brt_loader_set_error(bl, error, db, which_db, key, val);
error_callback_lock(loader_error);
r = loader_error->error;
if (r && !loader_error->did_callback) {
loader_error->did_callback = 1;
loader_error->error_callback(loader_error->db,
loader_error->which_db,
loader_error->error,
&loader_error->key,
&loader_error->val,
loader_error->extra);
}
error_callback_unlock(loader_error); return r;
}
int brt_loader_set_error_and_callback(brtloader_error_callback loader_error, int error, DB *db, int which_db, DBT *key, DBT *val) {
int r = brt_loader_set_error(loader_error, error, db, which_db, key, val);
if (r == 0)
r = brt_loader_call_error_function(bl);
r = brt_loader_call_error_function(loader_error);
return r;
}
int brt_loader_init_poll_callback(BRTLOADER bl) {
memset(&bl->poll_callback, 0, sizeof bl->poll_callback);
int brt_loader_init_poll_callback(brtloader_poll_callback p) {
memset(p, 0, sizeof *p);
return 0;
}
void brt_loader_destroy_poll_callback(BRTLOADER bl) {
memset(&bl->poll_callback, 0, sizeof bl->poll_callback);
void brt_loader_destroy_poll_callback(brtloader_poll_callback p) {
memset(p, 0, sizeof *p);
}
void brt_loader_set_poll_function(BRTLOADER bl, brt_loader_poll_func poll_function, void *poll_extra) {
bl->poll_callback.poll_function = poll_function;
bl->poll_callback.poll_extra = poll_extra;
void brt_loader_set_poll_function(brtloader_poll_callback p, brt_loader_poll_func poll_function, void *poll_extra) {
p->poll_function = poll_function;
p->poll_extra = poll_extra;
};
int brt_loader_call_poll_function(BRTLOADER bl, float progress) {
int brt_loader_call_poll_function(brtloader_poll_callback p, float progress) {
int r = 0;
if (bl->poll_callback.poll_function)
r = bl->poll_callback.poll_function(bl->poll_callback.poll_extra, progress);
if (p->poll_function)
r = p->poll_function(p->poll_extra, progress);
return r;
}
......
......@@ -55,8 +55,8 @@ static int pqueue_compare(pqueue_t *q, DBT *next_key, DBT *next_val, DBT *curr_k
int r = q->compare(q->db, next_key, curr_key);
if ( r == 0 ) { // duplicate key : next_key == curr_key
q->dup_error = 1;
if (q->error_callback->set_error_and_callback)
q->error_callback->set_error_and_callback(q->error_callback->bl, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
if (q->error_callback)
brt_loader_set_error_and_callback(q->error_callback, DB_KEYEXIST, q->db, q->which_db, next_key, next_val);
}
return ( r > -1 );
}
......
......@@ -102,8 +102,8 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
toku_brt_loader_set_n_rows(&bl, n);
brt_loader_init_error_callback(&bl);
brt_loader_set_error_function(&bl, err_cb, NULL);
brt_loader_init_error_callback(&bl.error_callback);
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
destroy_rowset(&aset);
......@@ -163,7 +163,7 @@ static void test_write_dbfile (char *template, int n, char *output_name) {
// walk a cursor through the dbfile and verify the rows
verify_dbfile(n, output_name);
brt_loader_destroy_error_callback(&bl);
brt_loader_destroy_error_callback(&bl.error_callback);
}
/* Test to see if we can open temporary files. */
......
......@@ -64,11 +64,11 @@ static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) {
struct row *MALLOC_N(na+nb, cr);
DB *dest_db = NULL;
struct brtloader_s bl;
brt_loader_init_error_callback(&bl);
brt_loader_set_error_function(&bl, dups ? expect_dups_cb : err_cb, NULL);
brt_loader_init_error_callback(&bl.error_callback);
brt_loader_set_error_function(&bl.error_callback, dups ? expect_dups_cb : err_cb, NULL);
struct rowset rs = {.data=(char*)ab};
merge_row_arrays_base(cr, ar, na, br, nb, 0, dest_db, compare_ints, &bl, &rs);
brt_loader_call_error_function(&bl);
brt_loader_call_error_function(&bl.error_callback);
if (dups) {
assert(founddup);
} else {
......@@ -95,7 +95,7 @@ static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) {
toku_free(ar);
toku_free(br);
toku_free(ab);
brt_loader_destroy_error_callback(&bl);
brt_loader_destroy_error_callback(&bl.error_callback);
}
/* Test the basic merger. */
......@@ -295,7 +295,7 @@ static void verify_dbfile(int n, int sorted_keys[], const char *sorted_vals[], c
fill_rowset(&bset, b_keys, b_vals, 4);
toku_brt_loader_set_n_rows(&bl, 6+3);
brt_loader_set_error_function(&bl, err_cb, NULL);
brt_loader_set_error_function(&bl.error_callback, err_cb, NULL);
r = brt_loader_sort_and_write_rows(&aset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
r = brt_loader_sort_and_write_rows(&bset, &fs, &bl, 0, dest_db, compare_ints, 0); CKERR(r);
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
......
......@@ -35,11 +35,6 @@ static void err_cb(DB *db, int which_db, int err, DBT *key, DBT *val, void *extr
if (verbose) printf("err_cb : key <%d> val <%d>\n", *(int *)key->data, *(int *)val->data);
}
static int err_cb_set_error_and_callback(BRTLOADER bl, int error, DB *db, int which_db, DBT *key, DBT *val) {
bl->error_callback.error_callback(db, which_db, error, key, val, bl->error_callback.extra);
return 0;
}
static int run_test(void)
{
const int n_sources=10;
......@@ -49,13 +44,9 @@ static int run_test(void)
DB *dest_db = NULL;
brt_compare_func compare = test_compare;
int r;
struct error_callback_s error_callback = {
.error_callback = err_cb,
.extra = NULL,
.db = NULL,
.which_db = 0,
.set_error_and_callback = err_cb_set_error_and_callback
};
struct error_callback_s error_callback;
brt_loader_init_error_callback(&error_callback);
brt_loader_set_error_function(&error_callback, err_cb, NULL);
r = pqueue_init(&pq, n_sources, 0, dest_db, compare, &error_callback);
if (r) return r;
......@@ -84,7 +75,9 @@ static int run_test(void)
for (int i=0; i<n_sources; i++) {
r = pqueue_pop(pq, &node); assert(r==0);
if (verbose) printf("%d : %d\n", i, *(int*)(node->key->data));
if ( *(int*)(node->key->data) != i ) { if (verbose) printf("FAIL\n"); return -1; }
if ( *(int*)(node->key->data) != i ) {
if (verbose) printf("FAIL\n"); return -1;
}
}
pqueue_free(pq);
if (verbose) printf("test1 : PASS\n");
......@@ -172,8 +165,11 @@ found_duplicate6:
if ( found_dup != 6 ) { printf("FAIL\n"); return -1; }
if (verbose) printf("test3 : PASS\n");
pqueue_free(pq);
brt_loader_destroy_error_callback(&error_callback);
// test 4 - find duplicate when inserting
brt_loader_init_error_callback(&error_callback);
brt_loader_set_error_function(&error_callback, err_cb, NULL);
r = pqueue_init(&pq, 10, 0, dest_db, compare, &error_callback); if (r) return r;
found_dup = -1;
......@@ -211,6 +207,7 @@ found_duplicate0:
if (verbose) printf("PASS\n");
pqueue_free(pq);
toku_free(pq_nodes);
brt_loader_destroy_error_callback(&error_callback);
return 0;
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment