Commit 47c5441a authored by Bradley C. Kuszmaul's avatar Bradley C. Kuszmaul Committed by Yoni Fogel

Merge the basic dbufio working test in. It doesn't test destination files,...

Merge the basic dbufio working test in.  It doesn't test destination files, and the failure cases aren't running yet.  But the ...dbufio() code looks fairly well covered anyway. [t:2623] Refs #2623.
{{{
svn merge -r 20423:20425 https://svn.tokutek.com/tokudb/toku/tokudb.2623
}}}
.


git-svn-id: file:///svn/toku/tokudb@20426 c7de825b-a66e-492c-adef-691d508d4ae1
parent 48b56ff1
...@@ -227,6 +227,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -227,6 +227,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn); LSN load_lsn);
void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error);
C_END C_END
#endif #endif
...@@ -278,7 +278,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx) ...@@ -278,7 +278,7 @@ int brtloader_open_temp_file (BRTLOADER bl, FIDX *file_idx)
return result; return result;
} }
static void brtloader_destroy (BRTLOADER bl, BOOL is_error) { void toku_brtloader_internal_destroy (BRTLOADER bl, BOOL is_error) {
int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0); int r = toku_pthread_mutex_destroy(&bl->mutex); resource_assert(r == 0);
// These frees rely on the fact that if you free a NULL pointer then nothing bad happens. // These frees rely on the fact that if you free a NULL pointer then nothing bad happens.
toku_free(bl->dbs); toku_free(bl->dbs);
...@@ -389,8 +389,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -389,8 +389,8 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
bl->N = N; bl->N = N;
bl->load_lsn = load_lsn; bl->load_lsn = load_lsn;
#define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = errno; brtloader_destroy(bl, TRUE); return r; } #define MY_CALLOC_N(n,v) CALLOC_N(n,v); if (!v) { int r = errno; toku_brtloader_internal_destroy(bl, TRUE); return r; }
#define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = errno; brtloader_destroy(bl, TRUE); return r; } lval = v; } while (0) #define SET_TO_MY_STRDUP(lval, s) do { char *v = toku_strdup(s); if (!v) { int r = errno; toku_brtloader_internal_destroy(bl, TRUE); return r; } lval = v; } while (0)
MY_CALLOC_N(N, bl->dbs); MY_CALLOC_N(N, bl->dbs);
for (int i=0; i<N; i++) bl->dbs[i]=dbs[i]; for (int i=0; i<N; i++) bl->dbs[i]=dbs[i];
...@@ -409,7 +409,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -409,7 +409,7 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
{ {
int r = brtloader_init_file_infos(&bl->file_infos); int r = brtloader_init_file_infos(&bl->file_infos);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template); SET_TO_MY_STRDUP(bl->temp_file_template, temp_file_template);
...@@ -422,28 +422,28 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp, ...@@ -422,28 +422,28 @@ int toku_brt_loader_internal_init (/* out */ BRTLOADER *blp,
for(int i=0;i<N;i++) { for(int i=0;i<N;i++) {
{ {
int r = init_rowset(&bl->rows[i], memory_per_rowset(bl)); int r = init_rowset(&bl->rows[i], memory_per_rowset(bl));
if (r!=0) {brtloader_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
} }
{ // note : currently brt_loader_init_error_callback always returns 0 { // note : currently brt_loader_init_error_callback always returns 0
int r = brt_loader_init_error_callback(&bl->error_callback); int r = brt_loader_init_error_callback(&bl->error_callback);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
brt_loader_init_poll_callback(&bl->poll_callback); brt_loader_init_poll_callback(&bl->poll_callback);
{ {
int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl)); int r = init_rowset(&bl->primary_rowset, memory_per_rowset(bl));
if (r!=0) { brtloader_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
{ int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH); { int r = queue_create(&bl->primary_rowset_queue, EXTRACTOR_QUEUE_DEPTH);
if (r!=0) { brtloader_destroy(bl, TRUE); return r; } if (r!=0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
//printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__); //printf("%s:%d toku_pthread_create\n", __FILE__, __LINE__);
{ {
int r = toku_pthread_mutex_init(&bl->mutex, NULL); int r = toku_pthread_mutex_init(&bl->mutex, NULL);
if (r != 0) { brtloader_destroy(bl, TRUE); return r; } if (r != 0) { toku_brtloader_internal_destroy(bl, TRUE); return r; }
} }
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
...@@ -493,7 +493,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp, ...@@ -493,7 +493,7 @@ int toku_brt_loader_open (/* out */ BRTLOADER *blp,
if (r!=0) { if (r!=0) {
result = r; result = r;
toku_pthread_mutex_destroy(&bl->mutex); toku_pthread_mutex_destroy(&bl->mutex);
brtloader_destroy(bl, TRUE); toku_brtloader_internal_destroy(bl, TRUE);
} }
} }
BL_TRACE(blt_open); BL_TRACE(blt_open);
...@@ -2405,7 +2405,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl) ...@@ -2405,7 +2405,7 @@ static int toku_brt_loader_close_internal (BRTLOADER bl)
invariant(bl->file_infos.n_files_extant == 0); invariant(bl->file_infos.n_files_extant == 0);
invariant(bl->progress == PROGRESS_MAX); invariant(bl->progress == PROGRESS_MAX);
error: error:
brtloader_destroy(bl, (BOOL)(result!=0)); toku_brtloader_internal_destroy(bl, (BOOL)(result!=0));
BL_TRACE(blt_close); BL_TRACE(blt_close);
BL_TRACE_END; BL_TRACE_END;
return result; return result;
...@@ -2445,7 +2445,7 @@ int toku_brt_loader_close (BRTLOADER bl, ...@@ -2445,7 +2445,7 @@ int toku_brt_loader_close (BRTLOADER bl,
if (r && result == 0) if (r && result == 0)
result = r; result = r;
} else } else
brtloader_destroy(bl, TRUE); toku_brtloader_internal_destroy(bl, TRUE);
return result; return result;
} }
...@@ -2478,7 +2478,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error) ...@@ -2478,7 +2478,7 @@ int toku_brt_loader_abort(BRTLOADER bl, BOOL is_error)
for (int i = 0; i < bl->N; i++) for (int i = 0; i < bl->N; i++)
invariant(!bl->fractal_threads_live[i]); invariant(!bl->fractal_threads_live[i]);
brtloader_destroy(bl, is_error); toku_brtloader_internal_destroy(bl, is_error);
return result; return result;
} }
......
...@@ -92,7 +92,7 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL) ...@@ -92,7 +92,7 @@ check_test-assert$(BINSUF): test-assert$(BINSUF) $(PTHREAD_LOCAL)
@# one argument, "ok" should not error @# one argument, "ok" should not error
$(VGRIND) ./$< ok $(SUMMARIZE_CMD) $(VGRIND) ./$< ok $(SUMMARIZE_CMD)
check_brtloader-test-merge-files-dbufio: EXTRA_ARGS=dir.$@ check_brtloader-test-merge-files-dbufio: EXTRA_ARGS=-r 8000 -s dir.$@
check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.$@ check_brtloader-test$(BINSUF): EXTRA_ARGS=dir.$@
......
...@@ -134,7 +134,9 @@ static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(), ...@@ -134,7 +134,9 @@ static void err_cb(DB *db UU(), int dbn, int err, DBT *key UU(), DBT *val UU(),
abort(); abort();
} }
enum { N_SOURCES = 2, N_RECORDS=10, N_DEST_DBS=1 }; enum { N_SOURCES = 2, N_DEST_DBS=1 };
int N_RECORDS = 10;
static char *make_fname(const char *directory, const char *fname, int idx) { static char *make_fname(const char *directory, const char *fname, int idx) {
int len = strlen(directory)+strlen(fname)+20; int len = strlen(directory)+strlen(fname)+20;
...@@ -145,16 +147,37 @@ static char *make_fname(const char *directory, const char *fname, int idx) { ...@@ -145,16 +147,37 @@ static char *make_fname(const char *directory, const char *fname, int idx) {
} }
struct consumer_thunk {
QUEUE q;
int64_t n_read;
};
static void *consumer_thread (void *ctv) {
struct consumer_thunk *cthunk = (struct consumer_thunk *)ctv;
while (1) {
void *item;
int r = queue_deq(cthunk->q, &item, NULL, NULL);
if (r==EOF) return NULL;
assert(r==0);
struct rowset *rowset = (struct rowset *)item;
cthunk->n_read += rowset->n_rows;
destroy_rowset(rowset);
toku_free(rowset);
}
}
static void test (const char *directory) { static void test (const char *directory) {
int *XMALLOC_N(N_SOURCES, fds); int *XMALLOC_N(N_SOURCES, fds);
char **XMALLOC_N(N_SOURCES, fnames); char **XMALLOC_N(N_SOURCES, fnames);
int *XMALLOC_N(N_SOURCES, n_records_in_fd);
for (int i=0; i<N_SOURCES; i++) { for (int i=0; i<N_SOURCES; i++) {
fnames[i] = make_fname(directory, "temp", i); fnames[i] = make_fname(directory, "temp", i);
fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU); fds[i] = open(fnames[i], O_CREAT|O_RDWR, S_IRWXU);
assert(fds[i]>=0); assert(fds[i]>=0);
n_records_in_fd[i] = 0;
} }
for (int i=0; i<N_RECORDS; i++) { for (int i=0; i<N_RECORDS; i++) {
int size=4; int size=4;
...@@ -164,6 +187,7 @@ static void test (const char *directory) { ...@@ -164,6 +187,7 @@ static void test (const char *directory) {
{ int r = write(fd, &i, 4); assert(r==4); } { int r = write(fd, &i, 4); assert(r==4); }
{ int r = write(fd, &size, 4); assert(r==4); } { int r = write(fd, &size, 4); assert(r==4); }
{ int r = write(fd, &i, 4); assert(r==4); } { int r = write(fd, &i, 4); assert(r==4); }
n_records_in_fd[fdi]++;
} }
for (int i=0; i<N_SOURCES; i++) { for (int i=0; i<N_SOURCES; i++) {
off_t r = lseek(fds[i], 0, SEEK_SET); off_t r = lseek(fds[i], 0, SEEK_SET);
...@@ -224,32 +248,70 @@ static void test (const char *directory) { ...@@ -224,32 +248,70 @@ static void test (const char *directory) {
bl->file_infos.n_files = N_SOURCES; bl->file_infos.n_files = N_SOURCES;
bl->file_infos.n_files_limit = N_SOURCES; bl->file_infos.n_files_limit = N_SOURCES;
bl->file_infos.n_files_open = 0; bl->file_infos.n_files_open = 0;
bl->file_infos.n_files_extant = N_SOURCES; bl->file_infos.n_files_extant = 0;
XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos); XREALLOC_N(bl->file_infos.n_files_limit, bl->file_infos.file_infos);
const int BUFFER_SIZE = 100;
for (int i=0; i<N_SOURCES; i++) { for (int i=0; i<N_SOURCES; i++) {
bl->file_infos.file_infos[i] = (struct file_info){ .is_open = FALSE, // all we really need is the number of records in the file. The rest of the file_info is unused by the dbufio code.n
.is_extant = TRUE, bl->file_infos.file_infos[i].n_rows = n_records_in_fd[i];
.fname = toku_strdup(fnames[i]), // However we need these for the destroy method to work right.
.file = (FILE*)NULL, bl->file_infos.file_infos[i].is_extant = FALSE;
.n_rows = N_RECORDS, bl->file_infos.file_infos[i].is_open = FALSE;
.buffer_size = BUFFER_SIZE, bl->file_infos.file_infos[i].buffer = NULL;
.buffer = toku_xmalloc(BUFFER_SIZE)};
src_fidxs[i].idx = i; src_fidxs[i].idx = i;
} }
toku_pthread_t consumer;
struct consumer_thunk cthunk = {q, 0};
{
int r = toku_pthread_create(&consumer, NULL, consumer_thread, (void*)&cthunk);
assert(r==0);
}
{ {
int r = toku_merge_some_files_using_dbufio(TRUE, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000); int r = toku_merge_some_files_using_dbufio(TRUE, FIDX_NULL, q, N_SOURCES, bfs, src_fidxs, bl, 0, (DB*)NULL, compare_ints, 10000);
if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r)); if (r!=0) printf("%s:%d r=%d (%s)\n", __FILE__, __LINE__, r, errorstr_static(r));
assert(r==0); assert(r==0);
} }
{ {
int r = toku_brtloader_destroy(bl); int r = queue_eof(q);
assert(r==0);
}
{
void *result;
int r = toku_pthread_join(consumer, &result);
assert(r==0);
assert(result==NULL);
//printf("n_read = %ld, N_SOURCES=%d N_RECORDS=%d\n", cthunk.n_read, N_SOURCES, N_RECORDS);
assert(cthunk.n_read == N_RECORDS);
}
printf("%s:%d Destroying\n", __FILE__, __LINE__);
{
int r = queue_destroy(bl->primary_rowset_queue);
assert(r==0); assert(r==0);
} }
{
int r = queue_destroy(q);
assert(r==0);
}
toku_brtloader_internal_destroy(bl, FALSE);
{ {
int r = toku_cachetable_close(&ct); int r = toku_cachetable_close(&ct);
assert(r==0); assert(r==0);
} }
for (int i=0; i<N_DEST_DBS; i++) {
toku_free((void*)new_fnames_in_env[i]);
}
for (int i=0; i<N_SOURCES; i++) {
toku_free(fnames[i]);
}
destroy_dbufio_fileset(bfs);
toku_free(fnames);
toku_free(fds);
toku_free(dbs);
toku_free(descriptors);
toku_free(new_fnames_in_env);
toku_free(bt_compare_functions);
toku_free(lsnp);
toku_free(src_fidxs);
toku_free(n_records_in_fd);
} }
...@@ -265,24 +327,23 @@ static int usage(const char *progname, int n) { ...@@ -265,24 +327,23 @@ static int usage(const char *progname, int n) {
int test_main (int argc, const char *argv[]) { int test_main (int argc, const char *argv[]) {
const char *progname=argv[0]; const char *progname=argv[0];
int n = 1;
argc--; argv++; argc--; argv++;
while (argc>0) { while (argc>0) {
if (strcmp(argv[0],"-h")==0) { if (strcmp(argv[0],"-h")==0) {
return usage(progname, n); return usage(progname, N_RECORDS);
} else if (strcmp(argv[0],"-v")==0) { } else if (strcmp(argv[0],"-v")==0) {
verbose=1; verbose=1;
} else if (strcmp(argv[0],"-q")==0) { } else if (strcmp(argv[0],"-q")==0) {
verbose=0; verbose=0;
} else if (strcmp(argv[0],"-r") == 0) { } else if (strcmp(argv[0],"-r") == 0) {
argc--; argv++; argc--; argv++;
n = atoi(argv[0]); N_RECORDS = atoi(argv[0]);
} else if (strcmp(argv[0],"-s") == 0) { } else if (strcmp(argv[0],"-s") == 0) {
toku_brtloader_set_size_factor(1); toku_brtloader_set_size_factor(1);
} else if (strcmp(argv[0],"-m") == 0) { } else if (strcmp(argv[0],"-m") == 0) {
my_malloc_event = 1; my_malloc_event = 1;
} else if (argc!=1) { } else if (argc!=1) {
return usage(progname, n); return usage(progname, N_RECORDS);
} }
else { else {
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment