Commit e25ea559 authored by Rich Prohaska's avatar Rich Prohaska Committed by Yoni Fogel

#4682 remove cilkarts code from the loader. refs[t:4682]

git-svn-id: file:///svn/toku/tokudb@42090 c7de825b-a66e-492c-adef-691d508d4ae1
parent 40ef154d
......@@ -6,12 +6,9 @@
#ident "Copyright (c) 2007-2011 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <c_dialects.h>
#include <brttypes.h>
#include "cachetable.h"
C_BEGIN
/**
* Put an empty node (that is, no fields filled) into the cachetable.
* In the process, write dependent nodes out for checkpoint if
......@@ -129,6 +126,4 @@ toku_unpin_brtnode(BRT brt, BRTNODE node);
void
toku_unpin_brtnode_read_only(BRT brt, BRTNODE node);
C_END
#endif
......@@ -6,9 +6,6 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include <brttypes.h>
#include <c_dialects.h>
C_BEGIN
#define ft_flush_before_applying_inbox 1
#define ft_flush_before_child_pin 2
......@@ -161,6 +158,5 @@ default_pick_child_after_split(struct brt_header *h,
int childnumb,
void *extra);
C_END
#endif // End of header guardian.
......@@ -7,9 +7,6 @@
// This must be first to make the 64-bit file mode work right in Linux
#include <brttypes.h>
#include <c_dialects.h>
C_BEGIN
typedef enum {
BRT_FLUSHER_CLEANER_TOTAL_NODES = 0, // total number of nodes whose buffers are potentially flushed by cleaner thread
......@@ -146,6 +143,4 @@ toku_brt_hot_optimize(BRT brt,
int (*progress_callback)(void *extra, float progress),
void *progress_extra);
C_END
#endif // End of header guardian.
......@@ -25,14 +25,11 @@
#include "omt.h"
#include "leafentry.h"
#include "block_table.h"
#include "c_dialects.h"
#include "mempool.h"
#include "compress.h"
// Uncomment the following to use quicklz
C_BEGIN
#ifndef BRT_FANOUT
#define BRT_FANOUT 16
#endif
......@@ -969,6 +966,4 @@ void toku_flusher_thread_set_callback(void (*callback_f)(int, void*), void* extr
void toku_brt_header_note_hot_begin(BRT brt);
void toku_brt_header_note_hot_complete(BRT brt, BOOL success, MSN msn_at_start_of_hot);
C_END
#endif
......@@ -13,11 +13,8 @@
#include "cachetable.h"
#include "log.h"
#include "brt-search.h"
#include "c_dialects.h"
#include "compress.h"
C_BEGIN
// A callback function is invoked with the key, and the data.
// The pointers (to the bytevecs) must not be modified. The data must be copied out before the callback function returns.
// Note: In the thread-safe version, the brt node remains locked while the callback function runs. So return soon, and don't call the BRT code from the callback function.
......@@ -314,6 +311,4 @@ int toku_brt_strerror_r(int error, char *buf, size_t buflen);
extern BOOL garbage_collection_debug;
C_END
#endif
......@@ -113,7 +113,3 @@ int brt_loader_call_poll_function(brtloader_poll_callback p, float progress) {
r = p->poll_function(p->poll_extra, progress);
return r;
}
#if defined(__cplusplus) || defined(__cilkplusplus)
}
#endif
......@@ -10,9 +10,6 @@
#include "queue.h"
#include "toku_pthread.h"
#include "dbufio.h"
#include "c_dialects.h"
C_BEGIN
/* These functions are exported to allow the tests to compile. */
......@@ -202,13 +199,10 @@ int merge_row_arrays_base (struct row dest[/*an+bn*/], struct row a[/*an*/], int
int merge_files (struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func, int progress_allocation, QUEUE);
CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func);
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func, BRTLOADER, struct rowset *);
CILK_END
//int write_file_to_dbfile (int outfile, FIDX infile, BRTLOADER bl, const DESCRIPTOR descriptor, int progress_allocation);
int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation);
......@@ -261,6 +255,4 @@ int brt_loader_lock_init(BRTLOADER bl);
void brt_loader_lock_destroy(BRTLOADER bl);
void brt_loader_set_fractal_workers_count_from_c(BRTLOADER bl);
C_END
#endif
......@@ -23,7 +23,6 @@
#include "pqueue.h"
#include "trace_mem.h"
#include "dbufio.h"
#include "c_dialects.h"
#include "leafentry.h"
#include "log-internal.h"
......@@ -50,9 +49,6 @@
#define cilk_worker_count 1
#endif
// mark everything as C and selectively mark cilk functions
C_BEGIN
static size_t (*os_fwrite_fun)(const void *,size_t,size_t,FILE*)=NULL;
void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FILE*)) {
os_fwrite_fun=fwrite_fun;
......@@ -437,14 +433,12 @@ static unsigned brt_loader_get_fractal_workers_count(BRTLOADER bl) {
return w;
}
CILK_BEGIN
static void brt_loader_set_fractal_workers_count(BRTLOADER bl) {
brt_loader_lock(bl);
if (bl->fractal_workers == 0)
bl->fractal_workers = cilk_worker_count;
brt_loader_unlock(bl);
}
CILK_END
// To compute a merge, we have a certain amount of memory to work with.
// We perform only one fanin at a time.
......@@ -938,7 +932,6 @@ int add_row (struct rowset *rows, DBT *key, DBT *val)
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset);
CILK_BEGIN
static int finish_primary_rows_internal (BRTLOADER bl)
// now we have been asked to finish up.
// Be sure to destroy the rowsets.
......@@ -966,14 +959,9 @@ static int finish_primary_rows_internal (BRTLOADER bl)
toku_free(ra);
return r;
}
CILK_END
static int finish_primary_rows (BRTLOADER bl) {
#if defined(__cilkplusplus)
return cilk::run(finish_primary_rows_internal, bl);
#else
return finish_primary_rows_internal (bl);
#endif
}
static void* extractor_thread (void *blv) {
......@@ -1090,13 +1078,7 @@ static DBT make_dbt (void *data, u_int32_t size) {
return result;
}
// gcc 4.1 does not like f&a
// Previously this macro was defined without "()". This macro should look like a function call not a variable dereference, however.
#if defined(__cilkplusplus)
#define inc_error_count() __sync_fetch_and_add(&error_count, 1)
#else
#define inc_error_count() error_count++
#endif
static TXNID leafentry_xid(BRTLOADER bl, int which_db) {
TXNID le_xid = TXNID_NONE;
......@@ -1114,8 +1096,6 @@ size_t brtloader_leafentry_size(size_t key_size, size_t val_size, TXNID xid) {
return s;
}
CILK_BEGIN
static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_rowset)
// process the rows in primary_rowset, and then destroy the rowset.
// if FLUSH is true then write all the buffered rows out.
......@@ -1235,19 +1215,13 @@ static int process_primary_rows_internal (BRTLOADER bl, struct rowset *primary_r
BL_TRACE(blt_extractor);
return r;
}
CILK_END
static int process_primary_rows (BRTLOADER bl, struct rowset *primary_rowset) {
BL_TRACE(blt_extractor);
#if defined(__cilkplusplus)
int r = cilk::run(process_primary_rows_internal, bl, primary_rowset);
#else
int r = process_primary_rows_internal (bl, primary_rowset);
#endif
int r = process_primary_rows_internal (bl, primary_rowset);
BL_TRACE(blt_extractor);
return r;
}
int toku_brt_loader_put (BRTLOADER bl, DBT *key, DBT *val)
/* Effect: Put a key-value pair into the brt loader. Called by DB_LOADER->put().
......@@ -1371,7 +1345,6 @@ static int binary_search (int *location,
#define SWAP(typ,x,y) { typ tmp = x; x=y; y=tmp; }
CILK_BEGIN
static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], int an, struct row b[/*bn*/], int bn,
int which_db, DB *dest_db, brt_compare_func compare,
BRTLOADER bl,
......@@ -1408,9 +1381,7 @@ static int merge_row_arrays (struct row dest[/*an+bn*/], struct row a[/*an*/], i
if (ra!=0) return ra;
else return rb;
}
CILK_END
CILK_BEGIN
int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func compare, BRTLOADER bl, struct rowset *rowset)
/* Sort an array of rows (using mergesort).
* Arguments:
......@@ -1445,18 +1416,12 @@ int mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_d
toku_free(tmp);
return 0;
}
CILK_END
// C function for testing mergesort_row_array
int brt_loader_mergesort_row_array (struct row rows[/*n*/], int n, int which_db, DB *dest_db, brt_compare_func compare, BRTLOADER bl, struct rowset *rowset) {
#if defined(__cilkplusplus)
return cilk::run(mergesort_row_array, rows, n, which_db, dest_db, compare, bl, rowset);
#else
return mergesort_row_array (rows, n, which_db, dest_db, compare, bl, rowset);
#endif
return mergesort_row_array (rows, n, which_db, dest_db, compare, bl, rowset);
}
CILK_BEGIN
static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, brt_compare_func compare,
BRTLOADER bl)
/* Effect: Sort a collection of rows.
......@@ -1467,7 +1432,6 @@ static int sort_rows (struct rowset *rows, int which_db, DB *dest_db, brt_compar
{
return mergesort_row_array(rows->rows, rows->n_rows, which_db, dest_db, compare, bl, rows);
}
CILK_END
/* filesets Maintain a collection of files. Typically these files are each individually sorted, and we will merge them.
* These files have two parts, one is for the data rows, and the other is a collection of offsets so we an more easily parallelize the manipulation (e.g., by allowing us to find the offset of the ith row quickly). */
......@@ -1564,7 +1528,6 @@ static int write_rowset_to_file (BRTLOADER bl, FIDX sfile, const struct rowset r
}
CILK_BEGIN
int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare)
/* Effect: Given a rowset, sort it and write it to a temporary file.
* Note: The loader maintains for each index the most recently written-to file, as well as the DBT for the last key written into that file.
......@@ -1633,15 +1596,10 @@ int sort_and_write_rows (struct rowset rows, struct merge_fileset *fs, BRTLOADER
return result;
}
CILK_END
// C function for testing sort_and_write_rows
int brt_loader_sort_and_write_rows (struct rowset *rows, struct merge_fileset *fs, BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare) {
#if defined(__cilkplusplus)
return cilk::run(sort_and_write_rows, *rows, fs, bl, which_db, dest_db, compare);
#else
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
#endif
return sort_and_write_rows (*rows, fs, bl, which_db, dest_db, compare);
}
int toku_merge_some_files_using_dbufio (const BOOL to_q, FIDX dest_data, QUEUE q, int n_sources, DBUFIO_FILESET bfs, FIDX srcs_fidxs[/*n_sources*/], BRTLOADER bl, int which_db, DB *dest_db, brt_compare_func compare, int progress_allocation)
......@@ -2218,10 +2176,8 @@ static struct leaf_buf *start_leaf (struct dbout *out, const DESCRIPTOR UU(desc)
return lbuf;
}
CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl, uint32_t target_basementnodesize);
static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, struct subtrees_info *sts, const DESCRIPTOR descriptor, uint32_t target_nodesize, uint32_t target_basementnodesize);
CILK_END
static void add_pair_to_leafnode (struct leaf_buf *lbuf, unsigned char *key, int keylen, unsigned char *val, int vallen, int this_leafentry_size);
static int write_translation_table (struct dbout *out, long long *off_of_translation_p);
static int write_header (struct dbout *out, long long translation_location_on_disk, long long translation_size_on_disk);
......@@ -2261,7 +2217,6 @@ static int copy_maxkey(DBT *maxkey) {
return r;
}
CILK_BEGIN
static int toku_loader_write_brt_from_q (BRTLOADER bl,
const DESCRIPTOR descriptor,
int fd, // write to here
......@@ -2530,7 +2485,6 @@ static int toku_loader_write_brt_from_q (BRTLOADER bl,
return result;
}
CILK_END
int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
const DESCRIPTOR descriptor,
......@@ -2546,22 +2500,14 @@ int toku_loader_write_brt_from_q_in_C (BRTLOADER bl,
{
target_nodesize = target_nodesize == 0 ? default_loader_nodesize : target_nodesize;
target_basementnodesize = target_basementnodesize == 0 ? default_loader_basementnodesize : target_basementnodesize;
#if defined(__cilkplusplus)
return cilk::run(toku_loader_write_brt_from_q, bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
#else
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
#endif
return toku_loader_write_brt_from_q (bl, descriptor, fd, progress_allocation, q, total_disksize_estimate, which_db, target_nodesize, target_basementnodesize, target_compression_method);
}
static void* fractal_thread (void *ftav) {
BL_TRACE(blt_start_fractal_thread);
struct fractal_thread_args *fta = (struct fractal_thread_args *)ftav;
#if defined(__cilkplusplus)
int r = cilk::run(toku_loader_write_brt_from_q, fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
#else
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
#endif
int r = toku_loader_write_brt_from_q (fta->bl, fta->descriptor, fta->fd, fta->progress_allocation, fta->q, fta->total_disksize_estimate, fta->which_db, fta->target_nodesize, fta->target_basementnodesize, fta->target_compression_method);
fta->errno_result = r;
return NULL;
}
......@@ -2805,7 +2751,6 @@ static int write_literal(struct dbout *out, void*data, size_t len) {
return result;
}
CILK_BEGIN
static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progress_allocation, BRTLOADER bl, uint32_t target_basementnodesize) {
int result = 0;
......@@ -2845,8 +2790,6 @@ static void finish_leafnode (struct dbout *out, struct leaf_buf *lbuf, int progr
brt_loader_set_panic(bl, result, TRUE);
}
CILK_END
static int write_translation_table (struct dbout *out, long long *off_of_translation_p) {
seek_align(out);
struct dbuf ttable;
......@@ -2991,8 +2934,6 @@ static int setup_nonleaf_block (int n_children,
return result;
}
CILK_BEGIN
static void write_nonleaf_node (BRTLOADER bl, struct dbout *out, int64_t blocknum_of_new_node, int n_children,
DBT *pivots, /* must free this array, as well as the things it points t */
struct subtree_info *subtree_info, int height, const DESCRIPTOR UU(desc), uint32_t target_nodesize, uint32_t target_basementnodesize)
......@@ -3186,15 +3127,8 @@ static int write_nonleaves (BRTLOADER bl, FIDX pivots_fidx, struct dbout *out, s
return result;
}
CILK_END
void brt_loader_set_fractal_workers_count_from_c(BRTLOADER bl) {
#if defined(__cilkplusplus)
cilk::run(brt_loader_set_fractal_workers_count, bl);
#else
brt_loader_set_fractal_workers_count (bl);
#endif
brt_loader_set_fractal_workers_count (bl);
}
C_END
......@@ -6,10 +6,6 @@
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "c_dialects.h"
C_BEGIN
// The loader callbacks are C functions and need to be defined as such
typedef void (*brt_loader_error_func)(DB *, int which_db, int err, DBT *key, DBT *val, void *extra);
......@@ -46,6 +42,4 @@ void brtloader_set_os_fwrite (size_t (*fwrite_fun)(const void*,size_t,size_t,FIL
size_t brtloader_leafentry_size(size_t key_size, size_t val_size, TXNID xid);
C_END
#endif // BRTLOADER_H
......@@ -6,9 +6,6 @@
#include <toku_portability.h>
#include <toku_pthread.h>
#include "c_dialects.h"
C_BEGIN
/* Maintain a set of files for reading, with double buffering for the reads. */
......@@ -26,6 +23,4 @@ int panic_dbufio_fileset(DBUFIO_FILESET, int error);
void dbufio_print(DBUFIO_FILESET);
C_END
#endif
......@@ -3,10 +3,6 @@
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "c_dialects.h"
C_BEGIN
//
// The kibbutz is another threadpool meant to do arbitrary work.
// It is introduced in Dr. No, and as of Dr. No, the only work kibbutzim
......@@ -38,6 +34,4 @@ void toku_kibbutz_enq (KIBBUTZ k, void (*f)(void*), void *extra);
//
void toku_kibbutz_destroy (KIBBUTZ k);
C_END
#endif
......@@ -6,9 +6,6 @@
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "brttypes.h"
#include "c_dialects.h"
C_BEGIN
// The abstraction:
//
......@@ -53,5 +50,4 @@ int queue_destroy (QUEUE q);
// Requires: The queue must be empty and no consumer should try to dequeue after this (one way to do this is to make sure the consumer saw EOF).
// Returns 0 on success. If the queue is not empty, returns EINVAL. Other errors are likely to be bad (some sort of mutex or condvar failure).
C_END
#endif
......@@ -11,8 +11,6 @@
#include "test.h"
#include "brtloader-internal.h"
C_BEGIN
static int event_count, event_count_trigger;
static void my_assert_hook (void) {
......@@ -536,5 +534,3 @@ int test_main (int argc, const char *argv[]) {
return 0;
}
C_END
......@@ -5,10 +5,6 @@
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#include "c_dialects.h"
C_BEGIN
// A toku_thread is toku_pthread that can be cached.
struct toku_thread;
......@@ -53,7 +49,4 @@ int toku_thread_pool_run(struct toku_thread_pool *pool, int dowait, int *nthread
// Print the state of the thread pool
void toku_thread_pool_print(struct toku_thread_pool *pool, FILE *out);
C_END
#endif
......@@ -9,9 +9,6 @@
#include <errno.h>
#include "toku_assert.h"
#include "toku_pthread.h"
#include "c_dialects.h"
C_BEGIN
struct workitem;
......@@ -217,7 +214,5 @@ void toku_destroy_workers(WORKQUEUE wq, THREADPOOL *tpptr);
// workers.
void *toku_worker(void *arg);
C_END
#endif
......@@ -8,10 +8,6 @@
#include <toku_list.h>
#include <toku_pthread.h>
#include "c_dialects.h"
C_BEGIN
// The work struct is the base class for work to be done by some threads
struct work {
struct toku_list next;
......@@ -107,6 +103,4 @@ workset_join(struct workset *ws) {
workset_unlock(ws);
}
C_END
#endif
#ifndef C_DIALECTS_H
#define C_DIALECTS_H
#ident "$Id$"
#ident "Copyright (c) 2007-2010 Tokutek Inc. All rights reserved."
#ident "The technology is licensed by the Massachusetts Institute of Technology, Rutgers State University of New Jersey, and the Research Foundation of State University of New York at Stony Brook under United States of America Serial No. 11/760379 and to the patents and/or patent applications resulting from it."
#if defined(__cplusplus) || defined(__cilkplusplus)
#define C_BEGIN extern "C" {
#define C_END }
#else
#define C_BEGIN
#define C_END
#endif
#if defined(__cilkplusplus)
#define CILK_BEGIN extern "Cilk++" {
#define CILK_END }
#else
#define CILK_BEGIN
#define CILK_END
#endif
#endif
......@@ -5,10 +5,11 @@
/* It evaluates the argument and then calls a function toku_do_assert() which takes all the hits for the branches not taken. */
#include <stdint.h>
#include "c_dialects.h"
#include "errno.h"
C_BEGIN
#if defined(__cplusplus)
extern "C" {
#endif
#ifdef NDEBUG
#error NDEBUG should not be set
......@@ -52,6 +53,8 @@ extern void (*do_assert_hook)(void); // Set this to a function you want called a
#define resource_assert(a) assert(a) // indicates resource must be available, otherwise unrecoverable
#define resource_assert_zero(a) assert_zero(a) // indicates resource must be available, otherwise unrecoverable
C_END
#if defined(__cplusplus)
};
#endif
#endif
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment