Commit 788b96e4 authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge 2216b onto main line. Refs #2216. [t:2216]

{{{
svn merge -r18206:18672 https://svn.tokutek.com/tokudb/toku/tokudb.2216b
svn merge -r 18738:18746 https://svn.tokutek.com/tokudb/toku/tokudb.2216b
}}}
.


git-svn-id: file:///svn/toku/tokudb@18749 c7de825b-a66e-492c-adef-691d508d4ae1
parent 1db53a83
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -404,7 +404,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__ ...@@ -404,7 +404,7 @@ int main (int argc __attribute__((__unused__)), char *const argv[] __attribute__
printf("struct __toku_loader_internal;\n"); printf("struct __toku_loader_internal;\n");
printf("struct __toku_loader {\n"); printf("struct __toku_loader {\n");
printf(" struct __toku_loader_internal *i;\n"); printf(" struct __toku_loader_internal *i;\n");
printf(" int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */\n"); printf(" int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */\n");
printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */\n"); printf(" int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */\n");
printf(" int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */\n"); printf(" int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */\n");
printf(" int (*close)(DB_LOADER *loader); /* finish loading, free memory */\n"); printf(" int (*close)(DB_LOADER *loader); /* finish loading, free memory */\n");
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER; ...@@ -50,7 +50,7 @@ typedef struct __toku_loader DB_LOADER;
struct __toku_loader_internal; struct __toku_loader_internal;
struct __toku_loader { struct __toku_loader {
struct __toku_loader_internal *i; struct __toku_loader_internal *i;
int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); /* set the error callback */ int (*set_error_callback)(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *error_extra), void *error_extra); /* set the error callback */
int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */ int (*set_poll_function)(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); /* set the polling function */
int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */ int (*put)(DB_LOADER *loader, DBT *key, DBT* val); /* give a row to the loader */
int (*close)(DB_LOADER *loader); /* finish loading, free memory */ int (*close)(DB_LOADER *loader); /* finish loading, free memory */
......
...@@ -4,7 +4,24 @@ ...@@ -4,7 +4,24 @@
/* These functions are exported to allow the tests to compile. */ /* These functions are exported to allow the tests to compile. */
int brtloader_open_temp_file (BRTLOADER bl, FILE **filep, char **fnamep); /* These structures maintain a collection of all the open temporary files used by the loader. */
struct file_info {
BOOL is_open;
BOOL is_extant; // if true, the file must be unlinked.
char *fname;
FILE *file;
};
struct file_infos {
int n_files;
int n_files_limit;
struct file_info *file_infos;
int n_files_open, n_files_extant;
};
typedef struct fidx { int idx; } FIDX;
static const FIDX FIDX_NULL __attribute__((__unused__)) = {-1};
int brtloader_open_temp_file (BRTLOADER bl, FIDX*file_idx);
struct brtloader_s { struct brtloader_s {
int panic; int panic;
...@@ -20,15 +37,17 @@ struct brtloader_s { ...@@ -20,15 +37,17 @@ struct brtloader_s {
const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env). const char **new_fnames_in_env; // the file names that the final data will be written to (relative to env).
const char *temp_file_template; const char *temp_file_template;
FILE *fprimary_rows; char *fprimary_rows_name; FIDX fprimary_rows; // the file index (in the file_infos) for the data
FILE *fprimary_idx; char *fprimary_idx_name; FIDX fprimary_idx; // the file index for the index
u_int64_t fprimary_offset; u_int64_t fprimary_offset;
CACHETABLE cachetable; CACHETABLE cachetable;
/* To make it easier to recover from errors, we don't use FILE*, instead we use an index into the file_infos. */
struct file_infos file_infos;
}; };
/* These data structures are used for manipulating a collection of rows in main memory. */ /* These data structures are used for manipulating a collection of rows in main memory. */
struct row { struct row {
char *data; size_t off; // the offset in the data array.
int klen,vlen; int klen,vlen;
}; };
struct rowset { struct rowset {
...@@ -38,25 +57,42 @@ struct rowset { ...@@ -38,25 +57,42 @@ struct rowset {
char *data; char *data;
}; };
void init_rowset (struct rowset *rows); int init_rowset (struct rowset *rows);
void destroy_rowset (struct rowset *rows); void destroy_rowset (struct rowset *rows);
void add_row (struct rowset *rows, DBT *key, DBT *val); void add_row (struct rowset *rows, DBT *key, DBT *val);
int loader_write_row(DBT *key, DBT *val, FILE *data, FILE *idx, u_int64_t *dataoff, BRTLOADER bl); int loader_write_row(DBT *key, DBT *val, FIDX data, FIDX idx, u_int64_t *dataoff, BRTLOADER bl);
int loader_read_row (FILE *f, DBT *key, DBT *val, BRTLOADER bl); int loader_read_row (FIDX f, DBT *key, DBT *val, BRTLOADER bl);
struct error_callback_s {
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra);
DB *db;
int which_db;
void *extra;
};
void merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, int merge (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
DB *dest_db, brt_compare_func); DB *dest_db, brt_compare_func,
void mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func); struct error_callback_s *,
struct rowset *);
int mergesort_row_array (struct row rows[/*n*/], int n, DB *dest_db, brt_compare_func, struct error_callback_s *, struct rowset *);
struct fileset { struct merge_fileset {
int n_temp_files, n_temp_files_limit; int n_temp_files, n_temp_files_limit;
char **temp_data_names; FIDX *data_fidxs;
char **temp_idx_names; FIDX *idx_fidxs;
}; };
void init_fileset (struct fileset *fs); void init_merge_fileset (struct merge_fileset *fs);
void destroy_merge_fileset (struct merge_fileset *fs);
int sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func,
struct error_callback_s *error_callback);
int merge_files (struct merge_fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func, struct error_callback_s *);
int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const struct descriptor *descriptor);
int sort_and_write_rows (struct rowset *rows, struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func); int brtloader_init_file_infos (struct file_infos *fi);
int merge_files (struct fileset *fs, BRTLOADER bl, DB *dest_db, brt_compare_func); void brtloader_fi_destroy (struct file_infos *fi, BOOL is_error);
int write_file_to_dbfile (int outfile, FILE *infile, BRTLOADER bl, const struct descriptor *descriptor); int brtloader_fi_close (struct file_infos *fi, FIDX idx);
int brtloader_fi_reopen (struct file_infos *fi, FIDX idx, const char *mode);
int brtloader_fi_unlink (struct file_infos *fi, FIDX idx);
This diff is collapsed.
...@@ -16,6 +16,10 @@ int toku_brt_loader_open (BRTLOADER *bl, ...@@ -16,6 +16,10 @@ int toku_brt_loader_open (BRTLOADER *bl,
brt_compare_func bt_compare_functions[/*N*/], brt_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template); const char *temp_file_template);
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val); int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val);
int toku_brt_loader_close (BRTLOADER bl); int toku_brt_loader_close (BRTLOADER bl,
void (*error_callback)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra),
void *extra);
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*));
#endif // BRTLOADER_H #endif // BRTLOADER_H
...@@ -22,26 +22,55 @@ static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) { ...@@ -22,26 +22,55 @@ static int compare_ints (DB *dest_db, const DBT *akey, const DBT *bkey) {
return qsort_compare_ints(akey->data, bkey->data); return qsort_compare_ints(akey->data, bkey->data);
} }
static void test_merge_internal (int a[], int na, int b[], int nb) { static void err_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
fprintf(stderr, "error in test");
abort();
}
BOOL founddup;
static void expect_dups_cb(DB *db UU(), int dbn UU(), int err UU(), DBT *key UU(), DBT *val UU(), void *extra UU()) {
founddup=TRUE;
}
static void test_merge_internal (int a[], int na, int b[], int nb, BOOL dups) {
int *MALLOC_N(na+nb, ab); // the combined array a and b
for (int i=0; i<na; i++) {
ab[i]=a[i];
}
for (int i=0; i<nb; i++) {
ab[na+i] = b[i];
}
struct row *MALLOC_N(na, ar); struct row *MALLOC_N(na, ar);
struct row *MALLOC_N(nb, br); struct row *MALLOC_N(nb, br);
for (int i=0; i<na; i++) { for (int i=0; i<na; i++) {
ar[i].data = (void*)&a[i]; ar[i].off = i*sizeof(a[0]);
ar[i].klen = sizeof(a[i]); ar[i].klen = sizeof(a[i]);
ar[i].vlen = 0; ar[i].vlen = 0;
} }
for (int i=0; i<nb; i++) { for (int i=0; i<nb; i++) {
br[i].data = (void*)&b[i]; br[i].off = (na+i)*sizeof(b[0]);
br[i].klen = sizeof(b[i]); br[i].klen = sizeof(b[i]);
br[i].vlen = 0; br[i].vlen = 0;
} }
struct row *MALLOC_N(na+nb, cr); struct row *MALLOC_N(na+nb, cr);
DB *dest_db = NULL; DB *dest_db = NULL;
merge(cr, ar, na, br, nb, dest_db, compare_ints); struct error_callback_s cb;
if (dups) {
cb.error_callback = expect_dups_cb;
founddup=FALSE;
} else {
cb.error_callback = err_cb;
}
struct rowset rs = {.data=(char*)ab};
merge(cr, ar, na, br, nb, dest_db, compare_ints, &cb, &rs);
if (dups) {
assert(founddup);
} else {
// verify the merge
int i=0; int i=0;
int j=0; int j=0;
for (int k=0; k<na+nb; k++) { for (int k=0; k<na+nb; k++) {
int vc = *(int*)cr[k].data; int voff = cr[k].off;
int vc = *(int*)(((char*)ab)+voff);
if (i<na && j<nb) { if (i<na && j<nb) {
if (vc==a[i]) { if (vc==a[i]) {
assert(a[i]<=b[j]); assert(a[i]<=b[j]);
...@@ -54,9 +83,11 @@ static void test_merge_internal (int a[], int na, int b[], int nb) { ...@@ -54,9 +83,11 @@ static void test_merge_internal (int a[], int na, int b[], int nb) {
} }
} }
} }
}
toku_free(cr); toku_free(cr);
toku_free(ar); toku_free(ar);
toku_free(br); toku_free(br);
toku_free(ab);
} }
/* Test the basic merger. */ /* Test the basic merger. */
...@@ -64,38 +95,41 @@ static void test_merge (void) { ...@@ -64,38 +95,41 @@ static void test_merge (void) {
{ {
int avals[]={1,2,3,4,5}; int avals[]={1,2,3,4,5};
int *bvals = NULL; //icc won't let us use a zero-sized array explicitly or by [] = {} construction. int *bvals = NULL; //icc won't let us use a zero-sized array explicitly or by [] = {} construction.
test_merge_internal(avals, 5, bvals, 0); test_merge_internal(avals, 5, bvals, 0, FALSE);
test_merge_internal(bvals, 0, avals, 5); test_merge_internal(bvals, 0, avals, 5, FALSE);
} }
{ {
int avals[]={1,3,5,7}; int avals[]={1,3,5,7};
int bvals[]={2,4}; int bvals[]={2,4};
test_merge_internal(avals, 4, bvals, 2); test_merge_internal(avals, 4, bvals, 2, FALSE);
test_merge_internal(bvals, 2, avals, 4); test_merge_internal(bvals, 2, avals, 4, FALSE);
} }
{ {
int avals[]={1,2,3,5,6,7}; int avals[]={1,2,3,5,6,7};
int bvals[]={2,4,5,6,8}; int bvals[]={2,4,5,6,8};
test_merge_internal(avals, 6, bvals, 5); test_merge_internal(avals, 6, bvals, 5, TRUE);
test_merge_internal(bvals, 5, avals, 6); test_merge_internal(bvals, 5, avals, 6, TRUE);
} }
} }
static void test_internal_mergesort_row_array (int a[], int n) { static void test_internal_mergesort_row_array (int a[], int n) {
struct row *MALLOC_N(n, ar); struct row *MALLOC_N(n, ar);
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
ar[i].data = (void*)&a[i]; ar[i].off = i*sizeof(a[0]);
ar[i].klen = sizeof(a[i]); ar[i].klen = sizeof(a[i]);
ar[i].vlen = 0; ar[i].vlen = 0;
} }
mergesort_row_array (ar, n, NULL, compare_ints); struct rowset rs = {.data=(char*)a};
mergesort_row_array (ar, n, NULL, compare_ints, NULL, &rs);
int *MALLOC_N(n, tmp); int *MALLOC_N(n, tmp);
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
tmp[i]=a[i]; tmp[i]=a[i];
} }
qsort(tmp, n, sizeof(a[0]), qsort_compare_ints); qsort(tmp, n, sizeof(a[0]), qsort_compare_ints);
for (int i=0; i<n; i++) { for (int i=0; i<n; i++) {
assert(tmp[i]==*(int*)ar[i].data); int voff = ar[i].off;
int v = *(int*)(((char*)a)+voff);
assert(tmp[i]==v);
} }
toku_free(ar); toku_free(ar);
toku_free(tmp); toku_free(tmp);
...@@ -107,11 +141,20 @@ static void test_mergesort_row_array (void) { ...@@ -107,11 +141,20 @@ static void test_mergesort_row_array (void) {
for (int i=0; i<=4; i++) for (int i=0; i<=4; i++)
test_internal_mergesort_row_array(avals, i); test_internal_mergesort_row_array(avals, i);
} }
for (int i=0; i<100; i++) { const int MAX_LEN = 100;
int len=1+random()%100; enum { MAX_VAL = 1000 };
for (int i=0; i<MAX_LEN; i++) {
BOOL used[MAX_VAL];
for (int j=0; j<MAX_VAL; j++) used[j]=FALSE;
int len=1+random()%MAX_LEN;
int avals[len]; int avals[len];
for (int j=0; j<len; j++) { for (int j=0; j<len; j++) {
avals[j] = random()%len; int v;
do {
v = random()%MAX_VAL;
} while (used[v]);
avals[j] = v;
used[v] = TRUE;
} }
test_internal_mergesort_row_array(avals, len); test_internal_mergesort_row_array(avals, len);
} }
...@@ -120,15 +163,14 @@ static void test_mergesort_row_array (void) { ...@@ -120,15 +163,14 @@ static void test_mergesort_row_array (void) {
static void test_read_write_rows (char *template) { static void test_read_write_rows (char *template) {
struct brtloader_s bl = {.panic = 0, struct brtloader_s bl = {.panic = 0,
.temp_file_template = template}; .temp_file_template = template};
int r; int r = brtloader_init_file_infos(&bl.file_infos);
FILE *file; CKERR(r);
char *fname; FIDX file;
r = brtloader_open_temp_file(&bl, &file, &fname); r = brtloader_open_temp_file(&bl, &file);
CKERR(r); CKERR(r);
FILE *idx; FIDX idx;
char *idxname; r = brtloader_open_temp_file(&bl, &idx);
r = brtloader_open_temp_file(&bl, &idx, &idxname);
CKERR(r); CKERR(r);
size_t dataoff=0; size_t dataoff=0;
...@@ -146,11 +188,11 @@ static void test_read_write_rows (char *template) { ...@@ -146,11 +188,11 @@ static void test_read_write_rows (char *template) {
if (actual_size != dataoff) fprintf(stderr, "actual_size=%"PRIu64", dataoff=%"PRIu64"\n", actual_size, dataoff); if (actual_size != dataoff) fprintf(stderr, "actual_size=%"PRIu64", dataoff=%"PRIu64"\n", actual_size, dataoff);
assert(actual_size == dataoff); assert(actual_size == dataoff);
r = fclose(file); r = brtloader_fi_close(&bl.file_infos, file);
CKERR(r); CKERR(r);
file = fopen(fname, "r"); r = brtloader_fi_reopen(&bl.file_infos, file, "r");
assert(file); CKERR(r);
{ {
int n_read=0; int n_read=0;
...@@ -168,18 +210,20 @@ static void test_read_write_rows (char *template) { ...@@ -168,18 +210,20 @@ static void test_read_write_rows (char *template) {
toku_free(key.data); toku_free(key.data);
toku_free(val.data); toku_free(val.data);
} }
r = fclose(file); r = brtloader_fi_close(&bl.file_infos, file);
CKERR(r); CKERR(r);
r = fclose(idx); r = brtloader_fi_close(&bl.file_infos, idx);
CKERR(r); CKERR(r);
r = unlink(fname); r = brtloader_fi_unlink(&bl.file_infos, file);
CKERR(r); CKERR(r);
r = unlink(idxname); r = brtloader_fi_unlink(&bl.file_infos, idx);
CKERR(r); CKERR(r);
toku_free(fname); assert(bl.file_infos.n_files_open==0);
toku_free(idxname); assert(bl.file_infos.n_files_extant==0);
brtloader_fi_destroy(&bl.file_infos, FALSE);
} }
static void fill_rowset (struct rowset *rows, static void fill_rowset (struct rowset *rows,
...@@ -200,9 +244,10 @@ static void test_merge_files (char *template) { ...@@ -200,9 +244,10 @@ static void test_merge_files (char *template) {
DB *dest_db = NULL; DB *dest_db = NULL;
struct brtloader_s bl = {.panic = 0, struct brtloader_s bl = {.panic = 0,
.temp_file_template = template}; .temp_file_template = template};
int r; int r = brtloader_init_file_infos(&bl.file_infos);
struct fileset fs; CKERR(r);
init_fileset(&fs); struct merge_fileset fs;
init_merge_fileset(&fs);
int a_keys[] = { 1, 3, 5, 7, 8, 9}; int a_keys[] = { 1, 3, 5, 7, 8, 9};
int b_keys[] = { 2, 4, 6 }; int b_keys[] = { 2, 4, 6 };
...@@ -212,18 +257,22 @@ static void test_merge_files (char *template) { ...@@ -212,18 +257,22 @@ static void test_merge_files (char *template) {
fill_rowset(&aset, a_keys, a_vals, 6); fill_rowset(&aset, a_keys, a_vals, 6);
fill_rowset(&bset, b_keys, b_vals, 3); fill_rowset(&bset, b_keys, b_vals, 3);
r = sort_and_write_rows(&aset, &fs, &bl, dest_db, compare_ints); CKERR(r); struct error_callback_s cb;
r = sort_and_write_rows(&bset, &fs, &bl, dest_db, compare_ints); CKERR(r); cb.error_callback = err_cb;
r = sort_and_write_rows(&aset, &fs, &bl, dest_db, compare_ints, &cb); CKERR(r);
r = sort_and_write_rows(&bset, &fs, &bl, dest_db, compare_ints, &cb); CKERR(r);
assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files); assert(fs.n_temp_files==2 && fs.n_temp_files_limit >= fs.n_temp_files);
destroy_rowset(&aset); destroy_rowset(&aset);
destroy_rowset(&bset); destroy_rowset(&bset);
for (int i=0; i<2; i++) assert(fs.temp_data_names[i] != NULL && fs.temp_idx_names[i] != NULL); for (int i=0; i<2; i++) assert(fs.data_fidxs[i].idx != -1 && fs.idx_fidxs[i].idx != -1);
r = merge_files(&fs, &bl, dest_db, compare_ints); CKERR(r); r = merge_files(&fs, &bl, dest_db, compare_ints, &cb); CKERR(r);
assert(fs.n_temp_files==1); assert(fs.n_temp_files==1);
FILE *inf = fopen(fs.temp_data_names[0], "r"); FIDX inf = fs.data_fidxs[0];
r = brtloader_fi_reopen(&bl.file_infos, inf, "r");
CKERR(r);
char *name = toku_strdup(template); char *name = toku_strdup(template);
int fd = mkstemp(name); int fd = mkstemp(name);
fprintf(stderr, "Final data in %s\n", name); fprintf(stderr, "Final data in %s\n", name);
...@@ -231,18 +280,15 @@ static void test_merge_files (char *template) { ...@@ -231,18 +280,15 @@ static void test_merge_files (char *template) {
struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}}; struct descriptor desc = {.version = 1, .dbt = (DBT){.size = 4, .data="abcd"}};
r = write_file_to_dbfile(fd, inf, &bl, &desc); r = write_file_to_dbfile(fd, inf, &bl, &desc);
CKERR(r); CKERR(r);
r = fclose(inf); r = brtloader_fi_close(&bl.file_infos, inf);
CKERR(r); CKERR(r);
r = unlink(fs.temp_data_names[0]); r = brtloader_fi_unlink(&bl.file_infos, fs.data_fidxs[0]);
CKERR(r); CKERR(r);
r = unlink(fs.temp_idx_names[0]); r = brtloader_fi_unlink(&bl.file_infos, fs.idx_fidxs[0]);
CKERR(r); CKERR(r);
destroy_merge_fileset(&fs);
toku_free(fs.temp_data_names[0]); brtloader_fi_destroy(&bl.file_infos, FALSE);
toku_free(fs.temp_idx_names[0]);
toku_free(fs.temp_data_names);
toku_free(fs.temp_idx_names);
toku_free(name); toku_free(name);
} }
......
...@@ -38,7 +38,8 @@ struct __toku_loader_internal { ...@@ -38,7 +38,8 @@ struct __toku_loader_internal {
uint32_t *dbt_flags; uint32_t *dbt_flags;
uint32_t loader_flags; uint32_t loader_flags;
void *extra; void *extra;
void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *extra); void (*error_callback)(DB *db, int i, int err, DBT *key, DBT *val, void *extra_extra);
void *error_extra;
int (*poll_func)(void *extra, float progress); int (*poll_func)(void *extra, float progress);
char *temp_file_template; char *temp_file_template;
...@@ -147,7 +148,7 @@ int toku_loader_create_loader(DB_ENV *env, ...@@ -147,7 +148,7 @@ int toku_loader_create_loader(DB_ENV *env,
} }
loader->i->ekeys = NULL; loader->i->ekeys = NULL;
loader->i->evals = NULL; loader->i->evals = NULL;
r = ydb_load_inames (env, txn, N, dbs, new_inames_in_env); r = locked_ydb_load_inames (env, txn, N, dbs, new_inames_in_env);
assert(r==0); assert(r==0);
toku_brt_loader_open(&loader->i->brt_loader, toku_brt_loader_open(&loader->i->brt_loader,
loader->i->env->i->cachetable, loader->i->env->i->cachetable,
...@@ -175,9 +176,11 @@ int toku_loader_set_poll_function(DB_LOADER *loader, ...@@ -175,9 +176,11 @@ int toku_loader_set_poll_function(DB_LOADER *loader,
} }
int toku_loader_set_error_callback(DB_LOADER *loader, int toku_loader_set_error_callback(DB_LOADER *loader,
void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)) void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra),
void *error_extra)
{ {
loader->i->error_callback = error_cb; loader->i->error_callback = error_cb;
loader->i->error_extra = error_extra;
return 0; return 0;
} }
...@@ -230,31 +233,36 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val) ...@@ -230,31 +233,36 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
int toku_loader_close(DB_LOADER *loader) int toku_loader_close(DB_LOADER *loader)
{ {
int r=0;
if ( loader->i->err_errno != 0 ) { if ( loader->i->err_errno != 0 ) {
if ( loader->i->error_callback != NULL ) { if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, NULL); loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
} }
toku_free(loader->i->err_key.data); toku_free(loader->i->err_key.data);
toku_free(loader->i->err_val.data); toku_free(loader->i->err_val.data);
} }
if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) { if ( !(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
toku_brt_loader_close(loader->i->brt_loader); r = toku_brt_loader_close(loader->i->brt_loader, loader->i->error_callback, loader->i->error_extra);
if (r!=0) goto cleanup_and_return_r;
for (int i=0; i<loader->i->N; i++) { for (int i=0; i<loader->i->N; i++) {
toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect. toku_ydb_lock(); //Must hold ydb lock for dictionary_redirect.
int r = toku_dictionary_redirect(loader->i->inames_in_env[i], r = toku_dictionary_redirect(loader->i->inames_in_env[i],
loader->i->dbs[i]->i->brt, loader->i->dbs[i]->i->brt,
db_txn_struct_i(loader->i->txn)->tokutxn); db_txn_struct_i(loader->i->txn)->tokutxn);
assert(r==0); assert(r==0);
toku_ydb_unlock(); toku_ydb_unlock();
}
cleanup_and_return_r:
for (int i=0; i<loader->i->N; i++) {
toku_free(loader->i->inames_in_env[i]); toku_free(loader->i->inames_in_env[i]);
} }
toku_free(loader->i->inames_in_env); toku_free(loader->i->inames_in_env);
toku_free(loader->i->brt_loader); toku_free(loader->i->brt_loader);
// TODO: release table locks // TODO: release table locks
} } else {
if (loader->i->loader_flags & LOADER_USE_PUTS) { // (loader->i->loader_flags & LOADER_USE_PUTS);
int num_dbts = loader->i->N; int num_dbts = loader->i->N;
for (int i=0; i<num_dbts; i++) { for (int i=0; i<num_dbts; i++) {
if (loader->i->ekeys && if (loader->i->ekeys &&
...@@ -274,7 +282,7 @@ int toku_loader_close(DB_LOADER *loader) ...@@ -274,7 +282,7 @@ int toku_loader_close(DB_LOADER *loader)
toku_free(loader->i->temp_file_template); toku_free(loader->i->temp_file_template);
toku_free(loader->i); toku_free(loader->i);
toku_free(loader); toku_free(loader);
return 0; return r;
} }
int toku_loader_abort(DB_LOADER *loader) int toku_loader_abort(DB_LOADER *loader)
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
*/ */
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t db_flags[N], uint32_t dbt_flags[N], uint32_t loader_flags, void *extra); int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[N], uint32_t db_flags[N], uint32_t dbt_flags[N], uint32_t loader_flags, void *extra);
int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra)); int toku_loader_set_error_callback(DB_LOADER *loader, void (*error_cb)(DB *db, int i, int err, DBT *key, DBT *val, void *extra), void *extra);
int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress)); int toku_loader_set_poll_function(DB_LOADER *loader, int (*poll_func)(void *extra, float progress));
int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val); int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val);
int toku_loader_close(DB_LOADER *loader); int toku_loader_close(DB_LOADER *loader);
......
...@@ -128,6 +128,7 @@ BDB_DONTRUN_TESTS = \ ...@@ -128,6 +128,7 @@ BDB_DONTRUN_TESTS = \
recover-delboth-after-checkpoint \ recover-delboth-after-checkpoint \
loader-reference-test \ loader-reference-test \
loader-stress-test \ loader-stress-test \
loader-dup-test \
recover-lsn-filter-multiple \ recover-lsn-filter-multiple \
recover-put-multiple-fdelete-all \ recover-put-multiple-fdelete-all \
recover-put-multiple-fdelete-some \ recover-put-multiple-fdelete-some \
......
This diff is collapsed.
...@@ -54,7 +54,7 @@ static void test_loader(DB **dbs) ...@@ -54,7 +54,7 @@ static void test_loader(DB **dbs)
CKERR(r); CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL);
CKERR(r); CKERR(r);
r = loader->set_error_callback(loader, NULL); r = loader->set_error_callback(loader, NULL, NULL);
CKERR(r); CKERR(r);
r = loader->set_poll_function(loader, NULL); r = loader->set_poll_function(loader, NULL);
CKERR(r); CKERR(r);
...@@ -108,7 +108,8 @@ static void run_test(void) ...@@ -108,7 +108,8 @@ static void run_test(void)
r = env->set_default_dup_compare(env, int64_dbt_cmp); CKERR(r); r = env->set_default_dup_compare(env, int64_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG; // int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE | DB_INIT_LOG;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr); env->set_errfile(env, stderr);
//Disable auto-checkpointing //Disable auto-checkpointing
......
...@@ -195,7 +195,7 @@ static void test_loader(DB **dbs) ...@@ -195,7 +195,7 @@ static void test_loader(DB **dbs)
CKERR(r); CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL); r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags, NULL);
CKERR(r); CKERR(r);
r = loader->set_error_callback(loader, NULL); r = loader->set_error_callback(loader, NULL, NULL);
CKERR(r); CKERR(r);
r = loader->set_poll_function(loader, NULL); r = loader->set_poll_function(loader, NULL);
CKERR(r); CKERR(r);
...@@ -241,7 +241,8 @@ static void run_test(void) ...@@ -241,7 +241,8 @@ static void run_test(void)
r = env->set_default_dup_compare(env, uint_dbt_cmp); CKERR(r); r = env->set_default_dup_compare(env, uint_dbt_cmp); CKERR(r);
r = env->set_generate_row_callback_for_put(env, put_multiple_generate); r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r); CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE; // int envflags = DB_INIT_LOCK | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r); r = env->open(env, ENVDIR, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr); env->set_errfile(env, stderr);
//Disable auto-checkpointing //Disable auto-checkpointing
......
...@@ -1404,7 +1404,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) { ...@@ -1404,7 +1404,6 @@ env_get_engine_status(DB_ENV * env, ENGINE_STATUS * engstat) {
engstat->enospc_threads_blocked = enospc_threads_blocked; engstat->enospc_threads_blocked = enospc_threads_blocked;
engstat->enospc_total = enospc_total; engstat->enospc_total = enospc_total;
} }
} }
return r; return r;
} }
...@@ -5377,6 +5376,14 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -5377,6 +5376,14 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
return rval; return rval;
} }
int
locked_ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_inames_in_env[N]) {
toku_ydb_lock();
int r = ydb_load_inames(env, txn, N, dbs, new_inames_in_env);
toku_ydb_unlock();
return r;
}
// TODO 2216: Patch out this (dangerous) function when loader is working and // TODO 2216: Patch out this (dangerous) function when loader is working and
// we don't need to test the low-level redirect anymore. // we don't need to test the low-level redirect anymore.
// for use by test programs only, just a wrapper around brt call: // for use by test programs only, just a wrapper around brt call:
......
...@@ -25,5 +25,12 @@ int ydb_load_inames(DB_ENV * env, ...@@ -25,5 +25,12 @@ int ydb_load_inames(DB_ENV * env,
DB * dbs[/*N*/], DB * dbs[/*N*/],
/*out*/ char * new_inames_in_env[N]); /*out*/ char * new_inames_in_env[N]);
// Wrapper to ydb_load_inames if you are not holding the ydb lock.
int locked_ydb_load_inames(DB_ENV * env,
DB_TXN * txn,
int N,
DB * dbs[/*N*/],
/*out*/ char * new_inames_in_env[N]);
#endif #endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment