Commit 4e51f48d authored by Juho Snellman

Tweak the benchmark a little bit more

- Add one more distinct timer type, which is occasionally
  rescheduled to happen earlier than before and occasionally
  canceled.
- Add some comments, in case someone else wants to port the
  benchmark to another library.
- Print a "total message count" field in the result CSV, as
  a quick way of verifying the results are correct.
parent f415004b
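For anyone porting the benchmark: below is a minimal sketch of the lifecycle of the new request-deadline timer (scheduled far ahead, sometimes rescheduled to fire earlier, usually canceled before it executes). It is written against the TimerWheel / MemberTimerEvent interface that appears in the diff; the header name, the Request class, and the tick counts are illustrative assumptions, not part of the actual benchmark.

// Sketch only: the header name, Request class, and tick counts below
// are illustrative; only the TimerWheel API calls match the diff.
#include <cstdio>
#include "timer-wheel.h"

class Request {
public:
    Request(TimerWheel* timers)
        : timers_(timers), deadline_timer_(this) {
        // Schedule the deadline a generous distance into the future.
        timers_->schedule(&deadline_timer_, 400);
    }
    // On the first response, reschedule the pending deadline to fire
    // earlier than originally scheduled.
    void on_first_response() {
        timers_->schedule(&deadline_timer_, 200);
    }
    // On the full response the deadline never fires: cancel it.
    void on_full_response() {
        deadline_timer_.cancel();
    }
    void on_deadline() {
        fprintf(stderr, "request timed out\n");
    }
private:
    TimerWheel* timers_;
    MemberTimerEvent<Request, &Request::on_deadline> deadline_timer_;
};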
@@ -16,37 +16,57 @@ static bool allow_schedule_in_range = true;
 // if using non-deterministic features like schedule_in_range).
 static bool print_trace = true;
 static int pair_count = 5;
+// The total number of response messages received on all units.
+// Printed in the final output, useful as a poor man's output checksum.
+static long total_rx_count = 0;
+// Pretend we're using timer ticks of 20 microseconds. So 50000 ticks
+// is one second.
+static Tick time_ms = 50;
+static Tick time_s = 1000*time_ms;

 class Unit {
 public:
-    Unit(TimerWheel* timers, int request_interval=1000*50)
+    Unit(TimerWheel* timers, int request_interval=1*time_s)
         : timers_(timers),
           idle_timer_(this),
           close_timer_(this),
           pace_timer_(this),
           request_timer_(this),
+          request_deadline_timer_(this),
           id_(id_counter_++),
-          request_interval_(request_interval) {
+          request_interval_ticks_(request_interval) {
     }

     ~Unit() {
         if (print_trace) {
             printf("delete %d, rx-count=%d\n", id_, rx_count_);
         }
+        total_rx_count += rx_count_;
     }

+    // Create a full work unit from the two halves.
+    void pair_with(Unit* other) {
+        other_ = other;
+    }
+
+    // Start the benchmark, acting either as the client or the server.
     void start(bool server) {
         unidle();
-        timers_->schedule(&close_timer_, 180*1000*50);
+        // Start shutdown of this work unit in 180s.
+        timers_->schedule(&close_timer_, 180*time_s);
         if (!server) {
+            // Fire off the first request from the client.
             on_request();
         }
     }

+    // Queue "count" messages to be transmitted.
     void transmit(int count) {
         tx_count_ += count;
         deliver();
     }

+    // Deliver as many response messages as we have quota for. Then
+    // start off a timer to refresh the quota.
     void deliver() {
         unidle();
         int amount = std::min(pace_quota_, tx_count_);
@@ -57,71 +77,104 @@ public:
             timers_->schedule(&pace_timer_, pace_interval_ticks_);
         }
     }

+    // Receive some number of response messages.
     void receive(int count) {
         unidle();
+        // Receive the first response to a given request. Move the
+        // deadline timer back in time (since this connection is now
+        // clearly active).
+        if (waiting_for_response_) {
+            timers_->schedule(&request_deadline_timer_,
+                              pace_interval_ticks_ * RESPONSE_SIZE * 2);
+            waiting_for_response_ = false;
+        }
         rx_count_++;
-        if (rx_count_ % 100 == 0) {
-            timers_->schedule(&request_timer_, request_interval_);
+        // We've received the full response. Stop the deadline timer,
+        // and start another timer that'll trigger the next request.
+        if (rx_count_ % RESPONSE_SIZE == 0) {
+            request_deadline_timer_.cancel();
+            timers_->schedule(&request_timer_, request_interval_ticks_);
         }
     }

-    void pair_with(Unit* other) {
-        other_ = other;
-    }
-
+    // First time this timer gets executed, we put the object into a
+    // closing state where it'll start winding down work. Then we
+    // forcibly close it a bit later. We do it like this to remove any
+    // non-determinism between the execution order of the close timer
+    // and the pace timer.
     void on_close() {
         if (closing_) {
             delete this;
         } else {
-            // First time the timeout triggers, we just put the
-            // object into a closing state where it'll start winding down
-            // work. Then we forcibly close it a bit later. We do it like
-            // this to remove any non-determinism between the execution order
-            // of the close timer and the pace timer.
             closing_ = true;
-            timers_->schedule(&close_timer_, 10*1000*50);
+            timers_->schedule(&close_timer_, 10*time_s);
         }
     }

+    // Refresh transmit quota.
     void on_pace() {
         if (tx_count_) {
             pace_quota_ = 1;
             deliver();
         }
     }

+    // The endpoint has been idle for too long, kill it.
     void on_idle() {
         delete this;
     }

+    // Send a new request (unless we're draining traffic).
     void on_request() {
         if (!closing_) {
-            other_->transmit(100);
+            // Expect a response within this time.
+            timers_->schedule(&request_deadline_timer_,
+                              pace_interval_ticks_ * RESPONSE_SIZE * 4);
+            waiting_for_response_ = true;
+            other_->transmit(RESPONSE_SIZE);
         }
     }

+    // We've done some work. Move the idle timer further into the future.
     void unidle() {
         if (allow_schedule_in_range) {
-            timers_->schedule_in_range(&idle_timer_,
-                                       60*1000*50,
-                                       61*1000*50);
+            timers_->schedule_in_range(&idle_timer_, 60*time_s,
+                                       61*time_s);
         } else {
-            timers_->schedule(&idle_timer_, 60*1000*50);
+            timers_->schedule(&idle_timer_, 60*time_s);
         }
     }

+    // Something has gone wrong. Forcibly close down both sides.
+    void on_request_deadline() {
+        fprintf(stderr, "Request did not finish by deadline\n");
+        delete this;
+        delete other_;
+    }
+
 private:
     TimerWheel* timers_;
+    // This timer gets rescheduled far into the future at very frequent
+    // intervals.
     MemberTimerEvent<Unit, &Unit::on_idle> idle_timer_;
+    // This timer gets scheduled twice, and executed twice.
     MemberTimerEvent<Unit, &Unit::on_close> close_timer_;
+    // This gets scheduled very soon at frequent intervals, and is always
+    // executed.
     MemberTimerEvent<Unit, &Unit::on_pace> pace_timer_;
+    // This gets scheduled about 150-200 times during the benchmark, a
+    // medium duration from now, and is always executed.
     MemberTimerEvent<Unit, &Unit::on_request> request_timer_;
+    // This gets scheduled at a medium duration 150-200 times during a
+    // benchmark, but always gets canceled (not rescheduled).
+    MemberTimerEvent<Unit, &Unit::on_request_deadline> request_deadline_timer_;

     static int id_counter_;
+    const static int RESPONSE_SIZE = 128;
     int id_;
     int tx_count_ = 0;
     int rx_count_ = 0;
     Unit* other_ = NULL;
     int pace_quota_ = 1;
     int pace_interval_ticks_ = 10;
-    int request_interval_ = 100;
+    int request_interval_ticks_;
     bool closing_ = false;
+    bool waiting_for_response_ = false;
 };

 int Unit::id_counter_ = 0;
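A porting note: the MemberTimerEvent<T, Fn> members above simply dispatch a timer expiry to a member function of the owning object. For a library whose timers take plain callbacks instead of event objects, a rough shim could look like the following; the MemberCallback name and fire() entry point are hypothetical, not part of this codebase.

// Illustrative shim only: dispatches a timer expiry to a member
// function, approximating what MemberTimerEvent does here.
template<typename T, void (T::*Fn)()>
struct MemberCallback {
    explicit MemberCallback(T* obj) : obj_(obj) {}
    // Call this from the host library's timer callback.
    void fire() { (obj_->*Fn)(); }
private:
    T* obj_;
};
// Usage sketch: MemberCallback<Unit, &Unit::on_idle> idle_cb(&unit);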
@@ -139,26 +192,26 @@ static void make_unit_pair(TimerWheel* timers, int request_interval) {
 bool bench() {
     TimerWheel timers;

     // Create the events evenly spread during this time range.
-    int create_period = 1000*50;
+    int create_period = 1*time_s;
     double create_progress_per_iter = (double) pair_count / create_period * 2;
     double current_progress = 0;
     long int count = 0;
-    while (timers.now() < 1000*50) {
+    while (timers.now() < create_period) {
         current_progress += (rand() * create_progress_per_iter) / RAND_MAX;
         while (current_progress > 1) {
             --current_progress;
-            make_unit_pair(&timers, 1000*50 + rand() % 100);
+            make_unit_pair(&timers, 1*time_s + rand() % 100);
             ++count;
         }
         timers.advance(1);
     }

-    fprintf(stderr, "%ld active pairs (%ld timers)\n",
-            count, count * 8);
+    fprintf(stderr, "%ld work units (%ld timers)\n",
+            count, count * 10);

-    while (timers.now() < 300*1000*50) {
-        Tick t = timers.ticks_to_next_event(10000);
+    while (timers.now() < 300*time_s) {
+        Tick t = timers.ticks_to_next_event(100*time_ms);
         timers.advance(t);
     }
@@ -202,9 +255,10 @@ int main(int argc, char** argv) {
     bench();
     getrusage(RUSAGE_SELF, &end);

-    printf("%s,%d,%s,%lf\n", argv[0], pair_count,
+    printf("%s,%d,%s,%lf,%ld\n", argv[0], pair_count,
            (allow_schedule_in_range ? "yes" : "no"),
            (end.ru_utime.tv_sec + end.ru_utime.tv_usec / 1000000.0) -
-           (start.ru_utime.tv_sec + start.ru_utime.tv_usec / 1000000.0));
+           (start.ru_utime.tv_sec + start.ru_utime.tv_usec / 1000000.0),
+           total_rx_count);

     return 0;
 }
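With this change each result row is: binary name (argv[0]), pair count, whether schedule_in_range was allowed, user CPU seconds, and the new total message count. An illustrative row with made-up numbers (the binary name is hypothetical, not a real measurement):

./bench,5,yes,2.345678,28413000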