Commit 58841607 authored by Zardosht Kasheff's avatar Zardosht Kasheff Committed by Yoni Fogel

[t:4941], remove tabs from ftloader.c

git-svn-id: file:///svn/toku/tokudb@43936 c7de825b-a66e-492c-adef-691d508d4ae1
parent 57b7bc04
...@@ -51,9 +51,9 @@ void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL ...@@ -51,9 +51,9 @@ void ft_loader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL
static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) { static size_t do_fwrite (const void *ptr, size_t size, size_t nmemb, FILE *stream) {
if (os_fwrite_fun) { if (os_fwrite_fun) {
return os_fwrite_fun(ptr, size, nmemb, stream); return os_fwrite_fun(ptr, size, nmemb, stream);
} else { } else {
return fwrite(ptr, size, nmemb, stream); return fwrite(ptr, size, nmemb, stream);
} }
} }
...@@ -152,7 +152,7 @@ int ft_loader_init_file_infos (struct file_infos *fi) { ...@@ -152,7 +152,7 @@ int ft_loader_init_file_infos (struct file_infos *fi) {
fi->n_files_extant = 0; fi->n_files_extant = 0;
MALLOC_N(fi->n_files_limit, fi->file_infos); MALLOC_N(fi->n_files_limit, fi->file_infos);
if (fi->file_infos == NULL) if (fi->file_infos == NULL)
result = errno; result = errno;
return result; return result;
} }
...@@ -164,19 +164,19 @@ void ft_loader_fi_destroy (struct file_infos *fi, BOOL is_error) ...@@ -164,19 +164,19 @@ void ft_loader_fi_destroy (struct file_infos *fi, BOOL is_error)
{ {
toku_mutex_destroy(&fi->lock); toku_mutex_destroy(&fi->lock);
if (!is_error) { if (!is_error) {
invariant(fi->n_files_open==0); invariant(fi->n_files_open==0);
invariant(fi->n_files_extant==0); invariant(fi->n_files_extant==0);
} }
for (int i=0; i<fi->n_files; i++) { for (int i=0; i<fi->n_files; i++) {
if (fi->file_infos[i].is_open) { if (fi->file_infos[i].is_open) {
invariant(is_error); invariant(is_error);
toku_os_fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case. toku_os_fclose(fi->file_infos[i].file); // don't check for errors, since we are in an error case.
} }
if (fi->file_infos[i].is_extant) { if (fi->file_infos[i].is_extant) {
invariant(is_error); invariant(is_error);
unlink(fi->file_infos[i].fname); unlink(fi->file_infos[i].fname);
toku_free(fi->file_infos[i].fname); toku_free(fi->file_infos[i].fname);
} }
cleanup_big_buffer(&fi->file_infos[i]); cleanup_big_buffer(&fi->file_infos[i]);
} }
toku_free(fi->file_infos); toku_free(fi->file_infos);
...@@ -193,8 +193,8 @@ static int open_file_add (struct file_infos *fi, ...@@ -193,8 +193,8 @@ static int open_file_add (struct file_infos *fi,
int result = 0; int result = 0;
toku_mutex_lock(&fi->lock); toku_mutex_lock(&fi->lock);
if (fi->n_files >= fi->n_files_limit) { if (fi->n_files >= fi->n_files_limit) {
fi->n_files_limit *=2; fi->n_files_limit *=2;
XREALLOC_N(fi->n_files_limit, fi->file_infos); XREALLOC_N(fi->n_files_limit, fi->file_infos);
} }
invariant(fi->n_files < fi->n_files_limit); invariant(fi->n_files < fi->n_files_limit);
fi->file_infos[fi->n_files].is_open = TRUE; fi->file_infos[fi->n_files].is_open = TRUE;
...@@ -279,11 +279,11 @@ int ...@@ -279,11 +279,11 @@ int
ft_loader_fi_close_all(struct file_infos *fi) { ft_loader_fi_close_all(struct file_infos *fi) {
int rval = 0; int rval = 0;
for (int i = 0; i < fi->n_files; i++) { for (int i = 0; i < fi->n_files; i++) {
int r; int r;
FIDX idx = { i }; FIDX idx = { i };
r = ft_loader_fi_close(fi, idx, FALSE); // ignore files that are already closed r = ft_loader_fi_close(fi, idx, FALSE); // ignore files that are already closed
if (rval == 0 && r) if (rval == 0 && r)
rval = r; // capture first error rval = r; // capture first error
} }
return rval; return rval;
} }
...@@ -389,21 +389,21 @@ static uint64_t memory_per_rowset_during_extract (FTLOADER bl) ...@@ -389,21 +389,21 @@ static uint64_t memory_per_rowset_during_extract (FTLOADER bl)
// Return how much memory can be allocated for each rowset. // Return how much memory can be allocated for each rowset.
{ {
if (size_factor==1) { if (size_factor==1) {
return 16*1024; return 16*1024;
} else { } else {
// There is a primary rowset being maintained by the foreground thread. // There is a primary rowset being maintained by the foreground thread.
// There could be two more in the queue. // There could be two more in the queue.
// There is one rowset for each index (bl->N) being filled in. // There is one rowset for each index (bl->N) being filled in.
// Later we may have sort_and_write operations spawning in parallel, and will need to account for that. // Later we may have sort_and_write operations spawning in parallel, and will need to account for that.
int n_copies = (1 // primary rowset int n_copies = (1 // primary rowset
+EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue +EXTRACTOR_QUEUE_DEPTH // the number of primaries in the queue
+bl->N // the N rowsets being constructed by the extractor thread. +bl->N // the N rowsets being constructed by the extractor thread.
+bl->N // the N sort buffers +bl->N // the N sort buffers
+1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill. +1 // Give the extractor thread one more so that it can have temporary space for sorting. This is overkill.
); );
int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time. int64_t extra_reserved_memory = bl->N * FILE_BUFFER_SIZE; // for each index we are writing to a file at any given time.
int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies); int64_t tentative_rowset_size = ((int64_t)(bl->reserved_memory - extra_reserved_memory))/(n_copies);
return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY); return MAX(tentative_rowset_size, (int64_t)MIN_ROWSET_MEMORY);
} }
} }
...@@ -470,14 +470,14 @@ static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, B ...@@ -470,14 +470,14 @@ static uint64_t memory_per_rowset_during_merge (FTLOADER bl, int merge_factor, B
} }
int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
CACHETABLE cachetable, CACHETABLE cachetable,
generate_row_for_put_func g, generate_row_for_put_func g,
DB *src_db, DB *src_db,
int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/], int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/],
const char *new_fnames_in_env[/*N*/], const char *new_fnames_in_env[/*N*/],
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn) TOKUTXN txn)
// Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread. // Effect: Allocate and initialize a FTLOADER, but do not create the extractor thread.
{ {
...@@ -489,7 +489,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -489,7 +489,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
if (bl->cachetable) if (bl->cachetable)
bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with). bl->reserved_memory = toku_cachetable_reserve_memory(bl->cachetable, 2.0/3.0); // allocate 2/3 of the unreserved part (which is 3/4 of the memory to start with).
else else
bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB. bl->reserved_memory = 512*1024*1024; // if no cache table use 512MB.
//printf("Reserved memory=%ld\n", bl->reserved_memory); //printf("Reserved memory=%ld\n", bl->reserved_memory);
bl->src_db = src_db; bl->src_db = src_db;
...@@ -544,7 +544,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -544,7 +544,7 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
if (r!=0) { toku_ft_loader_internal_destroy(bl, TRUE); return r; } if (r!=0) { toku_ft_loader_internal_destroy(bl, TRUE); return r; }
} }
init_merge_fileset(&bl->fs[i]); init_merge_fileset(&bl->fs[i]);
bl->last_key[i].flags = DB_DBT_REALLOC; // don't really need this, but it's nice to maintain it. We use ulen to keep track of the realloced space. bl->last_key[i].flags = DB_DBT_REALLOC; // don't really need this, but it's nice to maintain it. We use ulen to keep track of the realloced space.
} }
ft_loader_init_error_callback(&bl->error_callback); ft_loader_init_error_callback(&bl->error_callback);
...@@ -569,12 +569,12 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp, ...@@ -569,12 +569,12 @@ int toku_ft_loader_internal_init (/* out */ FTLOADER *blp,
int toku_ft_loader_open (/* out */ FTLOADER *blp, int toku_ft_loader_open (/* out */ FTLOADER *blp,
CACHETABLE cachetable, CACHETABLE cachetable,
generate_row_for_put_func g, generate_row_for_put_func g,
DB *src_db, DB *src_db,
int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/], int N, FT_HANDLE brts[/*N*/], DB* dbs[/*N*/],
const char *new_fnames_in_env[/*N*/], const char *new_fnames_in_env[/*N*/],
ft_compare_func bt_compare_functions[/*N*/], ft_compare_func bt_compare_functions[/*N*/],
const char *temp_file_template, const char *temp_file_template,
LSN load_lsn, LSN load_lsn,
TOKUTXN txn) TOKUTXN txn)
/* Effect: called by DB_ENV->create_loader to create a brt loader. /* Effect: called by DB_ENV->create_loader to create a brt loader.
...@@ -591,22 +591,22 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp, ...@@ -591,22 +591,22 @@ int toku_ft_loader_open (/* out */ FTLOADER *blp,
{ {
int result = 0; int result = 0;
{ {
int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db, int r = toku_ft_loader_internal_init(blp, cachetable, g, src_db,
N, brts, dbs, N, brts, dbs,
new_fnames_in_env, new_fnames_in_env,
bt_compare_functions, bt_compare_functions,
temp_file_template, temp_file_template,
load_lsn, load_lsn,
txn); txn);
if (r!=0) result = r; if (r!=0) result = r;
} }
if (result==0) { if (result==0) {
FTLOADER bl = *blp; FTLOADER bl = *blp;
int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl); int r = toku_pthread_create(&bl->extractor_thread, NULL, extractor_thread, (void*)bl);
if (r==0) { if (r==0) {
bl->extractor_live = TRUE; bl->extractor_live = TRUE;
} else { } else {
result = r; result = r;
(void) toku_ft_loader_internal_destroy(bl, TRUE); (void) toku_ft_loader_internal_destroy(bl, TRUE);
} }
} }
...@@ -642,13 +642,13 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADE ...@@ -642,13 +642,13 @@ static int bl_fwrite(void *ptr, size_t size, size_t nmemb, FILE *stream, FTLOADE
{ {
size_t r = do_fwrite(ptr, size, nmemb, stream); size_t r = do_fwrite(ptr, size, nmemb, stream);
if (r!=nmemb) { if (r!=nmemb) {
int e; int e;
if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ... if (os_fwrite_fun) // if using hook to induce artificial errors (for testing) ...
e = errno; // ... then there is no error in the stream, but there is one in errno e = errno; // ... then there is no error in the stream, but there is one in errno
else else
e = ferror(stream); e = ferror(stream);
invariant(e!=0); invariant(e!=0);
return e; return e;
} }
return 0; return 0;
} }
...@@ -665,17 +665,17 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream) ...@@ -665,17 +665,17 @@ static int bl_fread (void *ptr, size_t size, size_t nmemb, FILE *stream)
{ {
size_t r = fread(ptr, size, nmemb, stream); size_t r = fread(ptr, size, nmemb, stream);
if (r==0) { if (r==0) {
if (feof(stream)) return EOF; if (feof(stream)) return EOF;
else { else {
do_error: ; do_error: ;
int e = ferror(stream); int e = ferror(stream);
// r == 0 && !feof && e == 0, how does this happen? invariant(e!=0); // r == 0 && !feof && e == 0, how does this happen? invariant(e!=0);
return e; return e;
} }
} else if (r<nmemb) { } else if (r<nmemb) {
goto do_error; goto do_error;
} else { } else {
return 0; return 0;
} }
} }
...@@ -686,7 +686,7 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER b ...@@ -686,7 +686,7 @@ static int bl_write_dbt (DBT *dbt, FILE* datafile, uint64_t *dataoff, FTLOADER b
if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, bl))) return r; if ((r=bl_fwrite(&dlen, sizeof(dlen), 1, datafile, bl))) return r;
if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, bl))) return r; if ((r=bl_fwrite(dbt->data, 1, dlen, datafile, bl))) return r;
if (dataoff) if (dataoff)
*dataoff += dlen + sizeof(dlen); *dataoff += dlen + sizeof(dlen);
return 0; return 0;
} }
...@@ -694,14 +694,14 @@ static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream) ...@@ -694,14 +694,14 @@ static int bl_read_dbt (/*in*/DBT *dbt, FILE *stream)
{ {
int len; int len;
{ {
int r; int r;
if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r; if ((r = bl_fread(&len, sizeof(len), 1, stream))) return r;
invariant(len>=0); invariant(len>=0);
} }
if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); } if ((int)dbt->ulen<len) { dbt->ulen=len; dbt->data=toku_xrealloc(dbt->data, len); }
{ {
int r; int r;
if ((r = bl_fread(dbt->data, 1, len, stream))) return r; if ((r = bl_fread(dbt->data, 1, len, stream))) return r;
} }
dbt->size = len; dbt->size = len;
return 0; return 0;
...@@ -712,35 +712,35 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int file ...@@ -712,35 +712,35 @@ static int bl_read_dbt_from_dbufio (/*in*/DBT *dbt, DBUFIO_FILESET bfs, int file
int result = 0; int result = 0;
u_int32_t len; u_int32_t len;
{ {
size_t n_read; size_t n_read;
int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read); int r = dbufio_fileset_read(bfs, filenum, &len, sizeof(len), &n_read);
if (r!=0) { if (r!=0) {
result = r; result = r;
} else if (n_read<sizeof(len)) { } else if (n_read<sizeof(len)) {
result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error. result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error.
} }
} }
if (result==0) { if (result==0) {
if (dbt->ulen<len) { if (dbt->ulen<len) {
void * data = toku_realloc(dbt->data, len); void * data = toku_realloc(dbt->data, len);
if (data==NULL) { if (data==NULL) {
result = errno; result = errno;
} else { } else {
dbt->ulen=len; dbt->ulen=len;
dbt->data=data; dbt->data=data;
} }
} }
} }
if (result==0) { if (result==0) {
size_t n_read; size_t n_read;
int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read); int r = dbufio_fileset_read(bfs, filenum, dbt->data, len, &n_read);
if (r!=0) { if (r!=0) {
result = r; result = r;
} else if (n_read<len) { } else if (n_read<len) {
result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error. result = TOKUDB_NO_DATA; // must have run out of data prematurely. This is not EOF, it's a real error.
} else { } else {
dbt->size = len; dbt->size = len;
} }
} }
return result; return result;
} }
...@@ -779,12 +779,12 @@ int loader_read_row (FILE *f, DBT *key, DBT *val) ...@@ -779,12 +779,12 @@ int loader_read_row (FILE *f, DBT *key, DBT *val)
*/ */
{ {
{ {
int r = bl_read_dbt(key, f); int r = bl_read_dbt(key, f);
if (r!=0) return r; if (r!=0) return r;
} }
{ {
int r = bl_read_dbt(val, f); int r = bl_read_dbt(val, f);
if (r!=0) return r; if (r!=0) return r;
} }
return 0; return 0;
} }
...@@ -800,12 +800,12 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke ...@@ -800,12 +800,12 @@ static int loader_read_row_from_dbufio (DBUFIO_FILESET bfs, int filenum, DBT *ke
*/ */
{ {
{ {
int r = bl_read_dbt_from_dbufio(key, bfs, filenum); int r = bl_read_dbt_from_dbufio(key, bfs, filenum);
if (r!=0) return r; if (r!=0) return r;
} }
{ {
int r = bl_read_dbt_from_dbufio(val, bfs, filenum); int r = bl_read_dbt_from_dbufio(val, bfs, filenum);
if (r!=0) return r; if (r!=0) return r;
} }
return 0; return 0;
} }
...@@ -833,10 +833,10 @@ int init_rowset (struct rowset *rows, uint64_t memory_budget) ...@@ -833,10 +833,10 @@ int init_rowset (struct rowset *rows, uint64_t memory_budget)
if (rows->rows==NULL || rows->data==NULL) { if (rows->rows==NULL || rows->data==NULL) {
if (result == 0) if (result == 0)
result = errno; result = errno;
toku_free(rows->rows); toku_free(rows->rows);
toku_free(rows->data); toku_free(rows->data);
rows->rows = NULL; rows->rows = NULL;
rows->data = NULL; rows->data = NULL;
} }
return result; return result;
} }
...@@ -858,7 +858,7 @@ static int row_wont_fit (struct rowset *rows, size_t size) ...@@ -858,7 +858,7 @@ static int row_wont_fit (struct rowset *rows, size_t size)
{ {
// Account for the memory used by the data and also the row structures. // Account for the memory used by the data and also the row structures.
size_t memory_in_use = (rows->n_rows*sizeof(struct row) size_t memory_in_use = (rows->n_rows*sizeof(struct row)
+ rows->n_bytes); + rows->n_bytes);
return (rows->memory_budget < memory_in_use + size); return (rows->memory_budget < memory_in_use + size);
} }
...@@ -869,8 +869,8 @@ int add_row (struct rowset *rows, DBT *key, DBT *val) ...@@ -869,8 +869,8 @@ int add_row (struct rowset *rows, DBT *key, DBT *val)
if (rows->n_rows >= rows->n_rows_limit) { if (rows->n_rows >= rows->n_rows_limit) {
struct row *old_rows = rows->rows; struct row *old_rows = rows->rows;
size_t old_n_rows_limit = rows->n_rows_limit; size_t old_n_rows_limit = rows->n_rows_limit;
rows->n_rows_limit *= 2; rows->n_rows_limit *= 2;
REALLOC_N(rows->n_rows_limit, rows->rows); REALLOC_N(rows->n_rows_limit, rows->rows);
if (rows->rows == NULL) { if (rows->rows == NULL) {
result = errno; result = errno;
rows->rows = old_rows; rows->rows = old_rows;
...@@ -887,12 +887,12 @@ int add_row (struct rowset *rows, DBT *key, DBT *val) ...@@ -887,12 +887,12 @@ int add_row (struct rowset *rows, DBT *key, DBT *val)
rows->rows[rows->n_rows++] = newrow; rows->rows[rows->n_rows++] = newrow;
if (next_off > rows->n_bytes_limit) { if (next_off > rows->n_bytes_limit) {
size_t old_n_bytes_limit = rows->n_bytes_limit; size_t old_n_bytes_limit = rows->n_bytes_limit;
while (next_off > rows->n_bytes_limit) { while (next_off > rows->n_bytes_limit) {
rows->n_bytes_limit = rows->n_bytes_limit*2; rows->n_bytes_limit = rows->n_bytes_limit*2;
} }
invariant(next_off <= rows->n_bytes_limit); invariant(next_off <= rows->n_bytes_limit);
char *old_data = rows->data; char *old_data = rows->data;
REALLOC_N(rows->n_bytes_limit, rows->data); REALLOC_N(rows->n_bytes_limit, rows->data);
if (rows->data == NULL) { if (rows->data == NULL) {
result = errno; result = errno;
rows->data = old_data; rows->data = old_data;
...@@ -920,9 +920,9 @@ static int finish_primary_rows_internal (FTLOADER bl) ...@@ -920,9 +920,9 @@ static int finish_primary_rows_internal (FTLOADER bl)
#endif #endif
cilk_for (int i = 0; i < bl->N; i++) { cilk_for (int i = 0; i < bl->N; i++) {
//printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows); //printf("%s:%d extractor finishing index %d with %ld rows\n", __FILE__, __LINE__, i, rows->n_rows);
ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]); ra[i] = sort_and_write_rows(bl->rows[i], &(bl->fs[i]), bl, i, bl->dbs[i], bl->bt_compare_funs[i]);
zero_rowset(&bl->rows[i]); zero_rowset(&bl->rows[i]);
} }
// Implicit cilk_sync after that cilk_for loop. // Implicit cilk_sync after that cilk_for loop.
...@@ -944,30 +944,30 @@ static void* extractor_thread (void *blv) { ...@@ -944,30 +944,30 @@ static void* extractor_thread (void *blv) {
FTLOADER bl = (FTLOADER)blv; FTLOADER bl = (FTLOADER)blv;
int r = 0; int r = 0;
while (1) { while (1) {
void *item; void *item;
{ {
int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL); int rq = queue_deq(bl->primary_rowset_queue, &item, NULL, NULL);
if (rq==EOF) break; if (rq==EOF) break;
invariant(rq==0); // other errors are arbitrarily bad. invariant(rq==0); // other errors are arbitrarily bad.
} }
struct rowset *primary_rowset = (struct rowset *)item; struct rowset *primary_rowset = (struct rowset *)item;
//printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows); //printf("%s:%d extractor got %ld rows\n", __FILE__, __LINE__, primary_rowset.n_rows);
// Now we have some rows to output // Now we have some rows to output
{ {
r = process_primary_rows(bl, primary_rowset); r = process_primary_rows(bl, primary_rowset);
if (r) if (r)
ft_loader_set_panic(bl, r, FALSE); ft_loader_set_panic(bl, r, FALSE);
} }
} }
//printf("%s:%d extractor finishing\n", __FILE__, __LINE__); //printf("%s:%d extractor finishing\n", __FILE__, __LINE__);
if (r == 0) { if (r == 0) {
r = finish_primary_rows(bl); r = finish_primary_rows(bl);
if (r) if (r)
ft_loader_set_panic(bl, r, FALSE); ft_loader_set_panic(bl, r, FALSE);
} }
return NULL; return NULL;
} }
...@@ -988,10 +988,10 @@ static int loader_do_put(FTLOADER bl, ...@@ -988,10 +988,10 @@ static int loader_do_put(FTLOADER bl,
int result; int result;
result = add_row(&bl->primary_rowset, pkey, pval); result = add_row(&bl->primary_rowset, pkey, pval);
if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) { if (result == 0 && row_wont_fit(&bl->primary_rowset, 0)) {
// queue the rows for further processing by the extractor thread. // queue the rows for further processing by the extractor thread.
//printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows); //printf("%s:%d please extract %ld\n", __FILE__, __LINE__, bl->primary_rowset.n_rows);
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
{ {
int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl)); int r = init_rowset(&bl->primary_rowset, memory_per_rowset_during_extract(bl));
// bl->primary_rowset will get destroyed by toku_ft_loader_abort // bl->primary_rowset will get destroyed by toku_ft_loader_abort
if (r != 0) if (r != 0)
...@@ -1008,26 +1008,26 @@ finish_extractor (FTLOADER bl) { ...@@ -1008,26 +1008,26 @@ finish_extractor (FTLOADER bl) {
int rval; int rval;
if (bl->primary_rowset.n_rows>0) { if (bl->primary_rowset.n_rows>0) {
enqueue_for_extraction(bl); enqueue_for_extraction(bl);
} else { } else {
destroy_rowset(&bl->primary_rowset); destroy_rowset(&bl->primary_rowset);
} }
//printf("%s:%d please finish extraction\n", __FILE__, __LINE__); //printf("%s:%d please finish extraction\n", __FILE__, __LINE__);
{ {
int r = queue_eof(bl->primary_rowset_queue); int r = queue_eof(bl->primary_rowset_queue);
invariant(r==0); invariant(r==0);
} }
//printf("%s:%d joining\n", __FILE__, __LINE__); //printf("%s:%d joining\n", __FILE__, __LINE__);
{ {
void *toku_pthread_retval; void *toku_pthread_retval;
int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval); int r = toku_pthread_join(bl->extractor_thread, &toku_pthread_retval);
resource_assert_zero(r); resource_assert_zero(r);
invariant(toku_pthread_retval == NULL); invariant(toku_pthread_retval == NULL);
bl->extractor_live = FALSE; bl->extractor_live = FALSE;
} }
{ {
int r = queue_destroy(bl->primary_rowset_queue); int r = queue_destroy(bl->primary_rowset_queue);
invariant(r==0); invariant(r==0);
} }
rval = ft_loader_fi_close_all(&bl->file_infos); rval = ft_loader_fi_close_all(&bl->file_infos);
...@@ -1077,103 +1077,103 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro ...@@ -1077,103 +1077,103 @@ static int process_primary_rows_internal (FTLOADER bl, struct rowset *primary_ro
#pragma cilk grainsize = 1 #pragma cilk grainsize = 1
#endif #endif
cilk_for (int i = 0; i < bl->N; i++) { cilk_for (int i = 0; i < bl->N; i++) {
unsigned int klimit,vlimit; // maximum row sizes. unsigned int klimit,vlimit; // maximum row sizes.
toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit); toku_ft_get_maximum_advised_key_value_lengths(&klimit, &vlimit);
error_codes[i] = 0; error_codes[i] = 0;
struct rowset *rows = &(bl->rows[i]); struct rowset *rows = &(bl->rows[i]);
struct merge_fileset *fs = &(bl->fs[i]); struct merge_fileset *fs = &(bl->fs[i]);
ft_compare_func compare = bl->bt_compare_funs[i]; ft_compare_func compare = bl->bt_compare_funs[i];
DBT skey = zero_dbt; DBT skey = zero_dbt;
skey.flags = DB_DBT_REALLOC; skey.flags = DB_DBT_REALLOC;
DBT sval=skey; DBT sval=skey;
// Don't parallelize this loop, or we have to lock access to add_row() which would be a lot of overehad. // Don't parallelize this loop, or we have to lock access to add_row() which would be a lot of overehad.
// Also this way we can reuse the DB_DBT_REALLOC'd value inside skey and sval without a race. // Also this way we can reuse the DB_DBT_REALLOC'd value inside skey and sval without a race.
for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) { for (size_t prownum=0; prownum<primary_rowset->n_rows; prownum++) {
if (error_count) break; if (error_count) break;
struct row *prow = &primary_rowset->rows[prownum]; struct row *prow = &primary_rowset->rows[prownum];
DBT pkey = zero_dbt; DBT pkey = zero_dbt;
DBT pval = zero_dbt; DBT pval = zero_dbt;
pkey.data = primary_rowset->data + prow->off; pkey.data = primary_rowset->data + prow->off;
pkey.size = prow->klen; pkey.size = prow->klen;
pval.data = primary_rowset->data + prow->off + prow->klen; pval.data = primary_rowset->data + prow->off + prow->klen;
pval.size = prow->vlen; pval.size = prow->vlen;
{ {
int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval); int r = bl->generate_row_for_put(bl->dbs[i], bl->src_db, &skey, &sval, &pkey, &pval);
if (r != 0) { if (r != 0) {
error_codes[i] = r; error_codes[i] = r;
inc_error_count(); inc_error_count();
break; break;
} }
if (skey.size > klimit) { if (skey.size > klimit) {
error_codes[i] = EINVAL; error_codes[i] = EINVAL;
fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit); fprintf(stderr, "Key too big (keysize=%d bytes, limit=%d bytes)\n", skey.size, klimit);
inc_error_count();
break;
}
if (sval.size > vlimit) {
error_codes[i] = EINVAL;
fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit);
inc_error_count();
break;
}
}
bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i));
if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r != 0) {
error_codes[i] = r;
inc_error_count(); inc_error_count();
break; break;
} }
} if (sval.size > vlimit) {
int r = add_row(rows, &skey, &sval); error_codes[i] = EINVAL;
fprintf(stderr, "Row too big (rowsize=%d bytes, limit=%d bytes)\n", sval.size, vlimit);
inc_error_count();
break;
}
}
bl->extracted_datasizes[i] += ft_loader_leafentry_size(skey.size, sval.size, leafentry_xid(bl, i));
if (row_wont_fit(rows, skey.size + sval.size)) {
//printf("%s:%d rows.n_rows=%ld rows.n_bytes=%ld\n", __FILE__, __LINE__, rows->n_rows, rows->n_bytes);
int r = sort_and_write_rows(*rows, fs, bl, i, bl->dbs[i], compare); // cannot spawn this because of the race on rows. If we were to create a new rows, and if sort_and_write_rows were to destroy the rows it is passed, we could spawn it, however.
// If we do spawn this, then we must account for the additional storage in the memory_per_rowset() function.
init_rowset(rows, memory_per_rowset_during_extract(bl)); // we passed the contents of rows to sort_and_write_rows.
if (r != 0) {
error_codes[i] = r;
inc_error_count();
break;
}
}
int r = add_row(rows, &skey, &sval);
if (r != 0) { if (r != 0) {
error_codes[i] = r; error_codes[i] = r;
inc_error_count(); inc_error_count();
break; break;
} }
//flags==0 means generate_row_for_put callback changed it //flags==0 means generate_row_for_put callback changed it
//(and freed any memory necessary to do so) so that values are now stored //(and freed any memory necessary to do so) so that values are now stored
//in temporary memory that does not need to be freed. We need to continue //in temporary memory that does not need to be freed. We need to continue
//using DB_DBT_REALLOC however. //using DB_DBT_REALLOC however.
if (skey.flags == 0) { if (skey.flags == 0) {
toku_init_dbt(&skey); toku_init_dbt(&skey);
skey.flags = DB_DBT_REALLOC; skey.flags = DB_DBT_REALLOC;
} }
if (sval.flags == 0) { if (sval.flags == 0) {
toku_init_dbt(&sval); toku_init_dbt(&sval);
sval.flags = DB_DBT_REALLOC; sval.flags = DB_DBT_REALLOC;
} }
} }
{ {
if (skey.flags) { if (skey.flags) {
toku_free(skey.data); skey.data = NULL; toku_free(skey.data); skey.data = NULL;
} }
if (sval.flags) { if (sval.flags) {
toku_free(sval.data); sval.data = NULL; toku_free(sval.data); sval.data = NULL;
} }
} }
} }
destroy_rowset(primary_rowset); destroy_rowset(primary_rowset);
toku_free(primary_rowset); toku_free(primary_rowset);
int r = 0; int r = 0;
if (error_count > 0) { if (error_count > 0) {
for (int i=0; i<bl->N; i++) { for (int i=0; i<bl->N; i++) {
if (error_codes[i]) r = error_codes[i]; if (error_codes[i]) r = error_codes[i];
} }
invariant(r); // found the error invariant(r); // found the error
} }
toku_free(error_codes); toku_free(error_codes);
...@@ -1206,10 +1206,10 @@ u_int64_t toku_ft_loader_get_n_rows(FTLOADER bl) { ...@@ -1206,10 +1206,10 @@ u_int64_t toku_ft_loader_get_n_rows(FTLOADER bl) {
} }
int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
int which_db, DB *dest_db, ft_compare_func compare, int which_db, DB *dest_db, ft_compare_func compare,
FTLOADER bl, FTLOADER bl,
struct rowset *rowset) struct rowset *rowset)
/* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and write them into dest. /* Effect: Given two arrays of rows, a and b, merge them using the comparison function, and write them into dest.
* This function is suitable for use in a mergesort. * This function is suitable for use in a mergesort.
* If a pair of duplicate keys is ever noticed, then call the error_callback function (if it exists), and return DB_KEYEXIST. * If a pair of duplicate keys is ever noticed, then call the error_callback function (if it exists), and return DB_KEYEXIST.
...@@ -1222,43 +1222,43 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int ...@@ -1222,43 +1222,43 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
*/ */
{ {
while (an>0 && bn>0) { while (an>0 && bn>0) {
DBT akey; memset(&akey, 0, sizeof akey); akey.data=rowset->data+a->off; akey.size=a->klen; DBT akey; memset(&akey, 0, sizeof akey); akey.data=rowset->data+a->off; akey.size=a->klen;
DBT bkey; memset(&bkey, 0, sizeof bkey); bkey.data=rowset->data+b->off; bkey.size=b->klen; DBT bkey; memset(&bkey, 0, sizeof bkey); bkey.data=rowset->data+b->off; bkey.size=b->klen;
int compare_result = compare(dest_db, &akey, &bkey); int compare_result = compare(dest_db, &akey, &bkey);
if (compare_result==0) { if (compare_result==0) {
if (bl->error_callback.error_callback) { if (bl->error_callback.error_callback) {
DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen; DBT aval; memset(&aval, 0, sizeof aval); aval.data=rowset->data + a->off + a->klen; aval.size = a->vlen;
ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval); ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
} }
return DB_KEYEXIST; return DB_KEYEXIST;
} else if (compare_result<0) { } else if (compare_result<0) {
// a is smaller // a is smaller
*dest = *a; *dest = *a;
dest++; a++; an--; dest++; a++; an--;
} else { } else {
*dest = *b; *dest = *b;
dest++; b++; bn--; dest++; b++; bn--;
} }
} }
while (an>0) { while (an>0) {
*dest = *a; *dest = *a;
dest++; a++; an--; dest++; a++; an--;
} }
while (bn>0) { while (bn>0) {
*dest = *b; *dest = *b;
dest++; b++; bn--; dest++; b++; bn--;
} }
return 0; return 0;
} }
static int binary_search (int *location, static int binary_search (int *location,
const DBT *key, const DBT *key,
struct row a[/*an*/], int an, struct row a[/*an*/], int an,
int abefore, int abefore,
int which_db, DB *dest_db, ft_compare_func compare, int which_db, DB *dest_db, ft_compare_func compare,
FTLOADER bl, FTLOADER bl,
struct rowset *rowset) struct rowset *rowset)
// Given a sorted array of rows a, and a dbt key, find the first row in a that is > key. // Given a sorted array of rows a, and a dbt key, find the first row in a that is > key.
// If no such row exists, then consider the result to be equal to an. // If no such row exists, then consider the result to be equal to an.
// On success store abefore+the index into *location // On success store abefore+the index into *location
...@@ -1266,51 +1266,51 @@ static int binary_search (int *location, ...@@ -1266,51 +1266,51 @@ static int binary_search (int *location,
// Return DB_KEYEXIST if we find a row that is equal to key. // Return DB_KEYEXIST if we find a row that is equal to key.
{ {
if (an==0) { if (an==0) {
*location = abefore; *location = abefore;
return 0; return 0;
} else { } else {
int a2 = an/2; int a2 = an/2;
DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen); DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
int compare_result = compare(dest_db, key, &akey); int compare_result = compare(dest_db, key, &akey);
if (compare_result==0) { if (compare_result==0) {
if (bl->error_callback.error_callback) { if (bl->error_callback.error_callback) {
DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen, a[a2].vlen); DBT aval = make_dbt(rowset->data + a[a2].off + a[a2].klen, a[a2].vlen);
ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval); ft_loader_set_error(&bl->error_callback, DB_KEYEXIST, dest_db, which_db, &akey, &aval);
} }
return DB_KEYEXIST; return DB_KEYEXIST;
} else if (compare_result<0) { } else if (compare_result<0) {
// key is before a2 // key is before a2
if (an==1) { if (an==1) {
*location = abefore; *location = abefore;
return 0; return 0;
} else { } else {
return binary_search(location, key, return binary_search(location, key,
a, a2, a, a2,
abefore, abefore,
which_db, dest_db, compare, bl, rowset); which_db, dest_db, compare, bl, rowset);
} }
} else { } else {
// key is after a2 // key is after a2
if (an==1) { if (an==1) {
*location = abefore + 1; *location = abefore + 1;
return 0; return 0;
} else { } else {
return binary_search(location, key, return binary_search(location, key,
a+a2, an-a2, a+a2, an-a2,
abefore+a2, abefore+a2,
which_db, dest_db, compare, bl, rowset); which_db, dest_db, compare, bl, rowset);
} }
} }
} }
} }
#define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; } #define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; }
static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn, static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
int which_db, DB *dest_db, ft_compare_func compare, int which_db, DB *dest_db, ft_compare_func compare,
FTLOADER bl, FTLOADER bl,
struct rowset *rowset) struct rowset *rowset)
/* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest. /* Effect: Given two sorted arrays of rows, a and b, merge them using the comparison function, and write them into dest.
* This function is a cilk function with parallelism, and is suitable for use in a mergesort. * This function is a cilk function with parallelism, and is suitable for use in a mergesort.
* Arguments: * Arguments:
...@@ -1322,19 +1322,19 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i ...@@ -1322,19 +1322,19 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i
*/ */
{ {
if (an + bn < 10000) { if (an + bn < 10000) {
return merge_row_arrays_base(dest, a, an, b, bn, which_db, dest_db, compare, bl, rowset); return merge_row_arrays_base(dest, a, an, b, bn, which_db, dest_db, compare, bl, rowset);
} }
if (an < bn) { if (an < bn) {
SWAP(struct row *,a, b) SWAP(struct row *,a, b)
SWAP(int ,an,bn) SWAP(int ,an,bn)
} }
// an >= bn // an >= bn
int a2 = an/2; int a2 = an/2;
DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen); DBT akey = make_dbt(rowset->data+a[a2].off, a[a2].klen);
int b2 = 0; // initialize to zero so we can add the answer in. int b2 = 0; // initialize to zero so we can add the answer in.
{ {
int r = binary_search(&b2, &akey, b, bn, 0, which_db, dest_db, compare, bl, rowset); int r = binary_search(&b2, &akey, b, bn, 0, which_db, dest_db, compare, bl, rowset);
if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code. if (r!=0) return r; // for example if we found a duplicate, called the error_callback, and now we return an error code.
} }
int ra, rb; int ra, rb;
ra = cilk_spawn merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset); ra = cilk_spawn merge_row_arrays(dest, a, a2, b, b2, which_db, dest_db, compare, bl, rowset);
...@@ -1368,11 +1368,11 @@ int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_d ...@@ -1368,11 +1368,11 @@ int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_d
struct row *MALLOC_N(n, tmp); struct row *MALLOC_N(n, tmp);
if (tmp == NULL) return errno; if (tmp == NULL) return errno;
{ {
int r = merge_row_arrays(tmp, rows, mid, rows+mid, n-mid, which_db, dest_db, compare, bl, rowset); int r = merge_row_arrays(tmp, rows, mid, rows+mid, n-mid, which_db, dest_db, compare, bl, rowset);
if (r!=0) { if (r!=0) {
toku_free(tmp); toku_free(tmp);
return r; return r;
} }
} }
memcpy(rows, tmp, sizeof(*tmp)*n); memcpy(rows, tmp, sizeof(*tmp)*n);
toku_free(tmp); toku_free(tmp);
...@@ -1385,7 +1385,7 @@ int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, ...@@ -1385,7 +1385,7 @@ int ft_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db,
} }
static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, ft_compare_func compare, static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, ft_compare_func compare,
FTLOADER bl) FTLOADER bl)
/* Effect: Sort a collection of rows. /* Effect: Sort a collection of rows.
* If any duplicates are found, then call the error_callback function and return non zero. * If any duplicates are found, then call the error_callback function and return non zero.
* Otherwise return 0. * Otherwise return 0.
...@@ -1438,8 +1438,8 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile) ...@@ -1438,8 +1438,8 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
r = ft_loader_open_temp_file(bl, &sfile); if (r!=0) return r; r = ft_loader_open_temp_file(bl, &sfile); if (r!=0) return r;
if (fs->n_temp_files+1 > fs->n_temp_files_limit) { if (fs->n_temp_files+1 > fs->n_temp_files_limit) {
fs->n_temp_files_limit = (fs->n_temp_files+1)*2; fs->n_temp_files_limit = (fs->n_temp_files+1)*2;
XREALLOC_N(fs->n_temp_files_limit, fs->data_fidxs); XREALLOC_N(fs->n_temp_files_limit, fs->data_fidxs);
} }
fs->data_fidxs[fs->n_temp_files] = sfile; fs->data_fidxs[fs->n_temp_files] = sfile;
fs->n_temp_files++; fs->n_temp_files++;
...@@ -1453,8 +1453,8 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile) ...@@ -1453,8 +1453,8 @@ static int extend_fileset (FTLOADER bl, struct merge_fileset *fs, FIDX*ffile)
static toku_mutex_t update_progress_lock = { PTHREAD_MUTEX_INITIALIZER }; static toku_mutex_t update_progress_lock = { PTHREAD_MUTEX_INITIALIZER };
static int update_progress (int N, static int update_progress (int N,
FTLOADER bl, FTLOADER bl,
const char *UU(message)) const char *UU(message))
{ {
// Need a lock here because of cilk and also the various pthreads. // Need a lock here because of cilk and also the various pthreads.
// Must protect the increment and the call to the poll_function. // Must protect the increment and the call to the poll_function.
...@@ -1463,13 +1463,13 @@ static int update_progress (int N, ...@@ -1463,13 +1463,13 @@ static int update_progress (int N,
int result; int result;
if (bl->progress_callback_result == 0) { if (bl->progress_callback_result == 0) {
//printf(" %20s: %d ", message, bl->progress); //printf(" %20s: %d ", message, bl->progress);
result = ft_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX); result = ft_loader_call_poll_function(&bl->poll_callback, (float)bl->progress/(float)PROGRESS_MAX);
if (result!=0) { if (result!=0) {
bl->progress_callback_result = result; bl->progress_callback_result = result;
} }
} else { } else {
result = bl->progress_callback_result; result = bl->progress_callback_result;
} }
toku_mutex_unlock(&update_progress_lock); toku_mutex_unlock(&update_progress_lock);
return result; return result;
...@@ -1479,12 +1479,12 @@ static int update_progress (int N, ...@@ -1479,12 +1479,12 @@ static int update_progress (int N,
static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) { static int write_rowset_to_file (FTLOADER bl, FIDX sfile, const struct rowset rows) {
FILE *sstream = toku_bl_fidx2file(bl, sfile); FILE *sstream = toku_bl_fidx2file(bl, sfile);
for (size_t i=0; i<rows.n_rows; i++) { for (size_t i=0; i<rows.n_rows; i++) {
DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen); DBT skey = make_dbt(rows.data + rows.rows[i].off, rows.rows[i].klen);
DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen); DBT sval = make_dbt(rows.data + rows.rows[i].off + rows.rows[i].klen, rows.rows[i].vlen);
u_int64_t soffset=0; // don't really need this. u_int64_t soffset=0; // don't really need this.
int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl); int r = loader_write_row(&skey, &sval, sfile, sstream, &soffset, bl);
if (r != 0) return r; if (r != 0) return r;
} }
return 0; return 0;
} }
...@@ -1548,8 +1548,8 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER ...@@ -1548,8 +1548,8 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, FTLOADER
} }
} }
// Note: if result == 0 then invariant fs->have_sorted_output == TRUE // Note: if result == 0 then invariant fs->have_sorted_output == TRUE
} }
} }
} }
destroy_rowset(&rows); destroy_rowset(&rows);
...@@ -1593,7 +1593,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1593,7 +1593,7 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
DBT zero = zero_dbt; zero.flags=DB_DBT_REALLOC; DBT zero = zero_dbt; zero.flags=DB_DBT_REALLOC;
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably. keys[i] = vals[i] = zero; // fill these all in with zero so we can delete stuff more reliably.
} }
pqueue_t *pq = NULL; pqueue_t *pq = NULL;
...@@ -1601,44 +1601,44 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1601,44 +1601,44 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
if (pq_nodes == NULL) { result = errno; } if (pq_nodes == NULL) { result = errno; }
if (result==0) { if (result==0) {
int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback); int r = pqueue_init(&pq, n_sources, which_db, dest_db, compare, &bl->error_callback);
if (r!=0) result = r; if (r!=0) result = r;
} }
u_int64_t n_rows = 0; u_int64_t n_rows = 0;
if (result==0) { if (result==0) {
// load pqueue with first value from each source // load pqueue with first value from each source
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]); int r = loader_read_row_from_dbufio(bfs, i, &keys[i], &vals[i]);
if (r==EOF) continue; // if the file is empty, don't initialize the pqueue. if (r==EOF) continue; // if the file is empty, don't initialize the pqueue.
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
} }
pq_nodes[i].key = &keys[i]; pq_nodes[i].key = &keys[i];
pq_nodes[i].val = &vals[i]; pq_nodes[i].val = &vals[i];
pq_nodes[i].i = i; pq_nodes[i].i = i;
r = pqueue_insert(pq, &pq_nodes[i]); r = pqueue_insert(pq, &pq_nodes[i]);
if (r!=0) { if (r!=0) {
result = r; result = r;
// path tested by loader-dup-test5.tdbrun // path tested by loader-dup-test5.tdbrun
// printf("%s:%d returning\n", __FILE__, __LINE__); // printf("%s:%d returning\n", __FILE__, __LINE__);
break; break;
} }
dataoff[i] = 0; dataoff[i] = 0;
toku_mutex_lock(&bl->file_infos.lock); toku_mutex_lock(&bl->file_infos.lock);
n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows; n_rows += bl->file_infos.file_infos[srcs_fidxs[i].idx].n_rows;
toku_mutex_unlock(&bl->file_infos.lock); toku_mutex_unlock(&bl->file_infos.lock);
} }
} }
u_int64_t n_rows_done = 0; u_int64_t n_rows_done = 0;
struct rowset *output_rowset = NULL; struct rowset *output_rowset = NULL;
if (result==0 && to_q) { if (result==0 && to_q) {
XMALLOC(output_rowset); // freed in cleanup XMALLOC(output_rowset); // freed in cleanup
int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q)); int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) result = r; if (r!=0) result = r;
} }
...@@ -1650,88 +1650,88 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1650,88 +1650,88 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
pqueue_node_t *node; pqueue_node_t *node;
int r = pqueue_pop(pq, &node); int r = pqueue_pop(pq, &node);
if (r!=0) { if (r!=0) {
result = r; result = r;
invariant(0); invariant(0);
break; break;
} }
mini = node->i; mini = node->i;
} }
if (to_q) { if (to_q) {
if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) { if (row_wont_fit(output_rowset, keys[mini].size + vals[mini].size)) {
{ {
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = queue_enq(q, (void*)output_rowset, 1, NULL);
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
} }
} }
XMALLOC(output_rowset); // freed in cleanup XMALLOC(output_rowset); // freed in cleanup
{ {
int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q)); int r = init_rowset(output_rowset, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
} }
} }
} }
{ {
int r = add_row(output_rowset, &keys[mini], &vals[mini]); int r = add_row(output_rowset, &keys[mini], &vals[mini]);
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
} }
} }
} else { } else {
// write it to the dest file // write it to the dest file
int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl); int r = loader_write_row(&keys[mini], &vals[mini], dest_data, dest_stream, &dataoff[mini], bl);
if (r!=0) { if (r!=0) {
result = r; result = r;
break; break;
} }
} }
{ {
// read next row from file that just sourced min value // read next row from file that just sourced min value
int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]); int r = loader_read_row_from_dbufio(bfs, mini, &keys[mini], &vals[mini]);
if (r!=0) { if (r!=0) {
if (r==EOF) { if (r==EOF) {
// on feof, queue size permanently smaller // on feof, queue size permanently smaller
toku_free(keys[mini].data); keys[mini].data = NULL; toku_free(keys[mini].data); keys[mini].data = NULL;
toku_free(vals[mini].data); vals[mini].data = NULL; toku_free(vals[mini].data); vals[mini].data = NULL;
} else { } else {
fprintf(stderr, "%s:%d r=%d errno=%d bfs=%p mini=%d\n", __FILE__, __LINE__, r, errno, bfs, mini); fprintf(stderr, "%s:%d r=%d errno=%d bfs=%p mini=%d\n", __FILE__, __LINE__, r, errno, bfs, mini);
dbufio_print(bfs); dbufio_print(bfs);
result = r; result = r;
break; break;
} }
} else { } else {
// insert value into queue (re-populate queue) // insert value into queue (re-populate queue)
pq_nodes[mini].key = &keys[mini]; pq_nodes[mini].key = &keys[mini];
r = pqueue_insert(pq, &pq_nodes[mini]); r = pqueue_insert(pq, &pq_nodes[mini]);
if (r!=0) { if (r!=0) {
// Note: This error path tested by loader-dup-test1.tdbrun (and by loader-dup-test4) // Note: This error path tested by loader-dup-test1.tdbrun (and by loader-dup-test4)
result = r; result = r;
// printf("%s:%d returning\n", __FILE__, __LINE__); // printf("%s:%d returning\n", __FILE__, __LINE__);
break; break;
} }
} }
} }
n_rows_done++; n_rows_done++;
const u_int64_t rows_per_report = size_factor*1024; const u_int64_t rows_per_report = size_factor*1024;
if (n_rows_done%rows_per_report==0) { if (n_rows_done%rows_per_report==0) {
// need to update the progress. // need to update the progress.
double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report); double fraction_of_remaining_we_just_did = (double)rows_per_report / (double)(n_rows - n_rows_done + rows_per_report);
invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1); invariant(0<= fraction_of_remaining_we_just_did && fraction_of_remaining_we_just_did<=1);
int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation; int progress_just_done = fraction_of_remaining_we_just_did * progress_allocation;
progress_allocation -= progress_just_done; progress_allocation -= progress_just_done;
// ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result. // ignore the result from update_progress here, we'll call update_progress again below, which will give us the nonzero result.
int r = update_progress(progress_just_done, bl, "in file merge"); int r = update_progress(progress_just_done, bl, "in file merge");
if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r); if (0) printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
} }
} }
if (result==0 && to_q) { if (result==0 && to_q) {
int r = queue_enq(q, (void*)output_rowset, 1, NULL); int r = queue_enq(q, (void*)output_rowset, 1, NULL);
if (r!=0) if (r!=0)
result = r; result = r;
else else
...@@ -1740,19 +1740,19 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q ...@@ -1740,19 +1740,19 @@ int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q
// cleanup // cleanup
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
toku_free(keys[i].data); keys[i].data = NULL; toku_free(keys[i].data); keys[i].data = NULL;
toku_free(vals[i].data); vals[i].data = NULL; toku_free(vals[i].data); vals[i].data = NULL;
} }
if (output_rowset) { if (output_rowset) {
destroy_rowset(output_rowset); destroy_rowset(output_rowset);
toku_free(output_rowset); toku_free(output_rowset);
} }
if (pq) { pqueue_free(pq); pq=NULL; } if (pq) { pqueue_free(pq); pq=NULL; }
toku_free(pq_nodes); toku_free(pq_nodes);
{ {
int r = update_progress(progress_allocation, bl, "end of merge_some_files"); int r = update_progress(progress_allocation, bl, "end of merge_some_files");
//printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r); //printf("%s:%d Progress=%d\n", __FILE__, __LINE__, r);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
return result; return result;
} }
...@@ -1764,35 +1764,35 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou ...@@ -1764,35 +1764,35 @@ static int merge_some_files (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sou
int *MALLOC_N(n_sources, fds); int *MALLOC_N(n_sources, fds);
if (fds==NULL) result=errno; if (fds==NULL) result=errno;
if (result==0) { if (result==0) {
for (int i=0; i<n_sources; i++) { for (int i=0; i<n_sources; i++) {
int r = fileno(toku_bl_fidx2file(bl, srcs_fidxs[i])); // we rely on the fact that when the files are closed, the fd is also closed. int r = fileno(toku_bl_fidx2file(bl, srcs_fidxs[i])); // we rely on the fact that when the files are closed, the fd is also closed.
if (r==-1) { if (r==-1) {
result=errno; result=errno;
break; break;
} }
fds[i] = r; fds[i] = r;
} }
} }
if (result==0) { if (result==0) {
int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q)); int r = create_dbufio_fileset(&bfs, n_sources, fds, memory_per_rowset_during_merge(bl, n_sources, to_q));
if (r!=0) { result = r; } if (r!=0) { result = r; }
} }
if (result==0) { if (result==0) {
int r = toku_merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation); int r = toku_merge_some_files_using_dbufio (to_q, dest_data, q, n_sources, bfs, srcs_fidxs, bl, which_db, dest_db, compare, progress_allocation);
if (r!=0) { result = r; } if (r!=0) { result = r; }
} }
if (bfs!=NULL) { if (bfs!=NULL) {
if (result != 0) if (result != 0)
(void) panic_dbufio_fileset(bfs, result); (void) panic_dbufio_fileset(bfs, result);
int r = destroy_dbufio_fileset(bfs); int r = destroy_dbufio_fileset(bfs);
if (r!=0 && result==0) result=r; if (r!=0 && result==0) result=r;
bfs = NULL; bfs = NULL;
} }
if (fds!=NULL) { if (fds!=NULL) {
toku_free(fds); toku_free(fds);
fds = NULL; fds = NULL;
} }
return result; return result;
} }
...@@ -1806,20 +1806,20 @@ static int int_min (int a, int b) ...@@ -1806,20 +1806,20 @@ static int int_min (int a, int b)
static int n_passes (int N, int B) { static int n_passes (int N, int B) {
int result = 0; int result = 0;
while (N>1) { while (N>1) {
N = (N+B-1)/B; N = (N+B-1)/B;
result++; result++;
} }
return result; return result;
} }
int merge_files (struct merge_fileset *fs, int merge_files (struct merge_fileset *fs,
FTLOADER bl, FTLOADER bl,
// These are needed for the comparison function and error callback. // These are needed for the comparison function and error callback.
int which_db, DB *dest_db, ft_compare_func compare, int which_db, DB *dest_db, ft_compare_func compare,
int progress_allocation, int progress_allocation,
// Write rowsets into this queue. // Write rowsets into this queue.
QUEUE output_q QUEUE output_q
) )
/* Effect: Given a fileset, merge all the files writing all the answers into a queue. /* Effect: Given a fileset, merge all the files writing all the answers into a queue.
* All the files in fs, and any temporary files will be closed and unlinked (and the fileset will be empty) * All the files in fs, and any temporary files will be closed and unlinked (and the fileset will be empty)
* Return value: 0 on success, otherwise an error number. * Return value: 0 on success, otherwise an error number.
...@@ -1832,99 +1832,99 @@ int merge_files (struct merge_fileset *fs, ...@@ -1832,99 +1832,99 @@ int merge_files (struct merge_fileset *fs,
const int final_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, TRUE); // try for a merge to the leaf level const int final_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, TRUE); // try for a merge to the leaf level
const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, FALSE); // try for a merge at nonleaf. const int earlier_mergelimit = (size_factor == 1) ? 4 : merge_fanin(bl, FALSE); // try for a merge at nonleaf.
int n_passes_left = (fs->n_temp_files<=final_mergelimit) int n_passes_left = (fs->n_temp_files<=final_mergelimit)
? 1 ? 1
: 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit); : 1+n_passes((fs->n_temp_files+final_mergelimit-1)/final_mergelimit, earlier_mergelimit);
// printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left); // printf("%d files, %d on last pass, %d on earlier passes, %d passes\n", fs->n_temp_files, final_mergelimit, earlier_mergelimit, n_passes_left);
int result = 0; int result = 0;
while (fs->n_temp_files > 0) { while (fs->n_temp_files > 0) {
int progress_allocation_for_this_pass = progress_allocation/n_passes_left; int progress_allocation_for_this_pass = progress_allocation/n_passes_left;
progress_allocation -= progress_allocation_for_this_pass; progress_allocation -= progress_allocation_for_this_pass;
//printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass); //printf("%s:%d n_passes_left=%d progress_allocation_for_this_pass=%d\n", __FILE__, __LINE__, n_passes_left, progress_allocation_for_this_pass);
invariant(fs->n_temp_files>0); invariant(fs->n_temp_files>0);
struct merge_fileset next_file_set; struct merge_fileset next_file_set;
BOOL to_queue = (BOOL)(fs->n_temp_files <= final_mergelimit); BOOL to_queue = (BOOL)(fs->n_temp_files <= final_mergelimit);
init_merge_fileset(&next_file_set); init_merge_fileset(&next_file_set);
while (fs->n_temp_files>0) { while (fs->n_temp_files>0) {
// grab some files and merge them. // grab some files and merge them.
int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files); int n_to_merge = int_min(to_queue?final_mergelimit:earlier_mergelimit, fs->n_temp_files);
// We are about to do n_to_merge/n_temp_files of the remaining for this pass. // We are about to do n_to_merge/n_temp_files of the remaining for this pass.
int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files; int progress_allocation_for_this_subpass = progress_allocation_for_this_pass * (double)n_to_merge / (double)fs->n_temp_files;
// printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d b=%llu\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files, (long long unsigned) memory_per_rowset_during_merge(bl, n_to_merge, to_queue)); // printf("%s:%d progress_allocation_for_this_subpass=%d n_temp_files=%d b=%llu\n", __FILE__, __LINE__, progress_allocation_for_this_subpass, fs->n_temp_files, (long long unsigned) memory_per_rowset_during_merge(bl, n_to_merge, to_queue));
progress_allocation_for_this_pass -= progress_allocation_for_this_subpass; progress_allocation_for_this_pass -= progress_allocation_for_this_subpass;
//printf("%s:%d merging\n", __FILE__, __LINE__); //printf("%s:%d merging\n", __FILE__, __LINE__);
FIDX merged_data = FIDX_NULL; FIDX merged_data = FIDX_NULL;
FIDX *XMALLOC_N(n_to_merge, data_fidxs); FIDX *XMALLOC_N(n_to_merge, data_fidxs);
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
data_fidxs[i] = FIDX_NULL; data_fidxs[i] = FIDX_NULL;
} }
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
int idx = fs->n_temp_files -1 -i; int idx = fs->n_temp_files -1 -i;
FIDX fidx = fs->data_fidxs[idx]; FIDX fidx = fs->data_fidxs[idx];
result = ft_loader_fi_reopen(&bl->file_infos, fidx, "r"); result = ft_loader_fi_reopen(&bl->file_infos, fidx, "r");
if (result) break; if (result) break;
data_fidxs[i] = fidx; data_fidxs[i] = fidx;
} }
if (result==0 && !to_queue) { if (result==0 && !to_queue) {
result = extend_fileset(bl, &next_file_set, &merged_data); result = extend_fileset(bl, &next_file_set, &merged_data);
} }
if (result==0) { if (result==0) {
result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass); result = merge_some_files(to_queue, merged_data, output_q, n_to_merge, data_fidxs, bl, which_db, dest_db, compare, progress_allocation_for_this_subpass);
// if result!=0, fall through // if result!=0, fall through
if (result==0) { if (result==0) {
/*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0 /*nothing*/;// this is gratuitous, but we need something to give code coverage tools to help us know that it's important to distinguish between result==0 and result!=0
} }
} }
//printf("%s:%d merged\n", __FILE__, __LINE__); //printf("%s:%d merged\n", __FILE__, __LINE__);
for (int i=0; i<n_to_merge; i++) { for (int i=0; i<n_to_merge; i++) {
if (!fidx_is_null(data_fidxs[i])) { if (!fidx_is_null(data_fidxs[i])) {
{ {
int r = ft_loader_fi_close(&bl->file_infos, data_fidxs[i], TRUE); int r = ft_loader_fi_close(&bl->file_infos, data_fidxs[i], TRUE);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
{ {
int r = ft_loader_fi_unlink(&bl->file_infos, data_fidxs[i]); int r = ft_loader_fi_unlink(&bl->file_infos, data_fidxs[i]);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
data_fidxs[i] = FIDX_NULL; data_fidxs[i] = FIDX_NULL;
} }
} }
fs->n_temp_files -= n_to_merge; fs->n_temp_files -= n_to_merge;
if (!to_queue && !fidx_is_null(merged_data)) { if (!to_queue && !fidx_is_null(merged_data)) {
int r = ft_loader_fi_close(&bl->file_infos, merged_data, TRUE); int r = ft_loader_fi_close(&bl->file_infos, merged_data, TRUE);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
toku_free(data_fidxs); toku_free(data_fidxs);
if (result!=0) break; if (result!=0) break;
} }
destroy_merge_fileset(fs); destroy_merge_fileset(fs);
*fs = next_file_set; *fs = next_file_set;
// Update the progress // Update the progress
n_passes_left--; n_passes_left--;
if (result==0) { invariant(progress_allocation_for_this_pass==0); } if (result==0) { invariant(progress_allocation_for_this_pass==0); }
if (result!=0) break; if (result!=0) break;
} }
if (result) ft_loader_set_panic(bl, result, TRUE); if (result) ft_loader_set_panic(bl, result, TRUE);
{ {
int r = queue_eof(output_q); int r = queue_eof(output_q);
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
// It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0) // It's conceivable that the progress_allocation could be nonzero (for example if bl->N==0)
{ {
int r = update_progress(progress_allocation, bl, "did merge_files"); int r = update_progress(progress_allocation, bl, "did merge_files");
if (r!=0 && result==0) result = r; if (r!=0 && result==0) result = r;
} }
return result; return result;
} }
...@@ -1952,8 +1952,8 @@ static void subtrees_info_destroy(struct subtrees_info *p) { ...@@ -1952,8 +1952,8 @@ static void subtrees_info_destroy(struct subtrees_info *p) {
static void allocate_node (struct subtrees_info *sts, int64_t b) { static void allocate_node (struct subtrees_info *sts, int64_t b) {
if (sts->n_subtrees >= sts->n_subtrees_limit) { if (sts->n_subtrees >= sts->n_subtrees_limit) {
sts->n_subtrees_limit *= 2; sts->n_subtrees_limit *= 2;
XREALLOC_N(sts->n_subtrees_limit, sts->subtrees); XREALLOC_N(sts->n_subtrees_limit, sts->subtrees);
} }
sts->subtrees[sts->n_subtrees].block = b; sts->subtrees[sts->n_subtrees].block = b;
sts->n_subtrees++; sts->n_subtrees++;
...@@ -2055,12 +2055,12 @@ static int allocate_block (struct dbout *out, int64_t *ret_block_number) ...@@ -2055,12 +2055,12 @@ static int allocate_block (struct dbout *out, int64_t *ret_block_number)
if (block_number >= out->n_translations_limit) { if (block_number >= out->n_translations_limit) {
int64_t old_n_translations_limit = out->n_translations_limit; int64_t old_n_translations_limit = out->n_translations_limit;
struct translation *old_translation = out->translation; struct translation *old_translation = out->translation;
if (out->n_translations_limit==0) { if (out->n_translations_limit==0) {
out->n_translations_limit = 1; out->n_translations_limit = 1;
} else { } else {
out->n_translations_limit *= 2; out->n_translations_limit *= 2;
} }
REALLOC_N(out->n_translations_limit, out->translation); REALLOC_N(out->n_translations_limit, out->translation);
if (out->translation == NULL) { if (out->translation == NULL) {
result = errno; result = errno;
out->n_translations_limit = old_n_translations_limit; out->n_translations_limit = old_n_translations_limit;
...@@ -2079,9 +2079,9 @@ static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) { ...@@ -2079,9 +2079,9 @@ static void putbuf_bytes (struct dbuf *dbuf, const void *bytes, int nbytes) {
if (!dbuf->error && dbuf->off + nbytes > dbuf->buflen) { if (!dbuf->error && dbuf->off + nbytes > dbuf->buflen) {
unsigned char *oldbuf = dbuf->buf; unsigned char *oldbuf = dbuf->buf;
int oldbuflen = dbuf->buflen; int oldbuflen = dbuf->buflen;
dbuf->buflen += dbuf->off + nbytes; dbuf->buflen += dbuf->off + nbytes;
dbuf->buflen *= 2; dbuf->buflen *= 2;
REALLOC_N(dbuf->buflen, dbuf->buf); REALLOC_N(dbuf->buflen, dbuf->buf);
if (dbuf->buf == NULL) { if (dbuf->buf == NULL) {
dbuf->error = errno; dbuf->error = errno;
dbuf->buf = oldbuf; dbuf->buf = oldbuf;
...@@ -2171,11 +2171,11 @@ static int copy_maxkey(DBT *maxkey) { ...@@ -2171,11 +2171,11 @@ static int copy_maxkey(DBT *maxkey) {
} }
static int toku_loader_write_ft_from_q (FTLOADER bl, static int toku_loader_write_ft_from_q (FTLOADER bl,
const DESCRIPTOR descriptor, const DESCRIPTOR descriptor,
int fd, // write to here int fd, // write to here
int progress_allocation, int progress_allocation,
QUEUE q, QUEUE q,
uint64_t total_disksize_estimate, uint64_t total_disksize_estimate,
int which_db, int which_db,
uint32_t target_nodesize, uint32_t target_nodesize,
uint32_t target_basementnodesize, uint32_t target_basementnodesize,
...@@ -2260,74 +2260,74 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, ...@@ -2260,74 +2260,74 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
STAT64INFO_S deltas = ZEROSTATS; STAT64INFO_S deltas = ZEROSTATS;
while (result == 0) { while (result == 0) {
void *item; void *item;
{ {
int rr = queue_deq(q, &item, NULL, NULL); int rr = queue_deq(q, &item, NULL, NULL);
if (rr == EOF) break; if (rr == EOF) break;
if (rr != 0) { if (rr != 0) {
ft_loader_set_panic(bl, rr, TRUE); // error after cilk sync ft_loader_set_panic(bl, rr, TRUE); // error after cilk sync
break; break;
} }
} }
struct rowset *output_rowset = (struct rowset *)item; struct rowset *output_rowset = (struct rowset *)item;
for (unsigned int i = 0; i < output_rowset->n_rows; i++) { for (unsigned int i = 0; i < output_rowset->n_rows; i++) {
DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off, output_rowset->rows[i].klen); DBT key = make_dbt(output_rowset->data+output_rowset->rows[i].off, output_rowset->rows[i].klen);
DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen); DBT val = make_dbt(output_rowset->data+output_rowset->rows[i].off + output_rowset->rows[i].klen, output_rowset->rows[i].vlen);
size_t this_leafentry_size = ft_loader_leafentry_size(key.size, val.size, le_xid); size_t this_leafentry_size = ft_loader_leafentry_size(key.size, val.size, le_xid);
used_estimate += this_leafentry_size; used_estimate += this_leafentry_size;
// Spawn off a node if // Spawn off a node if
// a) there is at least one row in it, and // a) there is at least one row in it, and
// b) this item would make the nodesize too big, or // b) this item would make the nodesize too big, or
// c) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount // c) the remaining amount won't fit in the current node and the current node's data is more than the remaining amount
uint64_t remaining_amount = total_disksize_estimate - used_estimate; uint64_t remaining_amount = total_disksize_estimate - used_estimate;
uint64_t used_here = lbuf->off + 1000; // leave 1000 for various overheads. uint64_t used_here = lbuf->off + 1000; // leave 1000 for various overheads.
uint64_t target_size = (target_nodesize*7L)/8; // use only 7/8 of the node. uint64_t target_size = (target_nodesize*7L)/8; // use only 7/8 of the node.
uint64_t used_here_with_next_key = used_here + this_leafentry_size; uint64_t used_here_with_next_key = used_here + this_leafentry_size;
if (lbuf->nkeys > 0 && if (lbuf->nkeys > 0 &&
((used_here_with_next_key >= target_size) || (used_here + remaining_amount >= target_size && lbuf->off > remaining_amount))) { ((used_here_with_next_key >= target_size) || (used_here + remaining_amount >= target_size && lbuf->off > remaining_amount))) {
int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining; int progress_this_node = progress_allocation * (double)(old_n_rows_remaining - n_rows_remaining)/(double)old_n_rows_remaining;
progress_allocation -= progress_this_node; progress_allocation -= progress_this_node;
old_n_rows_remaining = n_rows_remaining; old_n_rows_remaining = n_rows_remaining;
allocate_node(&sts, lblock); allocate_node(&sts, lblock);
n_pivots++; n_pivots++;
invariant(maxkey.data != NULL); invariant(maxkey.data != NULL);
if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) { if ((r = bl_write_dbt(&maxkey, pivots_stream, NULL, bl))) {
ft_loader_set_panic(bl, r, TRUE); // error after cilk sync ft_loader_set_panic(bl, r, TRUE); // error after cilk sync
if (result == 0) result = r; if (result == 0) result = r;
break; break;
} }
cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method); cilk_spawn finish_leafnode(&out, lbuf, progress_this_node, bl, target_basementnodesize, target_compression_method);
lbuf = NULL; lbuf = NULL;
r = allocate_block(&out, &lblock); r = allocate_block(&out, &lblock);
if (r != 0) { if (r != 0) {
ft_loader_set_panic(bl, r, TRUE); ft_loader_set_panic(bl, r, TRUE);
if (result == 0) result = r; if (result == 0) result = r;
break; break;
} }
lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize); lbuf = start_leaf(&out, descriptor, lblock, le_xid, target_nodesize);
} }
add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size, this_leafentry_size, &deltas); add_pair_to_leafnode(lbuf, (unsigned char *) key.data, key.size, (unsigned char *) val.data, val.size, this_leafentry_size, &deltas);
n_rows_remaining--; n_rows_remaining--;
update_maxkey(&maxkey, &key); // set the new maxkey to the current key update_maxkey(&maxkey, &key); // set the new maxkey to the current key
} }
r = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed r = copy_maxkey(&maxkey); // make a copy of maxkey before the rowset is destroyed
if (result == 0) if (result == 0)
result = r; result = r;
destroy_rowset(output_rowset); destroy_rowset(output_rowset);
toku_free(output_rowset); toku_free(output_rowset);
if (result == 0) if (result == 0)
result = ft_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly result = ft_loader_get_error(&bl->error_callback); // check if an error was posted and terminate this quickly
...@@ -2362,8 +2362,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, ...@@ -2362,8 +2362,8 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
n_pivots++; n_pivots++;
{ {
DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file. DBT key = make_dbt(0,0); // must write an extra DBT into the pivots file.
r = bl_write_dbt(&key, pivots_stream, NULL, bl); r = bl_write_dbt(&key, pivots_stream, NULL, bl);
if (r) { if (r) {
result = r; goto error; result = r; goto error;
} }
...@@ -2375,29 +2375,29 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, ...@@ -2375,29 +2375,29 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
} }
{ {
invariant(sts.n_subtrees==1); invariant(sts.n_subtrees==1);
out.h->h->root_blocknum = make_blocknum(sts.subtrees[0].block); out.h->h->root_blocknum = make_blocknum(sts.subtrees[0].block);
toku_free(sts.subtrees); sts.subtrees = NULL; toku_free(sts.subtrees); sts.subtrees = NULL;
// write the descriptor // write the descriptor
{ {
seek_align(&out); seek_align(&out);
invariant(out.n_translations >= RESERVED_BLOCKNUM_DESCRIPTOR); invariant(out.n_translations >= RESERVED_BLOCKNUM_DESCRIPTOR);
invariant(out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off == -1); invariant(out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off == -1);
out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off = out.current_off; out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].off = out.current_off;
size_t desc_size = 4+toku_serialize_descriptor_size(descriptor); size_t desc_size = 4+toku_serialize_descriptor_size(descriptor);
invariant(desc_size>0); invariant(desc_size>0);
out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].size = desc_size; out.translation[RESERVED_BLOCKNUM_DESCRIPTOR].size = desc_size;
struct wbuf wbuf; struct wbuf wbuf;
char *XMALLOC_N(desc_size, buf); char *XMALLOC_N(desc_size, buf);
wbuf_init(&wbuf, buf, desc_size); wbuf_init(&wbuf, buf, desc_size);
toku_serialize_descriptor_contents_to_wbuf(&wbuf, descriptor); toku_serialize_descriptor_contents_to_wbuf(&wbuf, descriptor);
u_int32_t checksum = x1764_finish(&wbuf.checksum); u_int32_t checksum = x1764_finish(&wbuf.checksum);
wbuf_int(&wbuf, checksum); wbuf_int(&wbuf, checksum);
invariant(wbuf.ndone==desc_size); invariant(wbuf.ndone==desc_size);
r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone); r = toku_os_write(out.fd, wbuf.buf, wbuf.ndone);
out.current_off += desc_size; out.current_off += desc_size;
toku_free(buf); // wbuf_destroy toku_free(buf); // wbuf_destroy
if (r) { if (r) {
result = r; goto error; result = r; goto error;
} }
...@@ -2443,11 +2443,11 @@ static int toku_loader_write_ft_from_q (FTLOADER bl, ...@@ -2443,11 +2443,11 @@ static int toku_loader_write_ft_from_q (FTLOADER bl,
} }
int toku_loader_write_brt_from_q_in_C (FTLOADER bl, int toku_loader_write_brt_from_q_in_C (FTLOADER bl,
const DESCRIPTOR descriptor, const DESCRIPTOR descriptor,
int fd, // write to here int fd, // write to here
int progress_allocation, int progress_allocation,
QUEUE q, QUEUE q,
uint64_t total_disksize_estimate, uint64_t total_disksize_estimate,
int which_db, int which_db,
uint32_t target_nodesize, uint32_t target_nodesize,
uint32_t target_basementnodesize, uint32_t target_basementnodesize,
...@@ -2468,7 +2468,7 @@ static void* fractal_thread (void *ftav) { ...@@ -2468,7 +2468,7 @@ static void* fractal_thread (void *ftav) {
} }
static int loader_do_i (FTLOADER bl, static int loader_do_i (FTLOADER bl,
int which_db, int which_db,
DB *dest_db, DB *dest_db,
ft_compare_func compare, ft_compare_func compare,
const DESCRIPTOR descriptor, const DESCRIPTOR descriptor,
...@@ -2492,9 +2492,9 @@ static int loader_do_i (FTLOADER bl, ...@@ -2492,9 +2492,9 @@ static int loader_do_i (FTLOADER bl,
if (r) goto error; if (r) goto error;
{ {
mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO; mode_t mode = S_IRWXU|S_IRWXG|S_IRWXO;
int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621 int fd = toku_os_open(new_fname, O_RDWR| O_CREAT | O_BINARY, mode); // #2621
if (fd < 0) { if (fd < 0) {
r = errno; goto error; r = errno; goto error;
} }
...@@ -2507,13 +2507,13 @@ static int loader_do_i (FTLOADER bl, ...@@ -2507,13 +2507,13 @@ static int loader_do_i (FTLOADER bl,
r = dest_db->get_compression_method(dest_db, &target_compression_method); r = dest_db->get_compression_method(dest_db, &target_compression_method);
invariant_zero(r); invariant_zero(r);
// This structure must stay live until the join below. // This structure must stay live until the join below.
struct fractal_thread_args fta = { bl, struct fractal_thread_args fta = { bl,
descriptor, descriptor,
fd, fd,
progress_allocation, progress_allocation,
bl->fractal_queues[which_db], bl->fractal_queues[which_db],
bl->extracted_datasizes[which_db], bl->extracted_datasizes[which_db],
0, 0,
which_db, which_db,
target_nodesize, target_nodesize,
...@@ -2521,27 +2521,27 @@ static int loader_do_i (FTLOADER bl, ...@@ -2521,27 +2521,27 @@ static int loader_do_i (FTLOADER bl,
target_compression_method, target_compression_method,
}; };
r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta); r = toku_pthread_create(bl->fractal_threads+which_db, NULL, fractal_thread, (void*)&fta);
if (r) { if (r) {
int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]); int r2 __attribute__((__unused__)) = queue_destroy(bl->fractal_queues[which_db]);
// ignore r2, since we already have an error // ignore r2, since we already have an error
goto error; goto error;
} }
invariant(bl->fractal_threads_live[which_db]==FALSE); invariant(bl->fractal_threads_live[which_db]==FALSE);
bl->fractal_threads_live[which_db] = TRUE; bl->fractal_threads_live[which_db] = TRUE;
r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]); r = merge_files(fs, bl, which_db, dest_db, compare, allocation_for_merge, bl->fractal_queues[which_db]);
{ {
void *toku_pthread_retval; void *toku_pthread_retval;
int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval); int r2 = toku_pthread_join(bl->fractal_threads[which_db], &toku_pthread_retval);
invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement. invariant(fta.bl==bl); // this is a gratuitous assertion to make sure that the fta struct is still live here. A previous bug but that struct into a C block statement.
resource_assert_zero(r2); resource_assert_zero(r2);
invariant(toku_pthread_retval==NULL); invariant(toku_pthread_retval==NULL);
invariant(bl->fractal_threads_live[which_db]); invariant(bl->fractal_threads_live[which_db]);
bl->fractal_threads_live[which_db] = FALSE; bl->fractal_threads_live[which_db] = FALSE;
if (r == 0) r = fta.errno_result; if (r == 0) r = fta.errno_result;
} }
} }
error: // this is the cleanup code. Even if r==0 (no error) we fall through to here. error: // this is the cleanup code. Even if r==0 (no error) we fall through to here.
...@@ -2579,7 +2579,7 @@ static int toku_ft_loader_close_internal (FTLOADER bl) ...@@ -2579,7 +2579,7 @@ static int toku_ft_loader_close_internal (FTLOADER bl)
goto error; goto error;
invariant(0 <= bl->progress && bl->progress <= PROGRESS_MAX); invariant(0 <= bl->progress && bl->progress <= PROGRESS_MAX);
} }
if (result==0) invariant(remaining_progress==0); if (result==0) invariant(remaining_progress==0);
// fsync the directory containing the new tokudb files. // fsync the directory containing the new tokudb files.
char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]); char *fname0 = toku_cachetable_get_fname_in_cwd(bl->cachetable, bl->new_fnames_in_env[0]);
...@@ -2599,8 +2599,8 @@ static int toku_ft_loader_close_internal (FTLOADER bl) ...@@ -2599,8 +2599,8 @@ static int toku_ft_loader_close_internal (FTLOADER bl)
int toku_ft_loader_close (FTLOADER bl, int toku_ft_loader_close (FTLOADER bl,
ft_loader_error_func error_function, void *error_extra, ft_loader_error_func error_function, void *error_extra,
ft_loader_poll_func poll_function, void *poll_extra ft_loader_poll_func poll_function, void *poll_extra
) )
{ {
int result = 0; int result = 0;
...@@ -2662,7 +2662,7 @@ int toku_ft_loader_abort(FTLOADER bl, BOOL is_error) ...@@ -2662,7 +2662,7 @@ int toku_ft_loader_abort(FTLOADER bl, BOOL is_error)
} }
for (int i = 0; i < bl->N; i++) for (int i = 0; i < bl->N; i++)
invariant(!bl->fractal_threads_live[i]); invariant(!bl->fractal_threads_live[i]);
toku_ft_loader_internal_destroy(bl, is_error); toku_ft_loader_internal_destroy(bl, is_error);
return result; return result;
...@@ -2749,8 +2749,8 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla ...@@ -2749,8 +2749,8 @@ static int write_translation_table (struct dbout *out, long long *off_of_transla
out->translation[1].off = off_of_translation; out->translation[1].off = off_of_translation;
out->translation[1].size = bt_size_on_disk; out->translation[1].size = bt_size_on_disk;
for (int i=0; i<out->n_translations; i++) { for (int i=0; i<out->n_translations; i++) {
putbuf_int64(&ttable, out->translation[i].off); putbuf_int64(&ttable, out->translation[i].off);
putbuf_int64(&ttable, out->translation[i].size); putbuf_int64(&ttable, out->translation[i].size);
} }
unsigned int checksum = x1764_memory(ttable.buf, ttable.off); unsigned int checksum = x1764_memory(ttable.buf, ttable.off);
putbuf_int32(&ttable, checksum); putbuf_int32(&ttable, checksum);
...@@ -2786,7 +2786,7 @@ write_header (struct dbout *out, long long translation_location_on_disk, long lo ...@@ -2786,7 +2786,7 @@ write_header (struct dbout *out, long long translation_location_on_disk, long lo
} }
static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl, static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl,
/*out*/ DBT pivots[/*n_to_read*/]) /*out*/ DBT pivots[/*n_to_read*/])
// pivots is an array to be filled in. The pivots array is uninitialized. // pivots is an array to be filled in. The pivots array is uninitialized.
{ {
for (int i = 0; i < n_to_read; i++) for (int i = 0; i < n_to_read; i++)
...@@ -2796,8 +2796,8 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl, ...@@ -2796,8 +2796,8 @@ static int read_some_pivots (FIDX pivots_file, int n_to_read, FTLOADER bl,
int result = 0; int result = 0;
for (int i = 0; i < n_to_read; i++) { for (int i = 0; i < n_to_read; i++) {
int r = bl_read_dbt(&pivots[i], pivots_stream); int r = bl_read_dbt(&pivots[i], pivots_stream);
if (r != 0) { if (r != 0) {
result = r; result = r;
break; break;
} }
...@@ -2812,12 +2812,12 @@ static void delete_pivots(DBT pivots[], int n) { ...@@ -2812,12 +2812,12 @@ static void delete_pivots(DBT pivots[], int n) {
} }
static int setup_nonleaf_block (int n_children, static int setup_nonleaf_block (int n_children,
struct subtrees_info *subtrees, FIDX pivots_file, int64_t first_child_offset_in_subtrees, struct subtrees_info *subtrees, FIDX pivots_file, int64_t first_child_offset_in_subtrees,
struct subtrees_info *next_subtrees, FIDX next_pivots_file, struct subtrees_info *next_subtrees, FIDX next_pivots_file,
struct dbout *out, FTLOADER bl, struct dbout *out, FTLOADER bl,
/*out*/int64_t *blocknum, /*out*/int64_t *blocknum,
/*out*/struct subtree_info **subtrees_info_p, /*out*/struct subtree_info **subtrees_info_p,
/*out*/DBT **pivots_p) /*out*/DBT **pivots_p)
// Do the serial part of setting up a non leaf block. // Do the serial part of setting up a non leaf block.
// Read the pivots out of the file, and store them in a newly allocated array of DBTs (returned in *pivots_p) There are (n_blocks_to_use-1) of these. // Read the pivots out of the file, and store them in a newly allocated array of DBTs (returned in *pivots_p) There are (n_blocks_to_use-1) of these.
// Copy the final pivot into the next_pivots file instead of returning it. // Copy the final pivot into the next_pivots file instead of returning it.
...@@ -2896,7 +2896,7 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum ...@@ -2896,7 +2896,7 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum
node->totalchildkeylens = 0; node->totalchildkeylens = 0;
for (int i=0; i<n_children-1; i++) { for (int i=0; i<n_children-1; i++) {
toku_clone_dbt(&node->childkeys[i], pivots[i]); toku_clone_dbt(&node->childkeys[i], pivots[i]);
node->totalchildkeylens += pivots[i].size; node->totalchildkeylens += pivots[i].size;
} }
assert(node->bp); assert(node->bp);
for (int i=0; i<n_children; i++) { for (int i=0; i<n_children; i++) {
...@@ -2929,11 +2929,11 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum ...@@ -2929,11 +2929,11 @@ static void write_nonleaf_node (FTLOADER bl, struct dbout *out, int64_t blocknum
} }
for (int i=0; i<n_children-1; i++) { for (int i=0; i<n_children-1; i++) {
toku_free(pivots[i].data); toku_free(pivots[i].data);
toku_free(node->childkeys[i].data); toku_free(node->childkeys[i].data);
} }
for (int i=0; i<n_children; i++) { for (int i=0; i<n_children; i++) {
destroy_nonleaf_childinfo(BNC(node,i)); destroy_nonleaf_childinfo(BNC(node,i));
} }
toku_free(pivots); toku_free(pivots);
toku_free(node->bp); toku_free(node->bp);
...@@ -2953,43 +2953,43 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st ...@@ -2953,43 +2953,43 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
// Watch out for the case where we saved the last pivot but didn't write any more nodes out. // Watch out for the case where we saved the last pivot but didn't write any more nodes out.
// The trick is not to look at n_pivots, but to look at blocks.n_blocks // The trick is not to look at n_pivots, but to look at blocks.n_blocks
while (sts->n_subtrees > 1) { while (sts->n_subtrees > 1) {
// If there is more than one block in blocks, then we must build another level of the tree. // If there is more than one block in blocks, then we must build another level of the tree.
// we need to create a pivots file for the pivots of the next level. // we need to create a pivots file for the pivots of the next level.
// and a blocks_array // and a blocks_array
// So for example. // So for example.
// 1) we grab 16 pivots and 16 blocks. // 1) we grab 16 pivots and 16 blocks.
// 2) We put the 15 pivots and 16 blocks into an non-leaf node. // 2) We put the 15 pivots and 16 blocks into an non-leaf node.
// 3) We put the 16th pivot into the next pivots file. // 3) We put the 16th pivot into the next pivots file.
{ {
int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET); int r = fseek(toku_bl_fidx2file(bl, pivots_fidx), 0, SEEK_SET);
if (r!=0) { invariant(errno!=0); return errno; } if (r!=0) { invariant(errno!=0); return errno; }
} }
FIDX next_pivots_file; FIDX next_pivots_file;
{ {
int r = ft_loader_open_temp_file (bl, &next_pivots_file); int r = ft_loader_open_temp_file (bl, &next_pivots_file);
if (r != 0) { result = r; break; } if (r != 0) { result = r; break; }
} }
struct subtrees_info next_sts; struct subtrees_info next_sts;
subtrees_info_init(&next_sts); subtrees_info_init(&next_sts);
next_sts.n_subtrees = 0; next_sts.n_subtrees = 0;
next_sts.n_subtrees_limit = 1; next_sts.n_subtrees_limit = 1;
XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees); XMALLOC_N(next_sts.n_subtrees_limit, next_sts.subtrees);
const int n_per_block = 15; const int n_per_block = 15;
int64_t n_subtrees_used = 0; int64_t n_subtrees_used = 0;
while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) { while (sts->n_subtrees - n_subtrees_used >= n_per_block*2) {
// grab the first N_PER_BLOCK and build a node. // grab the first N_PER_BLOCK and build a node.
DBT *pivots; DBT *pivots;
int64_t blocknum_of_new_node; int64_t blocknum_of_new_node;
struct subtree_info *subtree_info; struct subtree_info *subtree_info;
int r = setup_nonleaf_block (n_per_block, int r = setup_nonleaf_block (n_per_block,
sts, pivots_fidx, n_subtrees_used, sts, pivots_fidx, n_subtrees_used,
&next_sts, next_pivots_file, &next_sts, next_pivots_file,
out, bl, out, bl,
&blocknum_of_new_node, &subtree_info, &pivots); &blocknum_of_new_node, &subtree_info, &pivots);
if (r) { if (r) {
result = r; result = r;
break; break;
...@@ -2997,7 +2997,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st ...@@ -2997,7 +2997,7 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node. cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_per_block, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); // frees all the data structures that go into making the node.
n_subtrees_used += n_per_block; n_subtrees_used += n_per_block;
} }
} }
int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used; int64_t n_blocks_left = sts->n_subtrees - n_subtrees_used;
if (result == 0) { if (result == 0) {
...@@ -3022,39 +3022,39 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st ...@@ -3022,39 +3022,39 @@ static int write_nonleaves (FTLOADER bl, FIDX pivots_fidx, struct dbout *out, st
n_subtrees_used += n_first; n_subtrees_used += n_first;
} }
} }
} }
if (result == 0) { if (result == 0) {
// Write the last block. // Write the last block.
DBT *pivots; DBT *pivots;
int64_t blocknum_of_new_node; int64_t blocknum_of_new_node;
struct subtree_info *subtree_info; struct subtree_info *subtree_info;
int r = setup_nonleaf_block(n_blocks_left, int r = setup_nonleaf_block(n_blocks_left,
sts, pivots_fidx, n_subtrees_used, sts, pivots_fidx, n_subtrees_used,
&next_sts, next_pivots_file, &next_sts, next_pivots_file,
out, bl, out, bl,
&blocknum_of_new_node, &subtree_info, &pivots); &blocknum_of_new_node, &subtree_info, &pivots);
if (r) { if (r) {
result = r; result = r;
} else { } else {
cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method); cilk_spawn write_nonleaf_node(bl, out, blocknum_of_new_node, n_blocks_left, pivots, subtree_info, height, descriptor, target_nodesize, target_basementnodesize, target_compression_method);
n_subtrees_used += n_blocks_left; n_subtrees_used += n_blocks_left;
} }
} }
if (result == 0) if (result == 0)
invariant(n_subtrees_used == sts->n_subtrees); invariant(n_subtrees_used == sts->n_subtrees);
cilk_sync; cilk_sync;
if (result == 0) // pick up write_nonleaf_node errors if (result == 0) // pick up write_nonleaf_node errors
result = ft_loader_get_error(&bl->error_callback); result = ft_loader_get_error(&bl->error_callback);
// Now set things up for the next iteration. // Now set things up for the next iteration.
int r = ft_loader_fi_close(&bl->file_infos, pivots_fidx, TRUE); if (r != 0 && result == 0) result = r; int r = ft_loader_fi_close(&bl->file_infos, pivots_fidx, TRUE); if (r != 0 && result == 0) result = r;
r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r; r = ft_loader_fi_unlink(&bl->file_infos, pivots_fidx); if (r != 0 && result == 0) result = r;
pivots_fidx = next_pivots_file; pivots_fidx = next_pivots_file;
toku_free(sts->subtrees); sts->subtrees = NULL; toku_free(sts->subtrees); sts->subtrees = NULL;
*sts = next_sts; *sts = next_sts;
height++; height++;
if (result) if (result)
break; break;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment