Commit 14d67581 authored by Yoni Fogel's avatar Yoni Fogel

closes[t:2391] Merge 2391 branch to main

git-svn-id: file:///svn/toku/tokudb@18153 c7de825b-a66e-492c-adef-691d508d4ae1
parent 0bdd64ae
...@@ -346,6 +346,13 @@ struct __toku_db_txn_active { ...@@ -346,6 +346,13 @@ struct __toku_db_txn_active {
char __toku_dummy0[4]; char __toku_dummy0[4];
DB_LSN lsn; /* 32-bit offset=8 size=8, 64=bit offset=8 size=8 */ DB_LSN lsn; /* 32-bit offset=8 size=8, 64=bit offset=8 size=8 */
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -354,7 +361,9 @@ struct __toku_db_txn { ...@@ -354,7 +361,9 @@ struct __toku_db_txn {
DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */ DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
void* __toku_dummy0[7]; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void* __toku_dummy0[5];
char __toku_dummy1[24]; char __toku_dummy1[24];
void *api_internal; /* 32-bit offset=68 size=4, 64=bit offset=112 size=8 */ void *api_internal; /* 32-bit offset=68 size=4, 64=bit offset=112 size=8 */
void* __toku_dummy2[1]; void* __toku_dummy2[1];
......
...@@ -362,6 +362,13 @@ struct __toku_db_txn_active { ...@@ -362,6 +362,13 @@ struct __toku_db_txn_active {
DB_LSN lsn; /* 32-bit offset=8 size=8, 64=bit offset=8 size=8 */ DB_LSN lsn; /* 32-bit offset=8 size=8, 64=bit offset=8 size=8 */
char __toku_dummy1[132]; /* Padding at the end */ char __toku_dummy1[132]; /* Padding at the end */
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -370,7 +377,9 @@ struct __toku_db_txn { ...@@ -370,7 +377,9 @@ struct __toku_db_txn {
DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */ DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
void* __toku_dummy0[12]; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void* __toku_dummy0[10];
char __toku_dummy1[24]; char __toku_dummy1[24];
void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=152 size=8 */ void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=152 size=8 */
void* __toku_dummy2[2]; void* __toku_dummy2[2];
......
...@@ -369,6 +369,13 @@ struct __toku_db_txn_active { ...@@ -369,6 +369,13 @@ struct __toku_db_txn_active {
DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */ DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */
char __toku_dummy2[184]; /* Padding at the end */ char __toku_dummy2[184]; /* Padding at the end */
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -377,7 +384,9 @@ struct __toku_db_txn { ...@@ -377,7 +384,9 @@ struct __toku_db_txn {
DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */ DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
void* __toku_dummy0[15]; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void* __toku_dummy0[13];
char __toku_dummy1[8]; char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */ void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */
void* __toku_dummy2[2]; void* __toku_dummy2[2];
......
...@@ -369,6 +369,13 @@ struct __toku_db_txn_active { ...@@ -369,6 +369,13 @@ struct __toku_db_txn_active {
DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */ DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */
char __toku_dummy2[200]; /* Padding at the end */ char __toku_dummy2[200]; /* Padding at the end */
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -377,7 +384,9 @@ struct __toku_db_txn { ...@@ -377,7 +384,9 @@ struct __toku_db_txn {
DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */ DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
void* __toku_dummy0[15]; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void* __toku_dummy0[13];
char __toku_dummy1[8]; char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */ void *api_internal; /* 32-bit offset=84 size=4, 64=bit offset=160 size=8 */
void* __toku_dummy2[2]; void* __toku_dummy2[2];
......
...@@ -373,6 +373,13 @@ struct __toku_db_txn_active { ...@@ -373,6 +373,13 @@ struct __toku_db_txn_active {
DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */ DB_LSN lsn; /* 32-bit offset=16 size=8, 64=bit offset=24 size=8 */
char __toku_dummy2[200]; /* Padding at the end */ char __toku_dummy2[200]; /* Padding at the end */
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -381,7 +388,9 @@ struct __toku_db_txn { ...@@ -381,7 +388,9 @@ struct __toku_db_txn {
DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */ DB_TXN *parent; /* 32-bit offset=4 size=4, 64=bit offset=8 size=8 */
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
void* __toku_dummy0[16]; int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void* __toku_dummy0[14];
char __toku_dummy1[8]; char __toku_dummy1[8];
void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=168 size=8 */ void *api_internal; /* 32-bit offset=88 size=4, 64=bit offset=168 size=8 */
void* __toku_dummy2[2]; void* __toku_dummy2[2];
......
...@@ -458,7 +458,6 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -458,7 +458,6 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
printf(" u_int64_t fsync_time; /* total time required to fsync */ \n"); printf(" u_int64_t fsync_time; /* total time required to fsync */ \n");
printf("} ENGINE_STATUS;\n"); printf("} ENGINE_STATUS;\n");
print_dbtype(); print_dbtype();
// print_db_notices(); // print_db_notices();
print_defines(); print_defines();
...@@ -544,10 +543,21 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un ...@@ -544,10 +543,21 @@ int main (int argc __attribute__((__unused__)), char *argv[] __attribute__((__un
print_struct("db_txn_active", 0, db_txn_active_fields32, db_txn_active_fields64, sizeof(db_txn_active_fields32)/sizeof(db_txn_active_fields32[0]), 0); print_struct("db_txn_active", 0, db_txn_active_fields32, db_txn_active_fields64, sizeof(db_txn_active_fields32)/sizeof(db_txn_active_fields32[0]), 0);
assert(sizeof(db_txn_fields32)==sizeof(db_txn_fields64)); assert(sizeof(db_txn_fields32)==sizeof(db_txn_fields64));
{ {
//txn progress info
printf("typedef struct __toku_txn_progress {\n");
printf(" uint64_t entries_total;\n");
printf(" uint64_t entries_processed;\n");
printf(" uint8_t is_commit;\n");
printf(" uint8_t stalled_on_checkpoint;\n");
printf("} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;\n");
printf("typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);\n");
printf("struct txn_stat {\n u_int64_t rolltmp_raw_count;\n};\n"); printf("struct txn_stat {\n u_int64_t rolltmp_raw_count;\n};\n");
const char *extra[] = { const char *extra[] = {
"int (*txn_stat)(DB_TXN *, struct txn_stat **)", "int (*txn_stat)(DB_TXN *, struct txn_stat **)",
"struct { void *next, *prev; } open_txns", "struct { void *next, *prev; } open_txns",
"int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*)",
"int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*)",
NULL, NULL,
}; };
print_struct("db_txn", INTERNAL_AT_END, db_txn_fields32, db_txn_fields64, sizeof(db_txn_fields32)/sizeof(db_txn_fields32[0]), extra); print_struct("db_txn", INTERNAL_AT_END, db_txn_fields32, db_txn_fields64, sizeof(db_txn_fields32)/sizeof(db_txn_fields32[0]), extra);
......
...@@ -320,6 +320,13 @@ struct __toku_db_txn_active { ...@@ -320,6 +320,13 @@ struct __toku_db_txn_active {
u_int32_t txnid; u_int32_t txnid;
DB_LSN lsn; DB_LSN lsn;
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -328,6 +335,8 @@ struct __toku_db_txn { ...@@ -328,6 +335,8 @@ struct __toku_db_txn {
DB_TXN *parent; DB_TXN *parent;
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void *api_internal; void *api_internal;
int (*abort) (DB_TXN *); int (*abort) (DB_TXN *);
int (*commit) (DB_TXN*, u_int32_t); int (*commit) (DB_TXN*, u_int32_t);
......
...@@ -320,6 +320,13 @@ struct __toku_db_txn_active { ...@@ -320,6 +320,13 @@ struct __toku_db_txn_active {
u_int32_t txnid; u_int32_t txnid;
DB_LSN lsn; DB_LSN lsn;
}; };
typedef struct __toku_txn_progress {
uint64_t entries_total;
uint64_t entries_processed;
uint8_t is_commit;
uint8_t stalled_on_checkpoint;
} *TOKU_TXN_PROGRESS, TOKU_TXN_PROGRESS_S;
typedef void(*TXN_PROGRESS_POLL_FUNCTION)(TOKU_TXN_PROGRESS, void*);
struct txn_stat { struct txn_stat {
u_int64_t rolltmp_raw_count; u_int64_t rolltmp_raw_count;
}; };
...@@ -328,6 +335,8 @@ struct __toku_db_txn { ...@@ -328,6 +335,8 @@ struct __toku_db_txn {
DB_TXN *parent; DB_TXN *parent;
int (*txn_stat)(DB_TXN *, struct txn_stat **); int (*txn_stat)(DB_TXN *, struct txn_stat **);
struct { void *next, *prev; } open_txns; struct { void *next, *prev; } open_txns;
int (*commit_with_progress)(DB_TXN*, uint32_t, TXN_PROGRESS_POLL_FUNCTION, void*);
int (*abort_with_progress)(DB_TXN*, TXN_PROGRESS_POLL_FUNCTION, void*);
void *api_internal; void *api_internal;
int (*abort) (DB_TXN *); int (*abort) (DB_TXN *);
int (*commit) (DB_TXN*, u_int32_t); int (*commit) (DB_TXN*, u_int32_t);
......
...@@ -120,6 +120,10 @@ struct tokutxn { ...@@ -120,6 +120,10 @@ struct tokutxn {
XIDS xids; //Represents the xid list XIDS xids; //Represents the xid list
BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn) BOOL force_fsync_on_commit; //This transaction NEEDS an fsync once (if) it commits. (commit means root txn)
BOOL has_done_work; //If this transaction has not done work, there is no need to fsync. BOOL has_done_work; //If this transaction has not done work, there is no need to fsync.
TXN_PROGRESS_POLL_FUNCTION progress_poll_fun;
void * progress_poll_fun_extra;
uint64_t num_rollentries;
uint64_t num_rollentries_processed;
}; };
static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) { static inline int toku_logsizeof_u_int8_t (u_int32_t v __attribute__((__unused__))) {
......
...@@ -548,6 +548,7 @@ generate_rollbacks (void) { ...@@ -548,6 +548,7 @@ generate_rollbacks (void) {
fprintf(cf, " txn->newest_logentry = v;\n"); fprintf(cf, " txn->newest_logentry = v;\n");
fprintf(cf, " txn->rollentry_resident_bytecount += rollback_fsize;\n"); fprintf(cf, " txn->rollentry_resident_bytecount += rollback_fsize;\n");
fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n"); fprintf(cf, " txn->rollentry_raw_count += rollback_fsize;\n");
fprintf(cf, " txn->num_rollentries++;\n");
fprintf(cf, " return toku_maybe_spill_rollbacks(txn);\n}\n"); fprintf(cf, " return toku_maybe_spill_rollbacks(txn);\n}\n");
}); });
......
...@@ -930,7 +930,7 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) { ...@@ -930,7 +930,7 @@ static int toku_recover_commit (struct logtype_commit *l, RECOVER_ENV renv) {
} }
// commit the transaction // commit the transaction
r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn); r = toku_txn_commit_with_lsn(txn, TRUE, recover_yield, NULL, l->lsn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -958,7 +958,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) { ...@@ -958,7 +958,7 @@ static int toku_recover_xabort (struct logtype_xabort *l, RECOVER_ENV renv) {
} }
// abort the transaction // abort the transaction
r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn); r = toku_txn_abort_with_lsn(txn, recover_yield, NULL, l->lsn, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
...@@ -1089,7 +1089,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) { ...@@ -1089,7 +1089,7 @@ static void recover_abort_live_txns(RECOVER_ENV renv) {
TOKUTXN txn = (TOKUTXN) v; TOKUTXN txn = (TOKUTXN) v;
// abort the transaction // abort the transaction
r = toku_txn_abort_txn(txn, recover_yield, NULL); r = toku_txn_abort_txn(txn, recover_yield, NULL, NULL, NULL);
assert(r == 0); assert(r == 0);
// close the transaction // close the transaction
......
...@@ -345,7 +345,9 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum, ...@@ -345,7 +345,9 @@ toku_rollback_tablelock_on_empty_table (FILENUM filenum,
LSN UU(oplsn)) LSN UU(oplsn))
{ {
//TODO: Replace truncate function with something that doesn't need to mess with checkpoints. //TODO: Replace truncate function with something that doesn't need to mess with checkpoints.
toku_poll_txn_progress_function(txn, FALSE, TRUE);
yield(toku_checkpoint_safe_client_lock, yield_v); yield(toku_checkpoint_safe_client_lock, yield_v);
toku_poll_txn_progress_function(txn, FALSE, FALSE);
// on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it. // on rollback we have to make the file be empty, since we locked an empty table, and then may have done things to it.
CACHEFILE cf; CACHEFILE cf;
......
...@@ -7,17 +7,34 @@ ...@@ -7,17 +7,34 @@
static void note_txn_closing (TOKUTXN txn); static void note_txn_closing (TOKUTXN txn);
void
toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint) {
if (txn->progress_poll_fun) {
TOKU_TXN_PROGRESS_S progress = {
.entries_total = txn->num_rollentries,
.entries_processed = txn->num_rollentries_processed,
.is_commit = is_commit,
.stalled_on_checkpoint = stall_for_checkpoint};
txn->progress_poll_fun(&progress, txn->progress_poll_fun_extra);
}
}
int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_commit_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_commit_, r, txn, yield, yieldv, lsn);
txn->num_rollentries_processed++;
if (txn->num_rollentries_processed % 1024 == 0)
toku_poll_txn_progress_function(txn, TRUE, FALSE);
return r; return r;
} }
int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) { int toku_abort_rollback_item (TOKUTXN txn, struct roll_entry *item, YIELDF yield, void*yieldv, LSN lsn) {
int r=0; int r=0;
rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn); rolltype_dispatch_assign(item, toku_rollback_, r, txn, yield, yieldv, lsn);
if (r!=0) return r; txn->num_rollentries_processed++;
return 0; if (txn->num_rollentries_processed % 1024 == 0)
toku_poll_txn_progress_function(txn, FALSE, FALSE);
return r;
} }
void toku_rollback_txn_close (TOKUTXN txn) { void toku_rollback_txn_close (TOKUTXN txn) {
...@@ -156,6 +173,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) { ...@@ -156,6 +173,7 @@ int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn) {
//save that in the parent. Since the commit really happens in the root txn. //save that in the parent. Since the commit really happens in the root txn.
txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit; txn->parent->force_fsync_on_commit |= txn->force_fsync_on_commit;
txn->parent->has_done_work |= txn->has_done_work; txn->parent->has_done_work |= txn->has_done_work;
txn->parent->num_rollentries += txn->num_rollentries;
} else { } else {
// do the commit calls and free everything // do the commit calls and free everything
// we do the commit calls in reverse order too. // we do the commit calls in reverse order too.
......
...@@ -9,6 +9,7 @@ ...@@ -9,6 +9,7 @@
// these routines in rollback.c // these routines in rollback.c
void toku_poll_txn_progress_function(TOKUTXN txn, uint8_t is_commit, uint8_t stall_for_checkpoint);
int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_commit(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn); int toku_rollback_abort(TOKUTXN txn, YIELDF yield, void*yieldv, LSN lsn);
void toku_rollback_txn_close (TOKUTXN txn); void toku_rollback_txn_close (TOKUTXN txn);
......
...@@ -36,6 +36,10 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE ...@@ -36,6 +36,10 @@ int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGE
result->oldest_logentry = result->newest_logentry = 0; result->oldest_logentry = result->newest_logentry = 0;
result->rollentry_arena = memarena_create(); result->rollentry_arena = memarena_create();
result->num_rollentries = 0;
result->num_rollentries_processed = 0;
result->progress_poll_fun = NULL;
result->progress_poll_fun_extra = NULL;
if (toku_omt_size(logger->live_txns) == 0) { if (toku_omt_size(logger->live_txns) == 0) {
assert(logger->oldest_living_xid == TXNID_NONE_LIVING); assert(logger->oldest_living_xid == TXNID_NONE_LIVING);
...@@ -73,17 +77,22 @@ died: ...@@ -73,17 +77,22 @@ died:
// Doesn't close the txn, just performs the commit operations. // Doesn't close the txn, just performs the commit operations.
int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv) { int toku_txn_commit_txn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
return toku_txn_commit_with_lsn(txn, nosync, yield, yieldv, ZERO_LSN, poll, poll_extra);
} }
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn) { int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
int r; int r;
// panic handled in log_commit // panic handled in log_commit
//Child transactions do not actually 'commit'. They promote their changes to parent, so no need to fsync if this txn has a parent. //Child transactions do not actually 'commit'. They promote their changes to parent, so no need to fsync if this txn has a parent.
int do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->has_done_work)); int do_fsync = !txn->parent && (txn->force_fsync_on_commit || (!nosync && txn->has_done_work));
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks. r = toku_log_commit(txn->logger, (LSN*)0, do_fsync, txn->txnid64); // exits holding neither of the tokulogger locks.
if (r!=0) if (r!=0)
return r; return r;
...@@ -92,15 +101,19 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv ...@@ -92,15 +101,19 @@ int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv
} }
// Doesn't close the txn, just performs the abort operations. // Doesn't close the txn, just performs the abort operations.
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv) { int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
return toku_txn_abort_with_lsn(txn, yield, yieldv, ZERO_LSN, poll, poll_extra);
} }
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn) { int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra) {
//printf("%s:%d aborting\n", __FILE__, __LINE__); //printf("%s:%d aborting\n", __FILE__, __LINE__);
// Must undo everything. Must undo it all in reverse order. // Must undo everything. Must undo it all in reverse order.
// Build the reverse list // Build the reverse list
//printf("%s:%d abort\n", __FILE__, __LINE__); //printf("%s:%d abort\n", __FILE__, __LINE__);
txn->progress_poll_fun = poll;
txn->progress_poll_fun_extra = poll_extra;
int r=0; int r=0;
r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64); r = toku_log_xabort(txn->logger, (LSN*)0, 0, txn->txnid64);
if (r!=0) if (r!=0)
......
...@@ -8,11 +8,15 @@ ...@@ -8,11 +8,15 @@
int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger); int toku_txn_begin_txn (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger);
int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid); int toku_txn_begin_with_xid (TOKUTXN parent_tokutxn, TOKUTXN *tokutxn, TOKULOGGER logger, TXNID xid);
int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv); int toku_txn_commit_txn (TOKUTXN txn, int nosync, YIELDF yield, void *yieldv,
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_commit_with_lsn(TOKUTXN txn, int nosync, YIELDF yield, void *yieldv, LSN oplsn,
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv); TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn);
int toku_txn_abort_txn(TOKUTXN txn, YIELDF yield, void *yieldv,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
int toku_txn_abort_with_lsn(TOKUTXN txn, YIELDF yield, void *yieldv, LSN oplsn,
TXN_PROGRESS_POLL_FUNCTION poll, void *poll_extra);
void toku_txn_close_txn(TOKUTXN txn); void toku_txn_close_txn(TOKUTXN txn);
XIDS toku_txn_get_xids (TOKUTXN); XIDS toku_txn_get_xids (TOKUTXN);
......
/* -*- mode: C; c-basic-offset: 4 -*- */
#ident "Copyright (c) 2010 Tokutek Inc. All rights reserved."
#include "test.h"
/*
- ydb layer test of progress report on commit, abort.
- test1:
create two txns
perform operations (inserts and deletes)
commit or abort inner txn
if abort, verify progress callback was called with correct args
if commit, verify progress callback was not called
commit or abort outer txn
verify progress callback was called with correct args
Note: inner loop ends with commit, so when outer loop completes,
it should be called for all operations performed by inner loop.
perform_ops {
for i = 0 -> 5 {
for j = 0 -> 1023
if (j & 0x20) insert
else delete
}
verify (n) {
verify that callback was called n times with correct args
}
test1:
for c0 = 0, 1 {
for c1 = 0, 1 {
begin txn0
perform_ops (txn0)
begin txn1
perform ops (tnx1)
if c1
abort txn1
verify (n)
else
commit txn1
verify (0)
}
if c0
abort txn0
verify (2n)
else
commit txn0
verify (2n)
}
- test2
- create empty dictionary
- begin txn
- lock empty dictionary (full range lock)
- abort
- verify that callback was called twice, first with stalled-on-checkpoint true, then with stalled-on-checkpoint false
*/
#define DICT_0 "dict_0.db"
static DB_ENV *env = NULL;
static DB_TXN *txn_parent = NULL;
static DB_TXN *txn_child = NULL;
static DB *db;
static char *dname = DICT_0;
static DBT key;
static DBT val;
static void start_txn(void);
static void commit_txn(int);
static void open_db(void);
static void close_db(void);
static void insert(void);
static void delete(void);
static void
start_env(void) {
assert(env==NULL);
system("rm -rf " ENVDIR);
toku_os_mkdir(ENVDIR, S_IRWXU+S_IRWXG+S_IRWXO);
int r;
r = db_env_create(&env, 0);
CKERR(r);
r = env->open(env, ENVDIR, DB_INIT_LOCK|DB_INIT_LOG|DB_INIT_MPOOL|DB_INIT_TXN|DB_CREATE|DB_PRIVATE, S_IRWXU+S_IRWXG+S_IRWXO);
CKERR(r);
dname = DICT_0;
dbt_init(&key, "key", strlen("key")+1);
dbt_init(&val, "val", strlen("val")+1);
open_db();
close_db();
}
static void
end_env(void) {
int r;
r=env->close(env, 0);
CKERR(r);
env = NULL;
}
static void
start_txn(void) {
assert(env!=NULL);
int r;
if (!txn_parent) {
r=env->txn_begin(env, 0, &txn_parent, 0);
}
else {
assert(!txn_child);
r=env->txn_begin(env, txn_parent, &txn_child, 0);
}
CKERR(r);
}
struct progress_expect {
int num_calls;
uint8_t is_commit_expected;
uint8_t stalled_on_checkpoint_expected;
uint64_t min_entries_total_expected;
uint64_t last_entries_processed;
};
static void poll(TOKU_TXN_PROGRESS progress, void *extra) {
struct progress_expect *info = extra;
info->num_calls++;
assert(progress->is_commit == info->is_commit_expected);
assert(progress->stalled_on_checkpoint == info->stalled_on_checkpoint_expected);
assert(progress->entries_total >= info->min_entries_total_expected);
assert(progress->entries_processed == 1024 + info->last_entries_processed);
info->last_entries_processed = progress->entries_processed;
}
//expect_number_polls is number of times polling function should be called.
static void
abort_txn(int expect_number_polls) {
assert(env!=NULL);
DB_TXN *txn;
BOOL child;
if (txn_child) {
txn = txn_child;
child = TRUE;
}
else {
txn = txn_parent;
child = FALSE;
}
assert(txn);
struct progress_expect extra = {
.num_calls = 0,
.is_commit_expected = 0,
.stalled_on_checkpoint_expected = 0,
.min_entries_total_expected = expect_number_polls * 1024,
.last_entries_processed = 0
};
int r;
r=txn->abort_with_progress(txn, poll, &extra);
CKERR(r);
assert(extra.num_calls == expect_number_polls);
if (child)
txn_child = NULL;
else
txn_parent = NULL;
}
static void
commit_txn(int expect_number_polls) {
assert(env!=NULL);
DB_TXN *txn;
BOOL child;
if (txn_child) {
txn = txn_child;
child = TRUE;
}
else {
txn = txn_parent;
child = FALSE;
}
assert(txn);
if (child)
assert(expect_number_polls == 0);
struct progress_expect extra = {
.num_calls = 0,
.is_commit_expected = 1,
.stalled_on_checkpoint_expected = 0,
.min_entries_total_expected = expect_number_polls * 1024,
.last_entries_processed = 0
};
int r;
r=txn->commit_with_progress(txn, 0, poll, &extra);
CKERR(r);
assert(extra.num_calls == expect_number_polls);
if (child)
txn_child = NULL;
else
txn_parent = NULL;
}
static void
open_db(void) {
assert(env!=NULL);
assert(db == NULL);
int r;
r = db_create(&db, env, 0);
CKERR(r);
r=db->open(db, NULL, dname, 0, DB_BTREE, DB_CREATE, S_IRWXU+S_IRWXG+S_IRWXO);
CKERR(r);
}
static void
close_db(void) {
assert(env!=NULL);
assert(db != NULL);
int r;
r = db->close(db, 0);
CKERR(r);
db = NULL;
}
static void
insert(void) {
assert(env!=NULL);
assert(db!=NULL);
DB_TXN *txn = txn_child ? txn_child : txn_parent;
assert(txn);
int r=db->put(db, txn,
&key,
&val,
DB_YESOVERWRITE);
CKERR(r);
}
static void
delete(void) {
assert(env!=NULL);
assert(db!=NULL);
DB_TXN *txn = txn_child ? txn_child : txn_parent;
assert(txn);
int r=db->del(db, txn,
&key,
DB_DELETE_ANY);
CKERR(r);
}
static void
perform_ops(int n) {
int i;
int j;
for (i = 0; i < n; i++) {
for (j = 0; j < 1024; j++) {
if (j & 0x20)
delete();
else
insert();
}
}
}
static void
progress_test_1(int n, int commit) {
start_env();
open_db();
{
start_txn();
{
start_txn();
perform_ops(n);
abort_txn(n);
}
{
start_txn();
perform_ops(n);
commit_txn(0);
}
perform_ops(n);
if (commit)
commit_txn(2*n);
else
abort_txn(2*n);
}
close_db();
end_env();
}
struct progress_stall_expect {
int num_calls;
BOOL has_been_stalled;
};
static void stall_poll(TOKU_TXN_PROGRESS progress, void *extra) {
struct progress_stall_expect *info = extra;
info->num_calls++;
assert(info->num_calls <= 2);
assert(progress->is_commit == FALSE);
if (!info->has_been_stalled) {
assert(info->num_calls==1);
assert(progress->stalled_on_checkpoint);
info->has_been_stalled = TRUE;
}
else {
assert(info->num_calls==2);
assert(!progress->stalled_on_checkpoint);
}
}
static void
abort_txn_stall_checkpoint(void) {
assert(env!=NULL);
assert(txn_parent);
assert(!txn_child);
struct progress_stall_expect extra = {
.num_calls = 0,
.has_been_stalled = FALSE
};
int r;
r=txn_parent->abort_with_progress(txn_parent, stall_poll, &extra);
CKERR(r);
assert(extra.num_calls == 2);
txn_parent = NULL;
}
static void
lock(void) {
assert(env!=NULL);
assert(db!=NULL);
assert(txn_parent);
assert(!txn_child);
int r=db->pre_acquire_table_lock(db, txn_parent);
CKERR(r);
}
static void
progress_test_2(void) {
start_env();
open_db();
start_txn();
lock();
abort_txn_stall_checkpoint();
close_db();
end_env();
}
int
test_main (int argc, char *argv[])
{
parse_args(argc, argv);
int commit;
for (commit = 0; commit <= 1; commit++) {
progress_test_1(4, commit);
}
progress_test_2();
return 0;
}
...@@ -133,7 +133,7 @@ static void env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) { ...@@ -133,7 +133,7 @@ static void env_remove_open_txn(DB_ENV *UU(env), DB_TXN *txn) {
toku_list_remove((struct toku_list *) (void *) &txn->open_txns); toku_list_remove((struct toku_list *) (void *) &txn->open_txns);
} }
static int toku_txn_abort(DB_TXN * txn); static int toku_txn_abort(DB_TXN * txn, TXN_PROGRESS_POLL_FUNCTION, void*);
/* db methods */ /* db methods */
static inline int db_opened(DB *db) { static inline int db_opened(DB *db) {
...@@ -366,7 +366,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db ...@@ -366,7 +366,7 @@ static int toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *db
static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags); static int toku_env_txn_checkpoint(DB_ENV * env, u_int32_t kbyte, u_int32_t min, u_int32_t flags);
static int toku_db_close(DB * db, u_int32_t flags); static int toku_db_close(DB * db, u_int32_t flags);
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal); static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal);
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags); static int toku_txn_commit(DB_TXN * txn, u_int32_t flags, TXN_PROGRESS_POLL_FUNCTION, void*);
static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode); static int db_open_iname(DB * db, DB_TXN * txn, const char *iname, u_int32_t flags, int mode);
static void finalize_file_removal(int fd, void * extra); static void finalize_file_removal(int fd, void * extra);
...@@ -710,7 +710,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) { ...@@ -710,7 +710,7 @@ toku_env_open(DB_ENV * env, const char *home, u_int32_t flags, int mode) {
assert(r==0); assert(r==0);
} }
if (using_txns) { if (using_txns) {
r = toku_txn_commit(txn, 0); r = toku_txn_commit(txn, 0, NULL, NULL);
assert(r==0); assert(r==0);
} }
toku_ydb_unlock(); toku_ydb_unlock();
...@@ -1602,13 +1602,14 @@ static void ydb_yield (voidfp f, void *UU(v)) { ...@@ -1602,13 +1602,14 @@ static void ydb_yield (voidfp f, void *UU(v)) {
toku_ydb_lock(); toku_ydb_lock();
} }
static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { static int toku_txn_commit(DB_TXN * txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
if (!txn) return EINVAL; if (!txn) return EINVAL;
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children //Recursively kill off children
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, flags, NULL, NULL);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r_child; txn->mgrp->i->is_panicked = r_child;
txn->mgrp->i->panic_string = toku_strdup("Recursive child commit failed during parent commit.\n"); txn->mgrp->i->panic_string = toku_strdup("Recursive child commit failed during parent commit.\n");
...@@ -1636,12 +1637,12 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) { ...@@ -1636,12 +1637,12 @@ static int toku_txn_commit(DB_TXN * txn, u_int32_t flags) {
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); //r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra);
else else
// frees the tokutxn // frees the tokutxn
// Calls ydb_yield(NULL) occasionally // Calls ydb_yield(NULL) occasionally
//r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL); //r = toku_logger_commit(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL);
r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL); r = toku_txn_commit_txn(db_txn_struct_i(txn)->tokutxn, nosync, ydb_yield, NULL, poll, poll_extra);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r; txn->mgrp->i->is_panicked = r;
...@@ -1689,12 +1690,13 @@ static u_int32_t toku_txn_id(DB_TXN * txn) { ...@@ -1689,12 +1690,13 @@ static u_int32_t toku_txn_id(DB_TXN * txn) {
return -1; return -1;
} }
static int toku_txn_abort(DB_TXN * txn) { static int toku_txn_abort(DB_TXN * txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
HANDLE_PANICKED_ENV(txn->mgrp); HANDLE_PANICKED_ENV(txn->mgrp);
//Recursively kill off children (abort or commit are both correct, commit is cheaper) //Recursively kill off children (abort or commit are both correct, commit is cheaper)
if (db_txn_struct_i(txn)->child) { if (db_txn_struct_i(txn)->child) {
//commit of child sets the child pointer to NULL //commit of child sets the child pointer to NULL
int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC); int r_child = toku_txn_commit(db_txn_struct_i(txn)->child, DB_TXN_NOSYNC, NULL, NULL);
if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) { if (r_child !=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r_child; txn->mgrp->i->is_panicked = r_child;
txn->mgrp->i->panic_string = toku_strdup("Recursive child commit failed during parent abort.\n"); txn->mgrp->i->panic_string = toku_strdup("Recursive child commit failed during parent abort.\n");
...@@ -1714,7 +1716,7 @@ static int toku_txn_abort(DB_TXN * txn) { ...@@ -1714,7 +1716,7 @@ static int toku_txn_abort(DB_TXN * txn) {
assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort)); assert(toku_list_empty(&db_txn_struct_i(txn)->dbs_that_must_close_before_abort));
//int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); //int r = toku_logger_abort(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL);
int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL); int r = toku_txn_abort_txn(db_txn_struct_i(txn)->tokutxn, ydb_yield, NULL, poll, poll_extra);
if (r!=0 && !toku_env_is_panicked(txn->mgrp)) { if (r!=0 && !toku_env_is_panicked(txn->mgrp)) {
txn->mgrp->i->is_panicked = r; txn->mgrp->i->is_panicked = r;
txn->mgrp->i->panic_string = toku_strdup("Error during abort.\n"); txn->mgrp->i->panic_string = toku_strdup("Error during abort.\n");
...@@ -1750,20 +1752,34 @@ static int locked_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) { ...@@ -1750,20 +1752,34 @@ static int locked_txn_stat (DB_TXN *txn, struct txn_stat **txn_stat) {
toku_ydb_lock(); u_int32_t r = toku_txn_stat(txn, txn_stat); toku_ydb_unlock(); return r; toku_ydb_lock(); u_int32_t r = toku_txn_stat(txn, txn_stat); toku_ydb_unlock(); return r;
} }
static int locked_txn_commit(DB_TXN *txn, u_int32_t flags) { static int locked_txn_commit_with_progress(DB_TXN *txn, u_int32_t flags,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
toku_multi_operation_client_lock(); //Cannot checkpoint during a commit. toku_multi_operation_client_lock(); //Cannot checkpoint during a commit.
toku_ydb_lock(); int r = toku_txn_commit(txn, flags); toku_ydb_unlock(); toku_ydb_lock(); int r = toku_txn_commit(txn, flags, poll, poll_extra); toku_ydb_unlock();
toku_multi_operation_client_unlock(); //Cannot checkpoint during a commit. toku_multi_operation_client_unlock(); //Cannot checkpoint during a commit.
return r; return r;
} }
static int locked_txn_abort(DB_TXN *txn) { static int locked_txn_abort_with_progress(DB_TXN *txn,
TXN_PROGRESS_POLL_FUNCTION poll, void* poll_extra) {
toku_multi_operation_client_lock(); //Cannot checkpoint during an abort. toku_multi_operation_client_lock(); //Cannot checkpoint during an abort.
toku_ydb_lock(); int r = toku_txn_abort(txn); toku_ydb_unlock(); toku_ydb_lock(); int r = toku_txn_abort(txn, poll, poll_extra); toku_ydb_unlock();
toku_multi_operation_client_unlock(); //Cannot checkpoint during an abort. toku_multi_operation_client_unlock(); //Cannot checkpoint during an abort.
return r; return r;
} }
static int locked_txn_commit(DB_TXN *txn, u_int32_t flags) {
int r;
r = locked_txn_commit_with_progress(txn, flags, NULL, NULL);
return r;
}
static int locked_txn_abort(DB_TXN *txn) {
int r;
r = locked_txn_abort_with_progress(txn, NULL, NULL);
return r;
}
static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal) { static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t flags, int internal) {
HANDLE_PANICKED_ENV(env); HANDLE_PANICKED_ENV(env);
HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists. HANDLE_ILLEGAL_WORKING_PARENT_TXN(env, stxn); //Cannot create child while child already exists.
...@@ -1813,11 +1829,17 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f ...@@ -1813,11 +1829,17 @@ static int toku_txn_begin(DB_ENV *env, DB_TXN * stxn, DB_TXN ** txn, u_int32_t f
memset(result, 0, result_size); memset(result, 0, result_size);
//toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags); //toku_ydb_notef("parent=%p flags=0x%x\n", stxn, flags);
result->mgrp = env; result->mgrp = env;
result->abort = locked_txn_abort; #define STXN(name) result->name = locked_txn_ ## name
result->commit = locked_txn_commit; STXN(abort);
result->id = locked_txn_id; STXN(commit);
result->parent = stxn; STXN(abort_with_progress);
STXN(commit_with_progress);
STXN(id);
#undef STXN
result->txn_stat = locked_txn_stat; result->txn_stat = locked_txn_stat;
result->parent = stxn;
#if !TOKUDB_NATIVE_H #if !TOKUDB_NATIVE_H
MALLOC(db_txn_struct_i(result)); MALLOC(db_txn_struct_i(result));
if (!db_txn_struct_i(result)) { if (!db_txn_struct_i(result)) {
...@@ -4087,11 +4109,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP ...@@ -4087,11 +4109,11 @@ toku_db_open(DB * db, DB_TXN * txn, const char *fname, const char *dbname, DBTYP
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL);
assert(r==0); // TODO panic assert(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child); int r2 = toku_txn_abort(child, NULL, NULL);
assert(r2==0); // TODO panic assert(r2==0); // TODO panic
} }
} }
...@@ -4475,11 +4497,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna ...@@ -4475,11 +4497,11 @@ toku_env_dbremove(DB_ENV * env, DB_TXN *txn, const char *fname, const char *dbna
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL);
assert(r==0); // TODO panic assert(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child); int r2 = toku_txn_abort(child, NULL, NULL);
assert(r2==0); // TODO panic assert(r2==0); // TODO panic
} }
} }
...@@ -4596,11 +4618,11 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam ...@@ -4596,11 +4618,11 @@ toku_env_dbrename(DB_ENV *env, DB_TXN *txn, const char *fname, const char *dbnam
if (using_txns) { if (using_txns) {
// close txn // close txn
if (r == 0) { // commit if (r == 0) { // commit
r = toku_txn_commit(child, DB_TXN_NOSYNC); r = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL);
assert(r==0); // TODO panic assert(r==0); // TODO panic
} }
else { // abort else { // abort
int r2 = toku_txn_abort(child); int r2 = toku_txn_abort(child, NULL, NULL);
assert(r2==0); // TODO panic assert(r2==0); // TODO panic
} }
} }
...@@ -4809,8 +4831,8 @@ static inline int toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed, ...@@ -4809,8 +4831,8 @@ static inline int toku_db_construct_autotxn(DB* db, DB_TXN **txn, BOOL* changed,
static inline int toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) { static inline int toku_db_destruct_autotxn(DB_TXN *txn, int r, BOOL changed) {
if (!changed) return r; if (!changed) return r;
if (r==0) return toku_txn_commit(txn, 0); if (r==0) return toku_txn_commit(txn, 0, NULL, NULL);
toku_txn_abort(txn); toku_txn_abort(txn, NULL, NULL);
return r; return r;
} }
...@@ -5300,11 +5322,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname ...@@ -5300,11 +5322,11 @@ ydb_load_inames(DB_ENV * env, DB_TXN * txn, int N, DB * dbs[N], char * new_iname
if (using_txns) { if (using_txns) {
// close txn // close txn
if (rval == 0) { // all well so far, commit child if (rval == 0) { // all well so far, commit child
rval = toku_txn_commit(child, DB_TXN_NOSYNC); rval = toku_txn_commit(child, DB_TXN_NOSYNC, NULL, NULL);
assert(rval==0); assert(rval==0);
} }
else { // abort child else { // abort child
int r2 = toku_txn_abort(child); int r2 = toku_txn_abort(child, NULL, NULL);
assert(r2==0); assert(r2==0);
for (i=0; i<N; i++) { for (i=0; i<N; i++) {
if (new_inames[i]) { if (new_inames[i]) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment