Commit b5e5bdab authored by Xavier Thompson's avatar Xavier Thompson

WIP Future Scheduler

parent b1f03d3b
#ifndef TYPON_CORE_FUTURE_HPP_INCLUDED
#define TYPON_CORE_FUTURE_HPP_INCLUDED
#include <atomic>
#include <cstdint>
#include <memory>
#include <type_traits>
#include <typon/fundamental/scope.hpp>
#include <typon/core/scheduler.hpp>
#include <typon/core/work_deque.hpp>
namespace typon
{
template <typename T>
struct Future
{
static constexpr std::uintptr_t ready {0};
static constexpr std::uintptr_t no_waiter {1};
std::atomic_uintptr_t _state {no_waiter};
bool _consumed {false};
union
{
T _value;
};
~Future()
{
if (!_consumed)
{
if (_state.load(std::memory_order_acquire) == ready)
{
std::destroy_at(std::addressof(_value));
}
}
}
void put(T value)
{
std::construct_at(std::addressof(_value), std::move(value));
auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter)
{
Scheduler::resume(reinterpret_cast<WorkDeque *>(state));
}
}
bool await_ready() noexcept
{
return _state.load(std::memory_order_acquire) == ready;
}
void await_suspend(std::coroutine_handle<> coroutine) noexcept
{
auto deque = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque);
if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{
Scheduler::resume(deque);
}
}
T await_resume() noexcept
{
fdt::defer defer( [this]() { std::destroy_at(std::addressof(_value)); });
_consumed = true;
return std::move(_value);
}
};
template <typename T>
requires std::is_trivially_destructible_v<T>
struct Future<T>
{
static constexpr std::uintptr_t ready {0};
static constexpr std::uintptr_t no_waiter {1};
std::atomic_uintptr_t _state {no_waiter};
union
{
T _value;
};
void put(T value)
{
std::construct_at(std::addressof(_value), std::move(value));
auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter)
{
Scheduler::resume(reinterpret_cast<WorkDeque *>(state));
}
}
bool await_ready() noexcept
{
return _state.load(std::memory_order_acquire) == ready;
}
void await_suspend(std::coroutine_handle<> coroutine) noexcept
{
auto deque = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque);
if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{
Scheduler::resume(deque);
}
}
T await_resume() noexcept
{
return std::move(_value);
}
};
template <typename T>
struct Future<T&>
{
static constexpr std::uintptr_t ready {0};
static constexpr std::uintptr_t no_waiter {1};
std::atomic_uintptr_t _state {no_waiter};
T* _value;
void put(T& value)
{
_value = std::addressof(value);
auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter)
{
Scheduler::resume(reinterpret_cast<WorkDeque *>(state));
}
}
bool await_ready() noexcept
{
return _state.load(std::memory_order_acquire) == ready;
}
void await_suspend(std::coroutine_handle<> coroutine) noexcept
{
auto deque = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque);
if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{
Scheduler::resume(deque);
}
}
T& await_resume() noexcept
{
return *_value;
}
};
template <>
struct Future<void>
{
static constexpr std::uintptr_t ready {0};
static constexpr std::uintptr_t no_waiter {1};
std::atomic_uintptr_t _state {no_waiter};
void put()
{
auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter)
{
Scheduler::resume(reinterpret_cast<WorkDeque *>(state));
}
}
bool await_ready() noexcept
{
return _state.load(std::memory_order_acquire) == ready;
}
void await_suspend(std::coroutine_handle<> coroutine) noexcept
{
auto deque = Scheduler::suspend(coroutine);
auto state = reinterpret_cast<std::uintptr_t>(deque);
if (_state.exchange(state, std::memory_order_acq_rel) == ready)
{
Scheduler::resume(deque);
}
}
void await_resume() noexcept {}
};
}
#endif // TYPON_CORE_FUTURE_HPP_INCLUDED
......@@ -5,8 +5,6 @@
#include <coroutine>
#include <cstdint>
//
#include <typon/core/continuation.hpp>
#include <typon/core/result.hpp>
#include <typon/core/scheduler.hpp>
......@@ -22,16 +20,15 @@ namespace typon
~Root()
{
_coroutine.destroy();
if (_coroutine.promise()._count.fetch_sub(1) == 1)
{
_coroutine.destroy();
}
}
struct promise_type : Result<void>
{
Continuation::Data _data;
promise_type() noexcept
: _data(std::coroutine_handle<promise_type>::from_promise(*this))
{}
std::atomic<std::uint_fast8_t> _count {0};
Root get_return_object() noexcept
{
......@@ -47,23 +44,28 @@ namespace typon
{
struct awaitable : std::suspend_always
{
void await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
std::atomic<std::uint_fast8_t> & _count;
void await_suspend(std::coroutine_handle<> coroutine) noexcept
{
// !! bad racy
coroutine.promise()._data._n.store(0, std::memory_order_release);
coroutine.promise()._data._n.notify_one();
_count.store(2);
_count.notify_one();
if (_count.fetch_sub(1) == 1)
{
coroutine.destroy();
}
}
};
return awaitable {};
return awaitable { {}, _count };
}
};
void call() &&
{
Scheduler::get();
Scheduler::schedule(_coroutine);
_coroutine.promise()._data._n.wait(Continuation::Data::UMAX, std::memory_order_acquire);
Scheduler::insert(_coroutine);
_coroutine.promise()._count.wait(0);
_coroutine.promise().get();
}
};
......
......@@ -2,6 +2,7 @@
#define TYPON_CORE_SCHEDULER_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <cstdint>
#include <exception>
#include <thread>
......@@ -14,6 +15,8 @@
#include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp>
#include <typon/core/worker.hpp>
#include <typon/core/work_deque.hpp>
namespace typon
......@@ -22,8 +25,8 @@ namespace typon
struct Scheduler
{
using uint = unsigned int;
using Deque = fdt::lock_free::deque<Continuation>;
using Task = typename Deque::pop_type;
using Task = typename fdt::lock_free::deque<Continuation>::pop_type;
using Work = Worker::Work;
using GC = fdt::lock_free::gc;
static inline thread_local uint thread_id;
......@@ -34,26 +37,27 @@ namespace typon
return scheduler;
}
static void push(Continuation task) noexcept
static void insert(std::coroutine_handle<> task) noexcept
{
get()._deque[thread_id].push(task);
Scheduler & scheduler = get();
uint id = fdt::random::random() % scheduler._concurrency;
scheduler._worker[id].add(new WorkDeque(task, true));
scheduler._notifyer.notify_one();
}
static void schedule(Continuation task) noexcept
static void push(Continuation task) noexcept
{
Scheduler & scheduler = get();
scheduler._deque[thread_id].push(task);
scheduler._notifyer.notify_one();
get()._worker[thread_id]._active.load()->push(task);
}
static Task pop() noexcept
{
Scheduler & scheduler = get();
Deque & deque = scheduler._deque[thread_id];
Task task = deque.pop();
if (task.match(Deque::Prune))
WorkDeque * active = scheduler._worker[thread_id]._active.load();
Task task = active->pop();
if (task.match(fdt::lock_free::deque<Continuation>::Prune))
{
if (auto array = deque.shrink())
if (auto array = active->shrink())
{
scheduler._gc.retire(array);
}
......@@ -61,9 +65,33 @@ namespace typon
return task;
}
static WorkDeque * suspend(std::coroutine_handle<> coroutine) noexcept
{
Scheduler & scheduler = get();
Worker & worker = scheduler._worker[thread_id];
WorkDeque * deque = worker._active.load();
worker._active.store(nullptr);
deque->suspend(coroutine);
for (uint i = 0; i < scheduler._concurrency * 2; i++)
{
uint id = fdt::random::random() % scheduler._concurrency;
if (scheduler._worker[id].try_add(deque))
{
return deque;
}
}
worker.add(deque);
return deque;
}
static void resume(WorkDeque * deque) noexcept
{
deque->_resumable.store(true);
}
std::atomic<uint> _actives = 0;
std::atomic<uint> _thieves = 0;
std::vector<Deque> _deque;
std::vector<Worker> _worker;
std::vector<std::thread> _thread;
std::atomic_bool _done {false};
fdt::lock_free::event_count<> _notifyer;
......@@ -71,7 +99,7 @@ namespace typon
GC _gc;
Scheduler(uint concurrency) noexcept
: _deque(concurrency + 1)
: _worker(concurrency)
, _concurrency(concurrency)
, _gc(concurrency)
{
......@@ -82,14 +110,14 @@ namespace typon
_thread.emplace_back([this, id]() {
thread_id = id;
Task task {};
Work work {};
for(;;)
{
exploit_task(task);
if (!wait_for_task(task))
if (!wait_for_work(work))
{
break;
}
exploit_work(work);
}
});
}
......@@ -105,73 +133,42 @@ namespace typon
}
}
void exploit_task(Task & task) noexcept
void exploit_work(Work & work) noexcept
{
if (task)
if (_actives.fetch_add(1) == 0)
{
if (_actives.fetch_add(1) == 0)
{
if (_thieves.load() == 0)
{
_notifyer.notify_one();
}
}
while(task)
if (_thieves.load() == 0)
{
task->resume();
task = _deque[thread_id].pop();
if (task)
{
printf("[%u] ERROR unexpectedly pop valid task %p\n", thread_id, task->_data);
}
_notifyer.notify_one();
}
_actives.fetch_sub(1);
}
_worker[thread_id].resume(work, _gc);
_actives.fetch_sub(1);
}
void explore_task(Task & task) noexcept
void explore_work(Work & work) noexcept
{
_gc.enter(thread_id);
for (uint i = 0; i < _concurrency * 2 + 1; i++)
{
uint id = fdt::random::random() % _concurrency;
if (id == thread_id)
{
task = _deque.back().steal();
}
else
work = _worker[id].try_steal();
if (work)
{
task = _deque[id].steal();
}
if (task)
{
task->thefts()++;
break;
}
}
_gc.leave(thread_id);
}
bool wait_for_task(Task & task) noexcept
bool wait_for_work(Work & work) noexcept
{
wait_for_task:
_thieves.fetch_add(1);
explore_task:
explore_task(task);
if (task)
{
if (_thieves.fetch_sub(1) == 1)
{
_notifyer.notify_one();
}
return true;
}
auto key = _notifyer.prepare_wait();
task = _deque.back().steal();
if (task || task.match(Deque::Abort))
work = {};
while (true)
{
_notifyer.cancel_wait();
if (task)
_thieves.fetch_add(1);
explore_work(work);
if (work)
{
if (_thieves.fetch_sub(1) == 1)
{
......@@ -179,25 +176,26 @@ namespace typon
}
return true;
}
goto explore_task;
}
if (_done.load())
{
_notifyer.cancel_wait();
_notifyer.notify_all();
_thieves.fetch_sub(1);
return false;
}
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
auto key = _notifyer.prepare_wait();
if (_done.load())
{
_notifyer.cancel_wait();
goto wait_for_task;
_notifyer.notify_all();
_thieves.fetch_sub(1);
return false;
}
if (_thieves.fetch_sub(1) == 1)
{
if (_actives.load() > 0)
{
_notifyer.cancel_wait();
continue;
}
}
_notifyer.cancel_wait();
(void) key;
//_notifyer.wait(key);
}
_notifyer.wait(key);
return true;
}
};
......
#ifndef TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
#define TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <typon/fundamental/deque.hpp>
#include <typon/core/continuation.hpp>
namespace typon
{
struct WorkDeque
{
fdt::lock_free::deque<Continuation> _deque;
std::coroutine_handle<> _coroutine;
std::atomic<bool> _resumable;
WorkDeque() noexcept {}
WorkDeque(std::coroutine_handle<> coroutine, bool resumable) noexcept
: _coroutine(coroutine)
, _resumable(resumable)
{}
void push(Continuation x) noexcept
{
_deque.push(std::move(x));
}
auto pop() noexcept
{
return _deque.pop();
}
auto steal() noexcept
{
return _deque.steal();
}
void suspend(std::coroutine_handle<> coroutine) noexcept
{
_resumable.store(false);
_coroutine = coroutine;
}
void resume() noexcept
{
_coroutine.resume();
}
auto shrink() noexcept
{
return _deque.shrink();
}
};
}
#endif // TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
#ifndef TYPON_CORE_WORKER_HPP_INCLUDED
#define TYPON_CORE_WORKER_HPP_INCLUDED
#include <mutex>
#include <type_traits>
#include <utility>
#include <variant>
#include <vector>
#include <typon/fundamental/gc.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp>
#include <typon/core/work_deque.hpp>
namespace typon
{
struct Worker
{
struct Work
{
static_assert(std::is_trivially_destructible_v<Continuation>);
enum State : char { Empty, Resumable, Stolen };
State _state;
union
{
WorkDeque * _deque;
Continuation _task;
};
Work() noexcept : _state(Empty) {}
Work(WorkDeque * deque) noexcept : _state(Resumable), _deque(deque) {}
Work(Continuation task) noexcept : _state(Stolen), _task(task) {}
operator bool() noexcept
{
return _state != Empty;
}
};
std::mutex _mutex;
std::atomic<WorkDeque *> _active {nullptr};
std::vector<WorkDeque *> _pool;
void resume(Work & work, fdt::lock_free::gc & gc) noexcept
{
auto active = _active.load();
if (work._state == Work::Resumable)
{
_active.store(work._deque);
work._deque->resume();
if (active)
{
gc.retire(active);
}
}
else
{
if (!active)
{
_active.store(new WorkDeque());
}
work._task.resume();
}
}
bool try_add(WorkDeque * deque) noexcept
{
if (!_mutex.try_lock())
{
return false;
}
std::lock_guard lock(_mutex, std::adopt_lock);
unsafe_add(deque);
return true;
}
void add(WorkDeque * deque) noexcept
{
std::lock_guard lock(_mutex);
unsafe_add(deque);
}
void unsafe_add(WorkDeque * deque) noexcept
{
_pool.push_back(deque);
}
Work try_steal() noexcept
{
if (!_mutex.try_lock())
{
return {};
}
std::lock_guard lock(_mutex, std::adopt_lock);
return unsafe_steal();
}
Work unsafe_steal() noexcept
{
auto index = fdt::random::random64() % (_pool.size() + 1);
if (index == _pool.size())
{
if (auto active = _active.load())
{
if (auto task = active->steal())
{
task->thefts()++;
return *task;
}
}
return {};
}
auto deque = _pool[index];
if (!deque->_resumable.load())
{
if (auto task = deque->steal())
{
task->thefts()++;
return *task;
}
return {};
}
if (index < _pool.size() - 1)
{
_pool[index] = _pool.back();
}
_pool.pop_back();
return deque;
}
};
}
#endif // TYPON_CORE_WORKER_HPP_INCLUDED
......@@ -9,6 +9,8 @@ namespace typon::fdt::random
static thread_local std::mt19937 random { std::random_device{}() };
static thread_local std::mt19937_64 random64 { std::random_device{}() };
}
......
......@@ -5,6 +5,7 @@
#include <typon/core/async.hpp>
#include <typon/core/fork.hpp>
#include <typon/core/future.hpp>
#include <typon/core/join.hpp>
#include <typon/core/root.hpp>
#include <typon/core/task.hpp>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment