Commit 607b3753 authored by Xavier Thompson's avatar Xavier Thompson

Improve scheduler work adaptation

parent 0f32456b
......@@ -23,6 +23,7 @@ namespace typon
struct Scheduler
{
using uint = unsigned int;
using u64 = std::uint_fast64_t;
using Work = Worker::Work;
using garbage_collector = fdt::lock_free::garbage_collector;
......@@ -38,6 +39,7 @@ namespace typon
{
uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(new Deque(task));
get()._stealables.fetch_add(1);
get()._notifyer.notify_one();
}
......@@ -67,12 +69,13 @@ namespace typon
{
uint id = fdt::random::random() % get()._concurrency;
get()._worker[id].add(deque);
get()._stealables.fetch_add(1);
}
get()._notifyer.notify_one();
}
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::atomic<u64> _stealables = 0;
std::vector<Worker> _worker;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
......@@ -87,7 +90,8 @@ namespace typon
{
for (uint i = 0; i < concurrency; i++)
{
_worker[i]._gc = &_gc;
_worker[i]._gc = &(_gc);
_worker[i]._stealables = &(_stealables);
}
thread_id = concurrency;
......@@ -122,15 +126,7 @@ namespace typon
void exploit_work(Work & work) noexcept
{
if (_actives.fetch_add(1) == 0)
{
if (_thieves.load() == 0)
{
_notifyer.notify_one();
}
}
_worker[thread_id].resume(work);
_actives.fetch_sub(1);
}
void explore_work(Work & work) noexcept
......@@ -147,19 +143,6 @@ namespace typon
}
}
void detect_work(Work & work) noexcept
{
auto epoch = _gc.epoch(thread_id);
for (uint id = 0; id < _concurrency; id++)
{
work = _worker[id].steal();
if (work)
{
break;
}
}
}
bool wait_for_work(Work & work) noexcept
{
work = {};
......@@ -176,16 +159,6 @@ namespace typon
return true;
}
auto key = _notifyer.prepare_wait();
detect_work(work);
if (work)
{
_notifyer.cancel_wait();
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
if (_done.load())
{
_notifyer.cancel_wait();
......@@ -195,14 +168,12 @@ namespace typon
}
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
if (_stealables.load() > 0)
{
_notifyer.cancel_wait();
continue;
}
}
// _notifyer.cancel_wait();
// (void) key;
_notifyer.wait(key);
}
}
......
......@@ -47,6 +47,7 @@ namespace typon
std::mutex _mutex;
std::atomic<Deque *> _deque {nullptr};
std::vector<Deque *> _pool;
std::atomic_uint_fast64_t * _stealables;
fdt::lock_free::garbage_collector * _gc;
~Worker()
......@@ -88,24 +89,30 @@ namespace typon
void resume(Work & work) noexcept
{
auto deque = _deque.load();
if (work._state == Work::Resumable)
{
auto deque = _deque.load();
_deque.store(work._deque);
if (deque)
{
_gc->retire(deque);
}
_stealables->fetch_add(1);
work._deque->resume();
}
else
{
if (!deque)
if (!_deque.load())
{
_deque.store(new Deque());
}
_stealables->fetch_add(1);
work._task.resume();
}
if (_deque.load())
{
_stealables->fetch_sub(1);
}
}
void push(Continuation task) noexcept
......@@ -177,6 +184,7 @@ namespace typon
{
return deque;
}
_stealables->fetch_sub(1);
return {};
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment