Commit 3af91c0f authored by Yoni Fogel's avatar Yoni Fogel

refs #5467 merge "kill put loader, fix hot indexer freeze issue" onto main

git-svn-id: file:///svn/toku/tokudb@50137 c7de825b-a66e-492c-adef-691d508d4ae1
parent 34d5b339
......@@ -272,7 +272,11 @@ static void print_defines (void) {
/* LOADER flags */
printf("/* LOADER flags */\n");
printf("#define LOADER_USE_PUTS 1\n"); // minimize space usage
{
uint32_t loader_flags = 0;
dodefine_from_track(loader_flags, LOADER_DISALLOW_PUTS); // Loader is only used for side effects.
dodefine_from_track(loader_flags, LOADER_COMPRESS_INTERMEDIATES);
}
}
static void print_db_env_struct (void) {
......@@ -672,7 +676,7 @@ int main (int argc, char *const argv[] __attribute__((__unused__))) {
printf(" uint8_t stalled_on_checkpoint;\n");
printf("} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;\n");
printf("typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);\n");
printf("struct txn_stat {\n uint64_t rollback_raw_count;\n};\n");
printf("struct txn_stat {\n uint64_t rollback_raw_count;\n uint64_t rollback_num_entries;\n};\n");
print_db_txn_struct();
print_db_txn_stat_struct();
......
......@@ -109,7 +109,6 @@ struct cachefile {
void *userdata;
void (*log_fassociate_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log all open files.
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE cf, void *userdata); // When starting a checkpoint we must log which files need rollbacks suppressed
void (*close_userdata)(CACHEFILE cf, int fd, void *userdata, bool lsnvalid, LSN); // when closing the last reference to a cachefile, first call this function.
void (*begin_checkpoint_userdata)(LSN lsn_of_checkpoint, void *userdata); // before checkpointing cachefiles call this function.
void (*checkpoint_userdata)(CACHEFILE cf, int fd, void *userdata); // when checkpointing a cachefile, call this function.
......
......@@ -2815,7 +2815,6 @@ void
toku_cachefile_set_userdata (CACHEFILE cf,
void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*),
......@@ -2824,7 +2823,6 @@ toku_cachefile_set_userdata (CACHEFILE cf,
void (*note_unpin_by_checkpoint)(CACHEFILE, void*)) {
cf->userdata = userdata;
cf->log_fassociate_during_checkpoint = log_fassociate_during_checkpoint;
cf->log_suppress_rollback_during_checkpoint = log_suppress_rollback_during_checkpoint;
cf->close_userdata = close_userdata;
cf->checkpoint_userdata = checkpoint_userdata;
cf->begin_checkpoint_userdata = begin_checkpoint_userdata;
......@@ -4181,34 +4179,27 @@ void checkpointer::log_begin_checkpoint() {
TXNID last_xid = toku_txn_manager_get_last_xid(mgr);
toku_log_begin_checkpoint(m_logger, &begin_lsn, 0, 0, last_xid);
m_lsn_of_checkpoint_in_progress = begin_lsn;
// Log the list of open dictionaries.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
assert(cf->log_fassociate_during_checkpoint);
cf->log_fassociate_during_checkpoint(cf, cf->userdata);
}
// Write open transactions to the log.
r = toku_txn_manager_iter_over_live_txns<checkpointer, log_open_txn> (
m_logger->txn_manager,
m_logger->txn_manager,
this);
assert(r == 0);
// Writes list of dictionaries that have had
// rollback logs suppressed.
for (CACHEFILE cf = m_cf_list->m_head; cf; cf = cf->next) {
assert(cf->log_suppress_rollback_during_checkpoint);
cf->log_suppress_rollback_during_checkpoint(cf, cf->userdata);
}
}
//
// Sets the pending bits of EVERY PAIR in the cachetable, regardless of
// Sets the pending bits of EVERY PAIR in the cachetable, regardless of
// whether the PAIR is clean or not. It will be the responsibility of
// end_checkpoint or client threads to simply clear the pending bit
// if the PAIR is clean.
//
// On entry and exit , the pair list's read list lock is grabbed, and
// On entry and exit , the pair list's read list lock is grabbed, and
// both pending locks are grabbed
//
void checkpointer::turn_on_pending_bits() {
......
......@@ -191,7 +191,6 @@ typedef void (*CACHETABLE_REMOVE_KEY)(CACHEKEY* cachekey, bool for_checkpoint, v
void toku_cachefile_set_userdata(CACHEFILE cf, void *userdata,
void (*log_fassociate_during_checkpoint)(CACHEFILE, void*),
void (*log_suppress_rollback_during_checkpoint)(CACHEFILE, void*),
void (*close_userdata)(CACHEFILE, int, void*, bool, LSN),
void (*checkpoint_userdata)(CACHEFILE, int, void*),
void (*begin_checkpoint_userdata)(LSN, void*),
......
......@@ -64,8 +64,8 @@ void toku_checkpoint_destroy(void);
typedef enum {SCHEDULED_CHECKPOINT = 0, // "normal" checkpoint taken on checkpoint thread
CLIENT_CHECKPOINT = 1, // induced by client, such as FLUSH LOGS or SAVEPOINT
TXN_COMMIT_CHECKPOINT = 2,
STARTUP_CHECKPOINT = 3,
INDEXER_CHECKPOINT = 2,
STARTUP_CHECKPOINT = 3,
UPGRADE_CHECKPOINT = 4,
RECOVERY_CHECKPOINT = 5,
SHUTDOWN_CHECKPOINT = 6} checkpoint_caller_t;
......
......@@ -462,18 +462,10 @@ struct ft {
// the on-disk layout version is from before basement nodes)
int layout_version_read_from_disk;
// If a transaction created this BRT, which one?
// If a transaction locked the BRT when it was empty, which transaction? (Only the latest one matters)
// 0 if no such transaction
// only one thread can write to these at once, this is enforced by
// the lock tree
TXNID txnid_that_created_or_locked_when_empty;
TXNID txnid_that_suppressed_recovery_logs;
// Logically the reference count is zero if live_ft_handles is empty, txns is 0, and pinned_by_checkpoint is false.
// ft_ref_lock protects modifying live_ft_handles, txns, and pinned_by_checkpoint.
toku_mutex_t ft_ref_lock;
toku_mutex_t ft_ref_lock;
struct toku_list live_ft_handles;
// Number of transactions that are using this FT. you should only be able
// to modify this if you have a valid handle in live_ft_handles
......
This diff is collapsed.
......@@ -143,9 +143,9 @@ void toku_ft_load(FT_HANDLE brt, TOKUTXN txn, char const * new_iname, int do_fsy
void toku_ft_hot_index_recovery(TOKUTXN txn, FILENUMS filenums, int do_fsync, int do_log, LSN *hot_index_lsn);
void toku_ft_hot_index(FT_HANDLE brt, TOKUTXN txn, FILENUMS filenums, int do_fsync, LSN *lsn);
void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val);
void toku_ft_log_put_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val);
void toku_ft_log_put (TOKUTXN txn, FT_HANDLE brt, const DBT *key, const DBT *val);
void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, int num_fts, const DBT *key, const DBT *val);
void toku_ft_log_del_multiple (TOKUTXN txn, FT_HANDLE src_ft, FT_HANDLE *brts, uint32_t num_fts, const DBT *key, const DBT *val);
void toku_ft_log_del (TOKUTXN txn, FT_HANDLE brt, const DBT *key);
// Effect: Delete a key from a brt
......@@ -233,12 +233,6 @@ void toku_maybe_preallocate_in_file (int fd, int64_t size, int64_t expected_size
// Effect: make the file bigger by either doubling it or growing by 16MiB whichever is less, until it is at least size
// Return 0 on success, otherwise an error number.
void toku_ft_suppress_recovery_logs (FT_HANDLE brt, TOKUTXN txn);
// Effect: suppresses recovery logs
// Requires: this is a (target) redirected brt
// implies: txnid_that_created_or_locked_when_empty matches txn
// implies: toku_txn_note_ft(brt, txn) has been called
int toku_ft_get_fragmentation(FT_HANDLE brt, TOKU_DB_FRAGMENTATION report) __attribute__ ((warn_unused_result));
bool toku_ft_is_empty_fast (FT_HANDLE brt) __attribute__ ((warn_unused_result));
......
......@@ -15,14 +15,6 @@
#include <toku_assert.h>
#include <portability/toku_atomic.h>
void
toku_ft_suppress_rollbacks(FT h, TOKUTXN txn) {
TXNID txnid = toku_txn_get_txnid(txn);
assert(h->txnid_that_created_or_locked_when_empty == TXNID_NONE ||
h->txnid_that_created_or_locked_when_empty == txnid);
h->txnid_that_created_or_locked_when_empty = txnid;
}
void
toku_reset_root_xid_that_created(FT ft, TXNID new_root_xid_that_created) {
// Reset the root_xid_that_created field to the given value.
......@@ -109,22 +101,6 @@ ft_log_fassociate_during_checkpoint (CACHEFILE cf, void *header_v) {
toku_log_fassociate(logger, NULL, 0, filenum, ft->h->flags, bs, unlink_on_close);
}
// maps to cf->log_suppress_rollback_during_checkpoint
static void
ft_log_suppress_rollback_during_checkpoint (CACHEFILE cf, void *header_v) {
FT h = (FT) header_v;
TXNID xid = h->txnid_that_created_or_locked_when_empty;
if (xid != TXNID_NONE) {
//Only log if useful.
TOKULOGGER logger = toku_cachefile_logger(cf);
FILENUM filenum = toku_cachefile_filenum (cf);
// We don't have access to the txn here, but the txn is
// necessarily already marked as non-readonly. Use NULL.
TOKUTXN txn = NULL;
toku_log_suppress_rollback(logger, NULL, 0, txn, filenum, xid);
}
}
// Maps to cf->begin_checkpoint_userdata
// Create checkpoint-in-progress versions of header and translation (btt) (and fifo for now...).
// Has access to fd (it is protected).
......@@ -331,7 +307,6 @@ static void ft_init(FT ft, FT_OPTIONS options, CACHEFILE cf) {
toku_cachefile_set_userdata(ft->cf,
ft,
ft_log_fassociate_during_checkpoint,
ft_log_suppress_rollback_during_checkpoint,
ft_close,
ft_checkpoint,
ft_begin_checkpoint,
......@@ -432,7 +407,6 @@ int toku_read_ft_and_store_in_cachefile (FT_HANDLE brt, CACHEFILE cf, LSN max_ac
toku_cachefile_set_userdata(cf,
(void*)h,
ft_log_fassociate_during_checkpoint,
ft_log_suppress_rollback_during_checkpoint,
ft_close,
ft_checkpoint,
ft_begin_checkpoint,
......@@ -720,15 +694,17 @@ toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft_h, TOKU
if (txn) {
toku_txn_maybe_note_ft(txn, new_ft); // mark new ft as touched by this txn
// There is no recovery log entry for redirect,
// and rollback log entries are not allowed for read-only transactions.
// Normally the recovery log entry would ensure the begin was logged.
if (!txn->begin_was_logged) {
toku_maybe_log_begin_txn_for_write_operation(txn);
}
FILENUM old_filenum = toku_cachefile_filenum(old_ft->cf);
FILENUM new_filenum = toku_cachefile_filenum(new_ft->cf);
toku_logger_save_rollback_dictionary_redirect(txn, old_filenum, new_filenum);
TXNID xid = toku_txn_get_txnid(txn);
toku_ft_suppress_rollbacks(new_ft, txn);
toku_log_suppress_rollback(txn->logger, NULL, 0, txn, new_filenum, xid);
}
cleanup:
return r;
}
......
......@@ -20,9 +20,6 @@
void toku_ft_unlink(FT_HANDLE handle);
void toku_ft_unlink_on_commit(FT_HANDLE handle, TOKUTXN txn);
//Effect: suppresses rollback logs
void toku_ft_suppress_rollbacks(FT h, TOKUTXN txn);
void toku_ft_init_reflock(FT ft);
void toku_ft_destroy_reflock(FT ft);
void toku_ft_grab_reflock(FT ft);
......@@ -60,19 +57,19 @@ toku_ft_init(
int toku_dictionary_redirect_abort(FT old_h, FT new_h, TOKUTXN txn) __attribute__ ((warn_unused_result));
int toku_dictionary_redirect (const char *dst_fname_in_env, FT_HANDLE old_ft, TOKUTXN txn);
void toku_reset_root_xid_that_created(FT h, TXNID new_root_xid_that_created);
// Reset the root_xid_that_created field to the given value.
// Reset the root_xid_that_created field to the given value.
// This redefines which xid created the dictionary.
void toku_ft_add_txn_ref(FT h);
void toku_ft_remove_txn_ref(FT h);
void toku_calculate_root_offset_pointer ( FT h, CACHEKEY* root_key, uint32_t *roothash);
void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key);
void toku_ft_set_new_root_blocknum(FT h, CACHEKEY new_root_key);
LSN toku_ft_checkpoint_lsn(FT h) __attribute__ ((warn_unused_result));
void toku_ft_stat64 (FT h, struct ftstat64_s *s);
// unconditionally set the descriptor for an open FT. can't do this when
// any operation has already occurred on the ft.
// unconditionally set the descriptor for an open FT. can't do this when
// any operation has already occurred on the ft.
// see toku_ft_change_descriptor(), which is the transactional version
// used by the ydb layer. it better describes the client contract.
void toku_ft_update_descriptor(FT ft, DESCRIPTOR d);
......
......@@ -28,7 +28,8 @@ enum ft_layout_version_e {
FT_LAYOUT_VERSION_20 = 20, // Deadshot: Add compression method to log_fcreate,
// mgr_last_xid after begin checkpoint,
// last_xid to shutdown
FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header
FT_LAYOUT_VERSION_21 = 21, // Ming: Add max_msn_in_ft to header,
// Removed log suppression logentry
FT_NEXT_VERSION, // the version after the current version
FT_LAYOUT_VERSION = FT_NEXT_VERSION-1, // A hack so I don't have to change this line.
FT_LAYOUT_MIN_SUPPORTED_VERSION = FT_LAYOUT_VERSION_13, // Minimum version supported
......
......@@ -152,7 +152,6 @@ struct tokutxn {
bool begin_was_logged;
// These are not read until a commit, prepare, or abort starts, and
// they're "monotonic" (only go false->true) during operation:
bool checkpoint_needed_before_commit;
bool do_fsync;
bool force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
......
......@@ -131,15 +131,12 @@ const struct logtype logtypes[] = {
{"uint64_t", "rollentry_raw_count", 0},
{"FILENUMS", "open_filenums", 0},
{"uint8_t", "force_fsync_on_commit", 0},
{"uint64_t", "num_rollback_nodes", 0},
{"uint64_t", "num_rollentries", 0},
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
{"uint64_t", "num_rollback_nodes", 0},
{"uint64_t", "num_rollentries", 0},
{"BLOCKNUM", "spilled_rollback_head", 0},
{"BLOCKNUM", "spilled_rollback_tail", 0},
{"BLOCKNUM", "current_rollback", 0},
NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED}, // record all transactions
{"suppress_rollback", 'S', FA{{"FILENUM", "filenum", 0},
{"TXNID", "xid", 0},
NULLFIELD}, SHOULD_LOG_BEGIN},
// Records produced by transactions
{"xbegin", 'b', FA{{"TXNID", "xid", 0},{"TXNID", "parentxid", 0},NULLFIELD}, IGNORE_LOG_BEGIN},
{"xcommit",'C', FA{{"TXNID", "xid", 0},NULLFIELD}, ASSERT_BEGIN_WAS_LOGGED},
......@@ -413,7 +410,6 @@ generate_log_writer (void) {
switch (lt->log_begin_action) {
case SHOULD_LOG_BEGIN: {
fprintf(cf, " //txn can be NULL during tests\n");
fprintf(cf, " //txn can be also be NULL for suppress_rollback during checkpoint,\n");
fprintf(cf, " //never null when not checkpoint.\n");
fprintf(cf, " if (txn && !txn->begin_was_logged) {\n");
fprintf(cf, " toku_maybe_log_begin_txn_for_write_operation(txn);\n");
......
......@@ -629,26 +629,6 @@ static int toku_recover_backward_xstillopenprepared (struct logtype_xstillopenpr
return 0;
}
static int toku_recover_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) {
struct file_map_tuple *tuple = NULL;
int r = file_map_find(&renv->fmap, l->filenum, &tuple);
if (r==0) {
//File is open
TOKUTXN txn = NULL;
toku_txnid2txn(renv->logger, l->xid, &txn);
assert(txn!=NULL);
FT ft = tuple->ft_handle->ft;
toku_ft_suppress_rollbacks(ft, txn);
toku_txn_maybe_note_ft(txn, ft);
}
return 0;
}
static int toku_recover_backward_suppress_rollback (struct logtype_suppress_rollback *UU(l), RECOVER_ENV UU(renv)) {
// nothing
return 0;
}
static int toku_recover_xbegin (struct logtype_xbegin *l, RECOVER_ENV renv) {
int r;
r = recover_transaction(NULL, l->xid, l->parentxid, renv->logger);
......
......@@ -46,14 +46,6 @@ int
note_ft_used_in_txns_parent(const FT &ft, uint32_t UU(index), TOKUTXN const child) {
TOKUTXN parent = child->parent;
toku_txn_maybe_note_ft(parent, ft);
if (ft->txnid_that_created_or_locked_when_empty == toku_txn_get_txnid(child)) {
//Pass magic "no rollback needed" flag to parent.
ft->txnid_that_created_or_locked_when_empty = toku_txn_get_txnid(parent);
}
if (ft->txnid_that_suppressed_recovery_logs == toku_txn_get_txnid(child)) {
//Pass magic "no recovery needed" flag to parent.
ft->txnid_that_suppressed_recovery_logs = toku_txn_get_txnid(parent);
}
return 0;
}
......@@ -217,11 +209,6 @@ int toku_rollback_commit(TOKUTXN txn, LSN lsn) {
r = txn->open_fts.iterate<struct tokutxn, note_ft_used_in_txns_parent>(txn);
assert(r==0);
// Merge the list of headers that must be checkpointed before commit
if (txn->checkpoint_needed_before_commit) {
txn->parent->checkpoint_needed_before_commit = true;
}
//If this transaction needs an fsync (if it commits)
//save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
......
......@@ -210,10 +210,11 @@ void toku_txn_maybe_note_ft (TOKUTXN txn, FT ft) {
}
// Return the number of bytes that went into the rollback data structure (the uncompressed count if there is compression)
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count)
int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat)
{
toku_txn_lock(txn);
*raw_count = txn->roll_info.rollentry_raw_count;
txn_stat->rollback_raw_count = txn->roll_info.rollentry_raw_count;
txn_stat->rollback_num_entries = txn->roll_info.num_rollentries;
toku_txn_unlock(txn);
return 0;
}
......
......@@ -48,7 +48,7 @@ void *toku_memdup_in_rollback(ROLLBACK_LOG_NODE log, const void *v, size_t len);
void toku_maybe_spill_rollbacks(TOKUTXN txn, ROLLBACK_LOG_NODE log);
void toku_txn_maybe_note_ft (TOKUTXN txn, FT h);
int toku_logger_txn_rollback_raw_count(TOKUTXN txn, uint64_t *raw_count);
int toku_logger_txn_rollback_stats(TOKUTXN txn, struct txn_stat *txn_stat);
int toku_find_xid_by_xid (const TXNID &xid, const TXNID &xidfind);
......
......@@ -330,12 +330,11 @@ cachetable_test (void) {
char fname1[] = __SRCFILE__ "test1.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(
f1,
NULL,
f1,
NULL,
&dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr,
&dummy_chckpnt_usr,
&test_begin_checkpoint,
......
......@@ -457,25 +457,24 @@ cachetable_test (void) {
time_of_test = 60;
int r;
toku_cachetable_create(&ct, test_limit, ZERO_LSN, NULL_LOGGER);
char fname1[] = __SRCFILE__ "test-put-checkpoint.dat";
unlink(fname1);
r = toku_cachetable_openf(&f1, ct, fname1, O_RDWR|O_CREAT, S_IRWXU|S_IRWXG|S_IRWXO); assert(r == 0);
toku_cachefile_set_userdata(
f1,
NULL,
f1,
NULL,
&dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr,
&dummy_chckpnt_usr,
test_begin_checkpoint, // called in begin_checkpoint
&dummy_end,
&dummy_end,
&dummy_note_pin,
&dummy_note_unpin
);
toku_pthread_t time_tid;
toku_pthread_t checkpoint_tid;
toku_pthread_t move_tid[NUM_MOVER_THREADS];
......
......@@ -10,7 +10,6 @@
// Dummy callbacks for checkpointing
//
static void dummy_log_fassociate(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_log_rollback(CACHEFILE UU(cf), void* UU(p)) { }
static void dummy_close_usr(CACHEFILE UU(cf), int UU(i), void* UU(p), bool UU(b), LSN UU(lsn)) { }
static void dummy_chckpnt_usr(CACHEFILE UU(cf), int UU(i), void* UU(p)) { }
static void dummy_begin(LSN UU(lsn), void* UU(p)) { }
......@@ -28,7 +27,6 @@ create_dummy_functions(CACHEFILE cf)
toku_cachefile_set_userdata(cf,
ud,
&dummy_log_fassociate,
&dummy_log_rollback,
&dummy_close_usr,
&dummy_chckpnt_usr,
&dummy_begin,
......
......@@ -152,7 +152,6 @@ void toku_txn_create_txn (
.live_root_txn_list = nullptr,
.xids = xids,
.begin_was_logged = false,
.checkpoint_needed_before_commit = false,
.do_fsync = false,
.force_fsync_on_commit = false,
.do_fsync_lsn = ZERO_LSN,
......@@ -246,21 +245,11 @@ int toku_txn_commit_txn(TOKUTXN txn, int nosync,
poll, poll_extra);
}
void
toku_txn_require_checkpoint_on_commit(TOKUTXN txn) {
txn->checkpoint_needed_before_commit = true;
}
struct xcommit_info {
int r;
TOKUTXN txn;
};
bool toku_txn_requires_checkpoint(TOKUTXN txn) {
return (!txn->parent && txn->checkpoint_needed_before_commit);
}
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra)
{
......@@ -377,16 +366,10 @@ void toku_txn_close_txn(TOKUTXN txn) {
}
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn);
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const txn)
int remove_txn (const FT &h, const uint32_t UU(idx), TOKUTXN const UU(txn))
// Effect: This function is called on every open FT that a transaction used.
// This function removes the transaction from that FT.
{
if (txn->txnid64==h->txnid_that_created_or_locked_when_empty) {
h->txnid_that_created_or_locked_when_empty = TXNID_NONE;
}
if (txn->txnid64==h->txnid_that_suppressed_recovery_logs) {
h->txnid_that_suppressed_recovery_logs = TXNID_NONE;
}
toku_ft_remove_txn_ref(h);
return 0;
......
......@@ -44,7 +44,6 @@ int toku_txn_load_txninfo (TOKUTXN txn, TXNINFO info);
int toku_txn_commit_txn (TOKUTXN txn, int nosync,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
bool toku_txn_requires_checkpoint(TOKUTXN txn);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
......@@ -66,9 +65,6 @@ void toku_txn_get_fsync_info(TOKUTXN ttxn, bool* do_fsync, LSN* do_fsync_lsn);
// Complete and destroy a txn
void toku_txn_close_txn(TOKUTXN txn);
// Require a checkpoint upon commit
void toku_txn_require_checkpoint_on_commit(TOKUTXN txn);
// Remove a txn from any live txn lists
void toku_txn_complete_txn(TOKUTXN txn);
......
......@@ -27,6 +27,7 @@
#include <ft/log-internal.h>
#include <ft/checkpoint.h>
#include <portability/toku_atomic.h>
#include "loader.h"
///////////////////////////////////////////////////////////////////////////////////
// Engine status
......@@ -203,7 +204,7 @@ toku_indexer_create_indexer(DB_ENV *env,
//
{
DB_LOADER* loader = NULL;
rval = env->create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_USE_PUTS);
rval = toku_loader_create_loader(env, txn, &loader, NULL, N, &dest_dbs[0], NULL, NULL, DB_PRELOCKED_WRITE | LOADER_DISALLOW_PUTS, true);
if (rval) {
goto create_exit;
}
......@@ -473,6 +474,11 @@ build_index(DB_INDEXER *indexer) {
// - unique checks?
if ( result == 0 ) {
// Perform a checkpoint so that all of the indexing makes it to disk before continuing.
// Otherwise indexing would not be crash-safe becasue none of the undo-do messages are in the recovery log.
DB_ENV *env = indexer->i->env;
CHECKPOINTER cp = toku_cachetable_get_checkpointer(env->i->cachetable);
toku_checkpoint(cp, env->i->logger, NULL, NULL, NULL, NULL, INDEXER_CHECKPOINT);
(void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD), 1);
} else {
(void) toku_sync_fetch_and_add(&STATUS_VALUE(INDEXER_BUILD_FAIL), 1);
......@@ -487,14 +493,6 @@ close_indexer(DB_INDEXER *indexer) {
int r = 0;
(void) toku_sync_fetch_and_sub(&STATUS_VALUE(INDEXER_CURRENT), 1);
// Mark txn as needing a checkpoint.
// (This will cause a checkpoint, which is necessary
// because these files are not necessarily on disk and all the operations
// to create them are not in the recovery log.)
DB_TXN *txn = indexer->i->txn;
TOKUTXN tokutxn = db_txn_struct_i(txn)->tokutxn;
toku_txn_require_checkpoint_on_commit(tokutxn);
// Disassociate the indexer from the hot db and free_indexer
disassociate_indexer_from_hot_dbs(indexer);
free_indexer(indexer);
......
......@@ -89,9 +89,6 @@ struct __toku_loader_internal {
void *poll_extra;
char *temp_file_template;
DBT *ekeys;
DBT *evals;
DBT err_key; /* error key */
DBT err_val; /* error val */
int err_i; /* error i */
......@@ -109,21 +106,6 @@ struct __toku_loader_internal {
static void free_loader_resources(DB_LOADER *loader)
{
if ( loader->i ) {
for (int i=0; i<loader->i->N; i++) {
if (loader->i->ekeys &&
loader->i->ekeys[i].data &&
loader->i->ekeys[i].flags == DB_DBT_REALLOC) {
toku_free(loader->i->ekeys[i].data);
}
if (loader->i->evals &&
loader->i->evals[i].data &&
loader->i->evals[i].flags == DB_DBT_REALLOC) {
toku_free(loader->i->evals[i].data);
}
}
if (loader->i->ekeys) toku_free(loader->i->ekeys);
if (loader->i->evals) toku_free(loader->i->evals);
if (loader->i->err_key.data) toku_free(loader->i->err_key.data);
if (loader->i->err_val.data) toku_free(loader->i->err_val.data);
......@@ -171,24 +153,27 @@ static int ft_loader_close_and_redirect(DB_LOADER *loader) {
}
static int create_loader(DB_ENV *env,
DB_TXN *txn,
DB_LOADER **blp,
DB *src_db,
int N,
DB *dbs[],
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags,
bool check_empty)
{
// loader_flags currently has the following flags:
// LOADER_DISALLOW_PUTS loader->put is not allowed.
// Loader is only being used for its side effects
// DB_PRELOCKED_WRITE Table lock is already held, no need to relock.
int
toku_loader_create_loader(DB_ENV *env,
DB_TXN *txn,
DB_LOADER **blp,
DB *src_db,
int N,
DB *dbs[],
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags,
bool check_empty) {
int rval;
bool use_ft_loader = (loader_flags == 0);
*blp = NULL; // set later when created
DB_LOADER *loader = NULL;
bool use_puts = loader_flags&LOADER_USE_PUTS;
bool puts_allowed = !(loader_flags & LOADER_DISALLOW_PUTS);
XCALLOC(loader); // init to all zeroes (thus initializing the error_callback and poll_func)
XCALLOC(loader->i); // init to all zeroes (thus initializing all pointers to NULL)
......@@ -248,10 +233,8 @@ static int create_loader(DB_ENV *env,
for (int i=0; i<N; i++) {
brts[i] = dbs[i]->i->ft_handle;
}
loader->i->ekeys = NULL;
loader->i->evals = NULL;
LSN load_lsn;
rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, use_ft_loader);
rval = locked_load_inames(env, txn, N, dbs, new_inames_in_env, &load_lsn, puts_allowed);
if ( rval!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
......@@ -269,7 +252,7 @@ static int create_loader(DB_ENV *env,
loader->i->temp_file_template,
load_lsn,
ttxn,
!use_puts);
puts_allowed);
if ( rval!=0 ) {
toku_free(new_inames_in_env);
toku_free(brts);
......@@ -278,18 +261,9 @@ static int create_loader(DB_ENV *env,
loader->i->inames_in_env = new_inames_in_env;
toku_free(brts);
if (use_puts) {
XCALLOC_N(loader->i->N, loader->i->ekeys);
XCALLOC_N(loader->i->N, loader->i->evals);
// the following function grabs the ydb lock, so we
// first unlock before calling it
if (!puts_allowed) {
rval = ft_loader_close_and_redirect(loader);
assert_zero(rval);
for (int i=0; i<N; i++) {
loader->i->ekeys[i].flags = DB_DBT_REALLOC;
loader->i->evals[i].flags = DB_DBT_REALLOC;
toku_ft_suppress_recovery_logs(dbs[i]->i->ft_handle, db_txn_struct_i(txn)->tokutxn);
}
loader->i->ft_loader = NULL;
// close the ft_loader and skip to the redirection
rval = 0;
......@@ -312,37 +286,6 @@ static int create_loader(DB_ENV *env,
return rval;
}
// loader_flags currently has three possible values:
// 0 use brt loader
// USE_PUTS do not use brt loader, use log suppression mechanism (2440)
// which results in recursive call here via toku_db_pre_acquire_table_lock()
// DB_PRELOCKED_WRITE do not use brt loader, this is the recursive (inner) call via
// toku_db_pre_acquire_table_lock()
int toku_loader_create_loader(DB_ENV *env,
DB_TXN *txn,
DB_LOADER **blp,
DB *src_db,
int N,
DB *dbs[],
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags)
{
return create_loader(
env,
txn,
blp,
src_db,
N,
dbs,
db_flags,
dbt_flags,
loader_flags,
true
);
}
int toku_loader_set_poll_function(DB_LOADER *loader,
int (*poll_func)(void *extra, float progress),
void *poll_extra)
......@@ -377,16 +320,9 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
goto cleanup;
}
if (loader->i->loader_flags & LOADER_USE_PUTS) {
r = loader->i->env->put_multiple(loader->i->env,
loader->i->src_db, // src_db
loader->i->txn,
key, val,
loader->i->N, // num_dbs
loader->i->dbs, // (DB**)db_array
loader->i->ekeys,
loader->i->evals,
loader->i->db_flags); // flags_array
if (loader->i->loader_flags & LOADER_DISALLOW_PUTS) {
r = EINVAL;
goto cleanup;
}
else {
// calling toku_ft_loader_put without a lock assumes that the
......@@ -422,7 +358,7 @@ int toku_loader_put(DB_LOADER *loader, DBT *key, DBT *val)
static void redirect_loader_to_empty_dictionaries(DB_LOADER *loader) {
DB_LOADER* tmp_loader = NULL;
int r = create_loader(
int r = toku_loader_create_loader(
loader->i->env,
loader->i->txn,
&tmp_loader,
......@@ -446,7 +382,7 @@ int toku_loader_close(DB_LOADER *loader)
if ( loader->i->error_callback != NULL ) {
loader->i->error_callback(loader->i->dbs[loader->i->err_i], loader->i->err_i, loader->i->err_errno, &loader->i->err_key, &loader->i->err_val, loader->i->error_extra);
}
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
r = toku_ft_loader_abort(loader->i->ft_loader, true);
redirect_loader_to_empty_dictionaries(loader);
}
......@@ -455,7 +391,7 @@ int toku_loader_close(DB_LOADER *loader)
}
}
else { // no error outstanding
if (!(loader->i->loader_flags & LOADER_USE_PUTS ) ) {
if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS ) ) {
r = ft_loader_close_and_redirect(loader);
if (r) {
redirect_loader_to_empty_dictionaries(loader);
......@@ -481,7 +417,7 @@ int toku_loader_abort(DB_LOADER *loader)
}
}
if (!(loader->i->loader_flags & LOADER_USE_PUTS) ) {
if (!(loader->i->loader_flags & LOADER_DISALLOW_PUTS) ) {
r = toku_ft_loader_abort(loader->i->ft_loader, true);
lazy_assert_zero(r);
}
......
......@@ -40,7 +40,7 @@ Create and set up a loader.
Modifies: :: env, txn, blp, and dbs.
*/
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags);
int toku_loader_create_loader(DB_ENV *env, DB_TXN *txn, DB_LOADER **blp, DB *src_db, int N, DB *dbs[/*N*/], uint32_t db_flags[/*N*/], uint32_t dbt_flags[/*N*/], uint32_t loader_flags, bool check_empty);
/*
......
......@@ -541,12 +541,14 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
add_test(ydb/loader-stress-test2.tdb loader-stress-test.tdb -r 5000 -s -e dir.loader-stress-test2.tdb)
add_test(ydb/loader-stress-test3.tdb loader-stress-test.tdb -u -c -e dir.loader-stress-test3.tdb)
add_test(ydb/loader-stress-test4.tdb loader-stress-test.tdb -r 10000000 -c -e dir.loader-stress-test4.tdb)
add_test(ydb/loader-stress-test5.tdb loader-stress-test.tdb -c -z -e dir.loader-stress-test5.tdb)
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES
dir.loader-stress-test0.tdb
dir.loader-stress-test1.tdb
dir.loader-stress-test2.tdb
dir.loader-stress-test3.tdb
dir.loader-stress-test4.tdb
dir.loader-stress-test5.tdb
)
list(REMOVE_ITEM loader_tests loader-dup-test.loader)
......@@ -643,11 +645,12 @@ if(BUILD_TESTING OR BUILD_SRC_TESTS)
get_filename_component(base ${loader_test} NAME_WE)
add_test(ydb/${base}.nop.loader ${base}.tdb -e "dir.${base}.nop.loader")
add_test(ydb/${base}.p.loader ${base}.tdb -p -e "dir.${base}.p.loader")
add_test(ydb/${base}.comp.loader ${base}.tdb -z -e "dir.${base}.comp.loader")
if("${tdb_tests_that_should_fail}" MATCHES "${base}.loader")
list(REMOVE_ITEM tdb_tests_that_should_fail ${base}.loader)
list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader)
list(APPEND tdb_tests_that_should_fail ${base}.nop.loader ${base}.p.loader ${base}.comp.loader)
endif()
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader")
set_property(DIRECTORY APPEND PROPERTY ADDITIONAL_MAKE_CLEAN_FILES "dir.${base}.nop.loader" "dir.${base}.p.loader" "dir.${base}.comp.loader")
endforeach(loader_test)
set(tdb_tests_that_should_fail "ydb/${tdb_tests_that_should_fail}")
......
......@@ -84,7 +84,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
&db,
&mult_put_flags,
&mult_dbt_flags,
LOADER_USE_PUTS
LOADER_COMPRESS_INTERMEDIATES
);
CKERR(r);
}
......@@ -102,7 +102,7 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
CKERR(r);
}
else {
r = db->put(db, txn, &key, &val, 0);
r = db->put(db, txn, &key, &val, 0);
CKERR(r);
}
}
......@@ -116,11 +116,13 @@ static void do_1381_maybe_lock (int do_loader, uint64_t *raw_count) {
*raw_count = s2->rollback_raw_count - s1->rollback_raw_count;
if (do_loader) {
assert(s1->rollback_raw_count == s2->rollback_raw_count);
assert(s1->rollback_raw_count < s2->rollback_raw_count);
assert(s1->rollback_num_entries + 1 == s2->rollback_num_entries);
} else {
assert(s1->rollback_raw_count < s2->rollback_raw_count);
assert(s1->rollback_num_entries < s2->rollback_num_entries);
}
toku_free(s1); toku_free(s2);
r = txn->commit(txn, 0); CKERR(r);
......
......@@ -36,7 +36,13 @@ static void insert(DB_LOADER *loader, int k, int val_size) {
DBT key = { .data = key_buffer, .size = sizeof key_buffer };
DBT value = { .data = val_buffer, .size = val_size };
r = loader->put(loader, &key, &value); assert_zero(r);
r = loader->put(loader, &key, &value);
if (DISALLOW_PUTS) {
assert(r == EINVAL);
}
else {
assert_zero(r);
}
toku_free(val_buffer);
}
......@@ -65,8 +71,12 @@ int test_main(int argc, char * const argv[]) {
if (verbose > 0) verbose--;
continue;
}
if (strcmp(arg, "-z") == 0) {
loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
continue;
}
if (strcmp(arg, "-p") == 0) {
loader_flags = LOADER_USE_PUTS;
loader_flags |= LOADER_DISALLOW_PUTS;
continue;
}
if (strcmp(arg, "--txn") == 0 && i+1 < argc) {
......
......@@ -68,7 +68,8 @@ int NUM_DBS=default_NUM_DBS;
int NUM_ROWS=default_NUM_ROWS;
//static int NUM_ROWS=50000000;
int CHECK_RESULTS=0;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
int event_trigger_lo=0; // what event triggers to use?
int event_trigger_hi =0; // 0 and 0 mean none.
enum {MAGIC=311};
......@@ -528,13 +529,17 @@ static void check_results(DB **dbs)
CKERR(r);
for(int i=0;i<NUM_ROWS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r);
k = *(unsigned int*)key.data;
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
k = *(unsigned int*)key.data;
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
}
}
{printf("."); fflush(stdout);}
r = cursor->c_close(cursor);
......@@ -596,7 +601,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p/-z option
if (verbose >= 2)
printf("old inames:\n");
......@@ -612,8 +617,10 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
r = loader->set_poll_function(loader, poll_function, expect_poll_void);
CKERR(r);
if (verbose)
printf("USE_PUTS = %d\n", USE_PUTS);
if (verbose) {
printf("DISALLOW_PUTS = %d\n", DISALLOW_PUTS);
printf("COMPRESS = %d\n", COMPRESS);
}
if (verbose >= 2)
printf("new inames:\n");
get_inames(new_inames, dbs);
......@@ -627,7 +634,9 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
if (r != 0) {
if (DISALLOW_PUTS) {
assert(r == EINVAL);
} else if (r != 0) {
assert(error_injection && error_injected);
failed_put = r;
}
......@@ -649,13 +658,13 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
}
r = loader->close(loader);
CKERR(r);
if (!USE_PUTS) {
if (!DISALLOW_PUTS) {
assert(poll_count>0);
// You cannot count temp files here
}
}
else if (t == abort_via_poll) {
assert(!USE_PUTS); // test makes no sense with USE_PUTS
assert(!DISALLOW_PUTS); // test makes no sense with DISALLOW_PUTS
if (verbose)
printf("closing, but expecting abort via poll\n");
r = loader->close(loader);
......@@ -674,7 +683,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
else
printf("closing, expecting no error because number of system calls was less than predicted (%s)\n", type);
}
if (!USE_PUTS && error_injected) {
if (!DISALLOW_PUTS && error_injected) {
if (r == 0) {
printf("loader->close() returned 0 but should have failed due to injected error from %s on call %d\n",
err_type_str(t), trigger);
......@@ -725,7 +734,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
r = txn->commit(txn, 0);
CKERR(r);
if (!USE_PUTS) {
if (!DISALLOW_PUTS) {
assert_inames_missing(old_inames);
}
if ( CHECK_RESULTS ) {
......@@ -736,7 +745,7 @@ static void test_loader(enum test_type t, DB **dbs, int trigger)
else {
r = txn->abort(txn);
CKERR(r);
if (!USE_PUTS) {
if (!DISALLOW_PUTS) {
assert_inames_missing(new_inames);
}
}
......@@ -962,7 +971,8 @@ static void usage(const char *cmd) {
fprintf(stderr, "Usage: -h -c -s -p -d <num_dbs> -r <num_rows> -t <elow> <ehi> \n%s\n", cmd);
fprintf(stderr, " where -h print this message.\n");
fprintf(stderr, " -c check the results.\n");
fprintf(stderr, " -p LOADER_USE_PUTS.\n");
fprintf(stderr, " -p LOADER_DISALLOW_PUTS.\n");
fprintf(stderr, " -z LOADER_COMPRESS_INTERMEDIATES.\n");
fprintf(stderr, " -k Test only normal operation and abort_via_poll (but thoroughly).\n");
fprintf(stderr, " -s size_factor=1.\n");
fprintf(stderr, " -d <num_dbs> Number of indexes to create (default=%d).\n", default_NUM_DBS);
......@@ -998,8 +1008,10 @@ static void do_args(int argc, char * const argv[]) {
NUM_ROWS = atoi(argv[0]);
} else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 0;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
printf("DISABLED Using puts as part of #4503\n");
} else if (strcmp(argv[0], "-k")==0) {
test_only_abort_via_poll = 1;
......@@ -1010,7 +1022,7 @@ static void do_args(int argc, char * const argv[]) {
argc--; argv++;
event_trigger_hi = atoi(argv[0]);
} else if (strcmp(argv[0], "-s")==0) {
db_env_set_loader_size_factor(1);
db_env_set_loader_size_factor(1);
} else if (strcmp(argv[0],"-e") == 0 && argc > 1) {
argc--; argv++;
envdir = argv[0];
......
......@@ -61,7 +61,9 @@ static void do_args(int argc, char * const argv[]) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p") == 0) {
loader_flags = LOADER_USE_PUTS;
loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-z") == 0) {
loader_flags |= LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++;
if (argc > 0)
......
......@@ -29,7 +29,7 @@ static void loader_open_abort(int ndb) {
r = env->set_generate_row_callback_for_put(env, put_multiple_generate);
CKERR(r);
int envflags = DB_INIT_LOCK | DB_INIT_LOG | DB_INIT_MPOOL | DB_INIT_TXN | DB_CREATE | DB_PRIVATE;
r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
r = env->open(env, envdir, envflags, S_IRWXU+S_IRWXG+S_IRWXO); CKERR(r);
env->set_errfile(env, stderr);
DB *dbs[ndb];
......@@ -49,7 +49,7 @@ static void loader_open_abort(int ndb) {
DB_LOADER *loader;
r = env->create_loader(env, txn, &loader, ndb > 0 ? dbs[0] : NULL, ndb, dbs, db_flags, dbt_flags, loader_flags); CKERR(r);
r = loader->close(loader); CKERR(r);
r = txn->commit(txn, 0); CKERR(r);
......@@ -77,7 +77,9 @@ static void do_args(int argc, char * const argv[]) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-p") == 0) {
loader_flags = LOADER_USE_PUTS;
loader_flags |= LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z") == 0) {
loader_flags |= LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++;
if (argc > 0)
......
......@@ -14,7 +14,8 @@ enum {MAX_DBS=256};
int NUM_DBS=5;
int NUM_ROWS=100000;
int CHECK_RESULTS=0;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAGIC=311};
bool dup_row_at_end = false; // false: duplicate at the begining. true: duplicate at the end. The duplicated row is row 0.
......@@ -156,14 +157,18 @@ static void check_results(DB **dbs)
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r);
for(int i=0;i<NUM_ROWS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
CKERR(r);
k = *(unsigned int*)key.data;
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
k = *(unsigned int*)key.data;
pkey_for_db_key = (j == 0) ? k : inv_twiddle32(k, j);
v = *(unsigned int*)val.data;
// test that we have the expected keys and values
assert((unsigned int)pkey_for_db_key == (unsigned int)pkey_for_val(v, j));
// printf(" DB[%d] key = %10u, val = %10u, pkey_for_db_key = %10u, pkey_for_val=%10d\n", j, v, k, pkey_for_db_key, pkey_for_val(v, j));
}
}
{printf("."); fflush(stdout);}
r = cursor->c_close(cursor);
......@@ -200,14 +205,14 @@ static void test_loader(DB **dbs)
DB_LOADER *loader;
uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS];
for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE;
for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
......@@ -238,11 +243,9 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
if (USE_PUTS) {
//PUT loader can return -1 if it finds an error during the puts.
CKERR2s(r, 0,-1);
}
else {
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if ( CHECK_RESULTS || verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
......@@ -351,7 +354,7 @@ int test_main(int argc, char * const *argv) {
else {
int sizes[]={1,4000000,-1};
//Make PUT loader take about the same amount of time:
if (USE_PUTS) sizes[1] /= 25;
if (DISALLOW_PUTS) sizes[1] /= 25;
for (int i=0; sizes[i]>=0; i++) {
if (verbose) printf("Doing %d\n", sizes[i]);
NUM_ROWS = sizes[i];
......@@ -404,8 +407,10 @@ static void do_args(int argc, char * const argv[]) {
num_rows_set = true;
} else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-s")==0) {
db_env_set_loader_size_factor(1);
} else if (strcmp(argv[0], "-E")==0) {
......
......@@ -11,7 +11,8 @@
static const char *envdir = ENVDIR;
DB_ENV *env;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAX_NAME=128};
enum {NUM_DBS=1};
enum {NUM_KV_PAIRS=3};
......@@ -50,10 +51,10 @@ static void test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
......@@ -68,7 +69,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
r = loader->put(loader, &key, &val);
CKERR(r);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
}
*/
// close the loader
......@@ -88,10 +93,14 @@ static void test_loader(DB **dbs)
CKERR(r);
for(int i=0;i<NUM_KV_PAIRS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
} else {
if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
}
}
cursor->c_close(cursor);
}
......@@ -178,8 +187,10 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0],"-q")==0) {
verbose--;
if (verbose<0) verbose=0;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-e") == 0) {
argc--; argv++;
if (argc > 0)
......
......@@ -11,7 +11,8 @@
static const char *envdir = ENVDIR;
DB_ENV *env;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
enum {MAX_NAME=128};
enum {NUM_DBS=1};
enum {NUM_KV_PAIRS=3};
......@@ -47,14 +48,14 @@ static void test_loader(DB **dbs)
DB_LOADER *loader;
uint32_t db_flags[NUM_DBS];
uint32_t dbt_flags[NUM_DBS];
for(int i=0;i<NUM_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE;
for(int i=0;i<NUM_DBS;i++) {
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p or -c option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
r = env->create_loader(env, txn, &loader, dbs[0], NUM_DBS, dbs, db_flags, dbt_flags, loader_flags);
CKERR(r);
......@@ -62,7 +63,7 @@ static void test_loader(DB **dbs)
CKERR(r);
r = loader->set_poll_function(loader, NULL, NULL);
CKERR(r);
uint64_t before_puts = toku_test_get_latest_lsn(env);
// using loader->put, put values into DB
DBT key, val;
......@@ -70,7 +71,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &kv_pairs[i].key, sizeof(kv_pairs[i].key));
dbt_init(&val, &kv_pairs[i].val, sizeof(kv_pairs[i].val));
r = loader->put(loader, &key, &val);
CKERR(r);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
}
uint64_t after_puts = toku_test_get_latest_lsn(env);
assert(before_puts == after_puts);
......@@ -90,11 +95,15 @@ static void test_loader(DB **dbs)
r = dbs[j]->cursor(dbs[j], txn, &cursor, 0);
CKERR(r);
for(int i=0;i<NUM_KV_PAIRS;i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (r!=0) { fprintf(stderr, "r==%d, failure\n", r); }
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
} else {
CKERR(r);
assert(*(int64_t*)key.data == kv_pairs[i].key);
assert(*(int64_t*)val.data == kv_pairs[i].val);
}
}
cursor->c_close(cursor);
}
......@@ -185,7 +194,9 @@ static void do_args(int argc, char * const argv[]) {
fprintf(stderr, "Usage:\n%s\n", cmd);
exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "--block_size") == 0) {
argc--; argv++;
block_size = atoi(argv[0]);
......
......@@ -23,7 +23,8 @@ enum {MAX_DBS=1024};
int NUM_DBS=1;
int NUM_ROWS=1000000;
int CHECK_RESULTS=1;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
enum { old_default_cachesize=1024 }; // MB
int CACHESIZE=old_default_cachesize;
int ALLOW_DUPS=0;
......@@ -247,13 +248,18 @@ static void check_results(DB **dbs) {
// generate the expected keys
unsigned int *expected_key = (unsigned int *) toku_malloc(NUM_ROWS * sizeof (unsigned int));
for (int i = 0; i < NUM_ROWS; i++)
for (int i = 0; i < NUM_ROWS; i++) {
expected_key[i] = j == 0 ? (unsigned int)(i+1) : twiddle32(i+1, j);
}
// sort the keys
qsort(expected_key, NUM_ROWS, sizeof (unsigned int), uint_cmp);
for (int i = 0; i < NUM_ROWS+1; i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
break;
}
if (r == DB_NOTFOUND) {
assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary
break;
......@@ -393,16 +399,16 @@ static void test_loader(DB **dbs)
uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS];
uint32_t flags = DB_NOOVERWRITE;
if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0;
if ( (DISALLOW_PUTS != 0) && (ALLOW_DUPS == 1) ) flags = 0;
for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = flags;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS ? LOADER_USE_PUTS : 0; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
hiwater_start = hiwater;
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
......@@ -423,7 +429,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
CKERR(r);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
}
if ( verbose ) {printf("\n"); fflush(stdout);}
......@@ -445,7 +455,7 @@ static void test_loader(DB **dbs)
CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) {
if ( USE_PUTS == 0 ) {
if ( DISALLOW_PUTS == 0 ) {
if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__);
assert(poll_count>0);
}
......@@ -650,7 +660,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-m")==0) {
argc--; argv++;
CACHESIZE = atoi(argv[0]);
......
......@@ -32,7 +32,8 @@ enum {MAX_DBS=1024};
int NUM_DBS=5;
int NUM_ROWS=100000;
int CHECK_RESULTS=0;
int USE_PUTS=0;
int DISALLOW_PUTS=0;
int COMPRESS=0;
enum { old_default_cachesize=1024 }; // MB
int CACHESIZE=old_default_cachesize;
int ALLOW_DUPS=0;
......@@ -267,6 +268,10 @@ static void check_results(DB **dbs) {
for (int i = 0; i < NUM_ROWS+1; i++) {
r = cursor->c_get(cursor, &key, &val, DB_NEXT);
if (DISALLOW_PUTS) {
CKERR2(r, DB_NOTFOUND);
break;
}
if (r == DB_NOTFOUND) {
assert(i == NUM_ROWS); // check that there are exactly NUM_ROWS in the dictionary
break;
......@@ -357,16 +362,16 @@ static void test_loader(DB **dbs)
uint32_t db_flags[MAX_DBS];
uint32_t dbt_flags[MAX_DBS];
uint32_t flags = DB_NOOVERWRITE;
if ( (USE_PUTS == 1) && (ALLOW_DUPS == 1) ) flags = 0;
if ( (DISALLOW_PUTS) && (ALLOW_DUPS == 1) ) flags = 0;
for(int i=0;i<MAX_DBS;i++) {
db_flags[i] = flags;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS ? LOADER_USE_PUTS : 0; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
r = env->txn_begin(env, NULL, &txn, 0);
CKERR(r);
hiwater_start = hiwater;
if (footprint_print) printf("%s:%d Hiwater=%ld water=%ld\n", __FILE__, __LINE__, hiwater, water);
......@@ -387,7 +392,11 @@ static void test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(unsigned int));
dbt_init(&val, &v, sizeof(unsigned int));
r = loader->put(loader, &key, &val);
CKERR(r);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if ( verbose) { if((i%10000) == 0){printf("."); fflush(stdout);} }
}
if ( verbose ) {printf("\n"); fflush(stdout);}
......@@ -409,7 +418,7 @@ static void test_loader(DB **dbs)
CKERR2s(r,0,TOKUDB_CANCELED);
if (r==0) {
if ( USE_PUTS == 0 ) {
if (!DISALLOW_PUTS) {
if (poll_count == 0) printf("%s:%d\n", __FILE__, __LINE__);
assert(poll_count>0);
}
......@@ -432,9 +441,15 @@ static void test_loader(DB **dbs)
if (verbose)
printf("NUM_ROWS=%d n_keys=%" PRIu64 " n_data=%" PRIu64 " dsize=%" PRIu64 " fsize=%" PRIu64 "\n",
NUM_ROWS, stats.bt_nkeys, stats.bt_ndata, stats.bt_dsize, stats.bt_fsize);
assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was ==
assert(stats.bt_ndata <= (uint64_t)NUM_ROWS);
assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int));
if (DISALLOW_PUTS) {
assert(stats.bt_nkeys == 0); // Fix as part of #4129. Was ==
assert(stats.bt_ndata == 0);
assert(stats.bt_dsize == 0);
} else {
assert(stats.bt_nkeys <= (uint64_t)NUM_ROWS); // Fix as part of #4129. Was ==
assert(stats.bt_ndata <= (uint64_t)NUM_ROWS);
assert(stats.bt_dsize == ((uint64_t)NUM_ROWS) * 2 * sizeof(unsigned int));
}
r = txn->commit(txn, 0);
CKERR(r);
}
......@@ -633,7 +648,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-c")==0) {
CHECK_RESULTS = 1;
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-m")==0) {
argc--; argv++;
CACHESIZE = atoi(argv[0]);
......
......@@ -13,7 +13,8 @@ enum {MAX_NAME=128};
enum {MAX_DBS=16};
enum {MAX_ROW_LEN=1024};
static int NUM_DBS=10;
static int USE_PUTS=0;
static int DISALLOW_PUTS=0;
static int COMPRESS=0;
static int USE_REGION=0;
static const char *envdir = ENVDIR;
......@@ -291,7 +292,7 @@ static int test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = DISALLOW_PUTS | COMPRESS; // set with -p option
FILE *fp;
// select which table to loader
......@@ -335,7 +336,11 @@ static int test_loader(DB **dbs)
dbt_init(&key, &k, sizeof(int));
dbt_init(&val, v, strlen(v)+1);
r = loader->put(loader, &key, &val);
CKERR(r);
if (DISALLOW_PUTS) {
CKERR2(r, EINVAL);
} else {
CKERR(r);
}
if (verbose) { if((i++%10000) == 0){printf("."); fflush(stdout);} }
c = tpch_read_row(fp, &k, v);
}
......@@ -350,7 +355,7 @@ static int test_loader(DB **dbs)
printf(" done\n");
CKERR(r);
if ( USE_PUTS == 0 ) assert(poll_count>0);
if ( DISALLOW_PUTS == 0 ) assert(poll_count>0);
r = txn->commit(txn, 0);
CKERR(r);
......@@ -442,7 +447,9 @@ static void do_args(int argc, char * const argv[]) {
fprintf(stderr, "Usage: -h -p -g\n%s\n", cmd);
exit(resultcode);
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = 1;
DISALLOW_PUTS = LOADER_DISALLOW_PUTS;
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
} else if (strcmp(argv[0], "-g")==0) {
USE_REGION = 1;
} else if (strcmp(argv[0], "-e") == 0) {
......
......@@ -53,7 +53,7 @@ static bool do_test=false, do_recover=false;
static DB_ENV *env;
static int NUM_ROWS=50000000;
static int USE_PUTS=0;
static int COMPRESS=0;
enum {MAX_NAME=128};
enum {MAGIC=311};
......@@ -290,7 +290,7 @@ static void test_loader(DB **dbs)
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = USE_PUTS; // set with -p option
uint32_t loader_flags = COMPRESS; // set with -p option
int n = count_temp(env->i->real_data_dir);
assert(n == 0); // Must be no temp files before loader is run
......@@ -308,7 +308,7 @@ static void test_loader(DB **dbs)
r = loader->set_poll_function(loader, poll_function, expect_poll_void);
CKERR(r);
printf("USE_PUTS = %d\n", USE_PUTS);
printf("COMPRESS = %d\n", COMPRESS);
if (verbose) printf("new inames:\n");
get_inames(new_inames, dbs);
......@@ -461,9 +461,9 @@ static void do_args(int argc, char * const argv[]) {
} else if (strcmp(argv[0], "-r")==0) {
argc--; argv++;
NUM_ROWS = atoi(argv[0]);
} else if (strcmp(argv[0], "-p")==0) {
USE_PUTS = LOADER_USE_PUTS;
printf("Using puts\n");
} else if (strcmp(argv[0], "-z")==0) {
COMPRESS = LOADER_COMPRESS_INTERMEDIATES;
printf("Compressing\n");
} else if (strcmp(argv[0], "--test")==0) {
do_test=true;
} else if (strcmp(argv[0], "--recover") == 0) {
......
......@@ -5,6 +5,7 @@
// Verify that log-suppress recovery is done properly. (See ticket 2781.)
// TODO: determine if this is useful at all anymore (log suppression does not exist anymore)
#include <sys/stat.h>
......@@ -97,7 +98,7 @@ load(DB **dbs) {
db_flags[i] = DB_NOOVERWRITE;
dbt_flags[i] = 0;
}
uint32_t loader_flags = LOADER_USE_PUTS;
uint32_t loader_flags = LOADER_COMPRESS_INTERMEDIATES;
// create and initialize loader
r = env->txn_begin(env, NULL, &txn, 0);
......
......@@ -21,7 +21,7 @@ static int put_multiple_generate(DB *UU(dest_db), DB *UU(src_db), DBT *dest_key,
}
static void
test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) {
test_loader_abort (bool do_compress, bool abort_loader, bool abort_txn) {
DB_ENV * env;
DB *db;
DB_TXN *txn;
......@@ -35,7 +35,7 @@ test_loader_abort (bool use_puts, bool abort_loader, bool abort_txn) {
DB_LOADER *loader;
uint32_t db_flags = 0;
uint32_t dbt_flags = 0;
uint32_t loader_flags = use_puts ? LOADER_USE_PUTS : 0;
uint32_t loader_flags = do_compress ? LOADER_COMPRESS_INTERMEDIATES : 0;
DBC* cursor = NULL;
/* create the dup database file */
......
......@@ -881,7 +881,7 @@ static int UU() loader_op(DB_TXN* txn, ARG UU(arg), void* UU(operation_extra), v
r = db_load->open(db_load, txn, "loader-db", NULL, DB_BTREE, DB_CREATE, 0666);
assert(r == 0);
DB_LOADER *loader;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_USE_PUTS;
uint32_t loader_flags = (num == 0) ? 0 : LOADER_COMPRESS_INTERMEDIATES;
r = env->create_loader(env, txn, &loader, db_load, 1, &db_load, &db_flags, &dbt_flags, loader_flags);
CKERR(r);
......
......@@ -1453,7 +1453,7 @@ env_create_loader(DB_ENV *env,
uint32_t db_flags[/*N*/],
uint32_t dbt_flags[/*N*/],
uint32_t loader_flags) {
int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags);
int r = toku_loader_create_loader(env, txn, blp, src_db, N, dbs, db_flags, dbt_flags, loader_flags, true);
return r;
}
......
......@@ -219,26 +219,21 @@ toku_txn_prepare (DB_TXN *txn, uint8_t gid[DB_GID_SIZE]) {
return toku_txn_xa_prepare(txn, &xid);
}
static int
static int
toku_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
XMALLOC(*txn_stat);
return toku_logger_txn_rollback_raw_count(db_txn_struct_i(txn)->tokutxn, &(*txn_stat)->rollback_raw_count);
return toku_logger_txn_rollback_stats(db_txn_struct_i(txn)->tokutxn, *txn_stat);
}
static int
static int
locked_txn_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
int r = toku_txn_txn_stat(txn, txn_stat);
int r = toku_txn_txn_stat(txn, txn_stat);
return r;
}
static int
locked_txn_commit_with_progress(DB_TXN *txn, uint32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
TOKUTXN ttxn = db_txn_struct_i(txn)->tokutxn;
if (toku_txn_requires_checkpoint(ttxn)) {
CHECKPOINTER cp = toku_cachetable_get_checkpointer(txn->mgrp->i->cachetable);
toku_checkpoint(cp, txn->mgrp->i->logger, NULL, NULL, NULL, NULL, TXN_COMMIT_CHECKPOINT);
}
bool holds_mo_lock = false;
if (!toku_txn_is_read_only(db_txn_struct_i(txn)->tokutxn)) {
// A readonly transaction does no logging, and therefore does not
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment