Commit e880b620 authored by Xavier Thompson's avatar Xavier Thompson

Improve scheduler:

- Let thieves remove empty suspended deques from pools
- When enabling empty suspended deques, reinsert them in a pool
- Use direct locking when suspending and enabling
parent cb264baa
......@@ -20,17 +20,19 @@ namespace typon
using enum std::memory_order;
enum State : unsigned char { Empty, Suspended, Resumable };
std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1};
std::atomic<ring_buffer *> _buffer { new ring_buffer(3) };
std::coroutine_handle<> _coroutine;
std::atomic<bool> _resumable;
std::atomic<State> _state;
Deque() noexcept {}
Deque(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine)
, _resumable(true)
, _state(Resumable)
{}
~Deque()
......@@ -107,7 +109,7 @@ namespace typon
void suspend(std::coroutine_handle<> coroutine) noexcept
{
_resumable.store(false);
_state.store(Suspended);
_coroutine = coroutine;
}
......
......@@ -44,17 +44,17 @@ namespace typon
static void push(Continuation task) noexcept
{
get()._worker[thread_id]._active.load()->push(task);
get()._worker[thread_id]._deque.load()->push(task);
}
static bool pop() noexcept
{
Scheduler & scheduler = get();
Deque * active = scheduler._worker[thread_id]._active.load();
bool result = active->pop();
if (auto array = active->reclaim())
Deque * deque = scheduler._worker[thread_id]._deque.load();
bool result = deque->pop();
if (auto garbage = deque->reclaim())
{
scheduler._gc.retire(array);
scheduler._gc.retire(garbage);
}
return result;
}
......@@ -63,24 +63,23 @@ namespace typon
{
Scheduler & scheduler = get();
Worker & worker = scheduler._worker[thread_id];
Deque * deque = worker._active.load();
worker._active.store(nullptr);
Deque * deque = worker._deque.load();
worker._deque.store(nullptr);
deque->suspend(coroutine);
for (uint i = 0; i < scheduler._concurrency * 2; i++)
{
uint id = fdt::random::random() % scheduler._concurrency;
if (scheduler._worker[id].try_add(deque))
{
return deque;
}
}
worker.add(deque);
uint id = fdt::random::random() % scheduler._concurrency;
scheduler._worker[id].add(deque);
return deque;
}
static void enable(Deque * deque) noexcept
{
deque->_resumable.store(true);
auto state = deque->_state.exchange(Deque::Resumable);
if (state == Deque::Empty)
{
Scheduler & scheduler = get();
uint id = fdt::random::random() % scheduler._concurrency;
scheduler._worker[id].add(deque);
}
get()._notifyer.notify_one();
}
......
......@@ -43,7 +43,7 @@ namespace typon
};
std::mutex _mutex;
std::atomic<Deque *> _active {nullptr};
std::atomic<Deque *> _deque {nullptr};
std::vector<Deque *> _pool;
fdt::lock_free::garbage_collector * _gc;
......@@ -53,29 +53,29 @@ namespace typon
{
delete deque;
}
if (auto active = _active.load())
if (auto deque = _deque.load())
{
delete active;
delete deque;
}
}
void resume(Work & work) noexcept
{
auto active = _active.load();
auto deque = _deque.load();
if (work._state == Work::Resumable)
{
_active.store(work._deque);
if (active)
_deque.store(work._deque);
if (deque)
{
_gc->retire(active);
_gc->retire(deque);
}
work._deque->resume();
}
else
{
if (!active)
if (!deque)
{
_active.store(new Deque());
_deque.store(new Deque());
}
work._task.resume();
}
......@@ -105,43 +105,53 @@ namespace typon
return {};
}
std::lock_guard lock(_mutex, std::adopt_lock);
auto active = _active.load();
auto total = _pool.size() + bool(active);
auto deque = _deque.load();
auto total = _pool.size() + bool(deque);
if (total == 0)
{
return {};
}
auto index = fdt::random::random64() % (_pool.size() + bool(active));
auto index = fdt::random::random64() % total;
if (index == _pool.size())
{
if (auto task = active->steal())
if (auto task = deque->steal())
{
task.thefts()++;
return task;
}
return {};
}
auto deque = _pool[index];
if (!deque->_resumable.load())
deque = _pool[index];
if (deque->_state.load() == Deque::Resumable)
{
auto task = deque->pop_top();
if (auto garbage = deque->reclaim())
{
delete garbage;
}
if (task)
if (index < _pool.size() - 1)
{
task.thefts()++;
return task;
_pool[index] = _pool.back();
}
return {};
_pool.pop_back();
return deque;
}
auto task = deque->pop_top();
if (auto garbage = deque->reclaim())
{
delete garbage;
}
if (task)
{
task.thefts()++;
return task;
}
if (index < _pool.size() - 1)
{
_pool[index] = _pool.back();
}
_pool.pop_back();
return deque;
Deque::State expected = Deque::Suspended;
if (!deque->_state.compare_exchange_strong(expected, Deque::Empty))
{
return deque;
}
return {};
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment