Commit 49bb603e authored by Xavier Thompson's avatar Xavier Thompson

Move deque.hpp into core and refactor

parent 016fb949
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <atomic> #include <atomic>
#include <concepts> #include <concepts>
#include <coroutine> #include <coroutine>
#include <cstddef>
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <limits> #include <limits>
...@@ -87,6 +88,8 @@ namespace typon ...@@ -87,6 +88,8 @@ namespace typon
Continuation() noexcept {} Continuation() noexcept {}
Continuation(std::nullptr_t null) noexcept : _data(null) {}
template <typename Promise> template <typename Promise>
Continuation(std::coroutine_handle<Promise> coroutine) noexcept Continuation(std::coroutine_handle<Promise> coroutine) noexcept
: _data(&(coroutine.promise()._data)) : _data(&(coroutine.promise()._data))
...@@ -112,6 +115,11 @@ namespace typon ...@@ -112,6 +115,11 @@ namespace typon
_data->_coroutine.resume(); _data->_coroutine.resume();
} }
operator bool() noexcept
{
return _data;
}
operator std::coroutine_handle<>() noexcept operator std::coroutine_handle<>() noexcept
{ {
return _data->_coroutine; return _data->_coroutine;
......
...@@ -7,114 +7,128 @@ ...@@ -7,114 +7,128 @@
#include <type_traits> #include <type_traits>
#include <utility> #include <utility>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/ring_buffer.hpp> #include <typon/fundamental/ring_buffer.hpp>
namespace typon::fdt::lock_free namespace typon
{ {
template <typename T> struct Deque
struct deque
{ {
using array_type = ring_buffer<T>; using ring_buffer = fdt::lock_free::ring_buffer<Continuation>;
using pop_type = fdt::optional<T>; using u64 = ring_buffer::u64;
using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64;
static constexpr unsigned char Abort {1};
static constexpr unsigned char Resize {2};
using enum std::memory_order; using enum std::memory_order;
std::atomic<u64> _top {1}; std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1}; std::atomic<u64> _bottom {1};
std::atomic<array_type *> _array; std::atomic<ring_buffer *> _buffer { new ring_buffer(3) };
std::coroutine_handle<> _coroutine;
std::atomic<bool> _resumable;
deque(u8 bits = 3) noexcept : _array(new array_type(bits)) {} Deque() noexcept {}
~deque() Deque(std::coroutine_handle<> coroutine) noexcept
: _coroutine(coroutine)
, _resumable(true)
{}
~Deque()
{ {
delete _array.load(relaxed); delete _buffer.load(relaxed);
} }
void push(T x) noexcept void push(Continuation x) noexcept
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
auto array = _array.load(relaxed); ring_buffer * buffer = _buffer.load(relaxed);
if (bottom - top > array->capacity() - 1) if (bottom - top > buffer->capacity() - 1)
{ {
array = array->grow(top, bottom); buffer = buffer->grow(top, bottom);
_array.store(array); _buffer.store(buffer);
} }
array->put(bottom, x); buffer->put(bottom, x);
std::atomic_thread_fence(release); std::atomic_thread_fence(release);
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
} }
optional<T> pop() noexcept bool pop() noexcept
{ {
u64 bottom = _bottom.load(relaxed) - 1; u64 bottom = _bottom.load(relaxed) - 1;
auto array = _array.load(relaxed);
_bottom.store(bottom, relaxed); _bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed); u64 top = _top.load(relaxed);
if (top <= bottom) if (top < bottom)
{ {
return true;
}
if (top == bottom) if (top == bottom)
{ {
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) bool win = _top.compare_exchange_strong(top, top + 1, seq_cst, relaxed);
{
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
return array->_next ? optional<T>(Resize) : optional<T>(); return win;
} }
_bottom.store(bottom + 1, relaxed); _bottom.store(bottom + 1, relaxed);
} return false;
T x = array->get(bottom);
if (array->_next && array->capacity() > (bottom - top) * 4)
{
return { x, Resize };
}
return { x };
}
_bottom.store(bottom + 1, relaxed);
return array->_next ? optional<T>(Resize) : optional<T>();
} }
optional<T> steal() noexcept Continuation steal() noexcept
{ {
u64 top = _top.load(acquire); u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst); std::atomic_thread_fence(seq_cst);
u64 bottom = _bottom.load(acquire); u64 bottom = _bottom.load(acquire);
// Use acquire instead of consume (semantics under revision). ring_buffer * buffer = _buffer.load(consume);
auto array = _array.load(acquire);
if (top < bottom) if (top < bottom)
{ {
T x = array->get(top); Continuation x = buffer->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed)) if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{ {
return { Abort }; return { nullptr };
}
return x;
} }
if (array->_next && array->capacity() > (bottom - top) * 4) return { nullptr };
}
Continuation pop_top() noexcept
{ {
return { x, Resize }; u64 top = _top.load(relaxed);
u64 bottom = _bottom.load(relaxed);
auto buffer = _buffer.load(relaxed);
if (top < bottom)
{
Continuation x = buffer->get(top);
_top.store(top + 1, relaxed);
return x;
} }
return { x }; return { nullptr };
} }
return array->_next ? optional<T>(Resize) : optional<T>();
void suspend(std::coroutine_handle<> coroutine) noexcept
{
_resumable.store(false);
_coroutine = coroutine;
}
void resume() noexcept
{
_coroutine.resume();
} }
array_type * shrink() noexcept ring_buffer * reclaim() noexcept
{ {
u64 bottom = _bottom.load(relaxed); u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(relaxed); u64 top = _top.load(relaxed);
auto array = _array.load(relaxed); auto buffer = _buffer.load(relaxed);
if (auto next = array->shrink(top, bottom)) u64 capacity = buffer->capacity();
if (bottom < top + (capacity >> 2))
{
if (auto next = buffer->shrink(top, bottom))
{ {
_array.store(next, relaxed); _buffer.store(next, relaxed);
return array; return buffer;
}
} }
return nullptr; return nullptr;
} }
......
...@@ -51,11 +51,11 @@ namespace typon ...@@ -51,11 +51,11 @@ namespace typon
{ {
std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept std::coroutine_handle<> await_suspend(std::coroutine_handle<promise_type> coroutine) noexcept
{ {
if (auto continuation = Scheduler::pop()) auto continuation = coroutine.promise()._continuation;
if (Scheduler::pop())
{ {
return *continuation; return continuation;
} }
auto & continuation = coroutine.promise()._continuation;
u64 n = continuation.n().fetch_sub(1, std::memory_order_acq_rel); u64 n = continuation.n().fetch_sub(1, std::memory_order_acq_rel);
if (n == 1) if (n == 1)
{ {
......
...@@ -8,8 +8,8 @@ ...@@ -8,8 +8,8 @@
#include <typon/fundamental/scope.hpp> #include <typon/fundamental/scope.hpp>
#include <typon/core/deque.hpp>
#include <typon/core/scheduler.hpp> #include <typon/core/scheduler.hpp>
#include <typon/core/work_deque.hpp>
namespace typon namespace typon
...@@ -45,7 +45,7 @@ namespace typon ...@@ -45,7 +45,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<WorkDeque *>(state)); Scheduler::enable(reinterpret_cast<Deque *>(state));
} }
} }
...@@ -92,7 +92,7 @@ namespace typon ...@@ -92,7 +92,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<WorkDeque *>(state)); Scheduler::enable(reinterpret_cast<Deque *>(state));
} }
} }
...@@ -133,7 +133,7 @@ namespace typon ...@@ -133,7 +133,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<WorkDeque *>(state)); Scheduler::enable(reinterpret_cast<Deque *>(state));
} }
} }
...@@ -172,7 +172,7 @@ namespace typon ...@@ -172,7 +172,7 @@ namespace typon
auto state = _state.exchange(ready, std::memory_order_acq_rel); auto state = _state.exchange(ready, std::memory_order_acq_rel);
if (state != no_waiter) if (state != no_waiter)
{ {
Scheduler::enable(reinterpret_cast<WorkDeque *>(state)); Scheduler::enable(reinterpret_cast<Deque *>(state));
} }
} }
......
...@@ -8,15 +8,13 @@ ...@@ -8,15 +8,13 @@
#include <thread> #include <thread>
#include <vector> #include <vector>
#include <typon/fundamental/deque.hpp>
#include <typon/fundamental/event_count.hpp> #include <typon/fundamental/event_count.hpp>
#include <typon/fundamental/garbage_collector.hpp> #include <typon/fundamental/garbage_collector.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp> #include <typon/core/continuation.hpp>
#include <typon/core/deque.hpp>
#include <typon/core/worker.hpp> #include <typon/core/worker.hpp>
#include <typon/core/work_deque.hpp>
namespace typon namespace typon
...@@ -25,7 +23,6 @@ namespace typon ...@@ -25,7 +23,6 @@ namespace typon
struct Scheduler struct Scheduler
{ {
using uint = unsigned int; using uint = unsigned int;
using Task = typename fdt::lock_free::deque<Continuation>::pop_type;
using Work = Worker::Work; using Work = Worker::Work;
using garbage_collector = fdt::lock_free::garbage_collector; using garbage_collector = fdt::lock_free::garbage_collector;
...@@ -41,7 +38,7 @@ namespace typon ...@@ -41,7 +38,7 @@ namespace typon
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
uint id = fdt::random::random() % scheduler._concurrency; uint id = fdt::random::random() % scheduler._concurrency;
scheduler._worker[id].add(new WorkDeque(task, true)); scheduler._worker[id].add(new Deque(task));
scheduler._notifyer.notify_one(); scheduler._notifyer.notify_one();
} }
...@@ -50,26 +47,23 @@ namespace typon ...@@ -50,26 +47,23 @@ namespace typon
get()._worker[thread_id]._active.load()->push(task); get()._worker[thread_id]._active.load()->push(task);
} }
static Task pop() noexcept static bool pop() noexcept
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
WorkDeque * active = scheduler._worker[thread_id]._active.load(); Deque * active = scheduler._worker[thread_id]._active.load();
Task task = active->pop(); bool result = active->pop();
if (task.get_flags() == fdt::lock_free::deque<Continuation>::Resize) if (auto array = active->reclaim())
{
if (auto array = active->shrink())
{ {
scheduler._gc.retire(array); scheduler._gc.retire(array);
} }
} return result;
return task;
} }
static WorkDeque * suspend(std::coroutine_handle<> coroutine) noexcept static Deque * suspend(std::coroutine_handle<> coroutine) noexcept
{ {
Scheduler & scheduler = get(); Scheduler & scheduler = get();
Worker & worker = scheduler._worker[thread_id]; Worker & worker = scheduler._worker[thread_id];
WorkDeque * deque = worker._active.load(); Deque * deque = worker._active.load();
worker._active.store(nullptr); worker._active.store(nullptr);
deque->suspend(coroutine); deque->suspend(coroutine);
for (uint i = 0; i < scheduler._concurrency * 2; i++) for (uint i = 0; i < scheduler._concurrency * 2; i++)
...@@ -84,7 +78,7 @@ namespace typon ...@@ -84,7 +78,7 @@ namespace typon
return deque; return deque;
} }
static void enable(WorkDeque * deque) noexcept static void enable(Deque * deque) noexcept
{ {
deque->_resumable.store(true); deque->_resumable.store(true);
get()._notifyer.notify_one(); get()._notifyer.notify_one();
...@@ -143,7 +137,11 @@ namespace typon ...@@ -143,7 +137,11 @@ namespace typon
_notifyer.notify_one(); _notifyer.notify_one();
} }
} }
_worker[thread_id].resume(work, _gc); auto garbage = _worker[thread_id].resume(work);
if (garbage)
{
_gc.retire(garbage);
}
_actives.fetch_sub(1); _actives.fetch_sub(1);
} }
......
#ifndef TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
#define TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
#include <atomic>
#include <coroutine>
#include <typon/fundamental/deque.hpp>
#include <typon/core/continuation.hpp>
namespace typon
{
struct WorkDeque
{
fdt::lock_free::deque<Continuation> _deque;
std::coroutine_handle<> _coroutine;
std::atomic<bool> _resumable;
WorkDeque() noexcept {}
WorkDeque(std::coroutine_handle<> coroutine, bool resumable) noexcept
: _coroutine(coroutine)
, _resumable(resumable)
{}
void push(Continuation x) noexcept
{
_deque.push(std::move(x));
}
auto pop() noexcept
{
return _deque.pop();
}
auto steal() noexcept
{
return _deque.steal();
}
void suspend(std::coroutine_handle<> coroutine) noexcept
{
_resumable.store(false);
_coroutine = coroutine;
}
void resume() noexcept
{
_coroutine.resume();
}
auto shrink() noexcept
{
return _deque.shrink();
}
};
}
#endif // TYPON_CORE_WORK_DEQUE_HPP_INCLUDED
...@@ -8,11 +8,9 @@ ...@@ -8,11 +8,9 @@
#include <vector> #include <vector>
#include <typon/fundamental/garbage_collector.hpp> #include <typon/fundamental/garbage_collector.hpp>
#include <typon/fundamental/optional.hpp>
#include <typon/fundamental/random.hpp> #include <typon/fundamental/random.hpp>
#include <typon/core/continuation.hpp> #include <typon/core/continuation.hpp>
#include <typon/core/work_deque.hpp>
namespace typon namespace typon
...@@ -28,13 +26,13 @@ namespace typon ...@@ -28,13 +26,13 @@ namespace typon
State _state; State _state;
union union
{ {
WorkDeque * _deque; Deque * _deque;
Continuation _task; Continuation _task;
}; };
Work() noexcept : _state(Empty) {} Work() noexcept : _state(Empty) {}
Work(WorkDeque * deque) noexcept : _state(Resumable), _deque(deque) {} Work(Deque * deque) noexcept : _state(Resumable), _deque(deque) {}
Work(Continuation task) noexcept : _state(Stolen), _task(task) {} Work(Continuation task) noexcept : _state(Stolen), _task(task) {}
...@@ -45,8 +43,8 @@ namespace typon ...@@ -45,8 +43,8 @@ namespace typon
}; };
std::mutex _mutex; std::mutex _mutex;
std::atomic<WorkDeque *> _active {nullptr}; std::atomic<Deque *> _active {nullptr};
std::vector<WorkDeque *> _pool; std::vector<Deque *> _pool;
~Worker() ~Worker()
{ {
...@@ -60,7 +58,7 @@ namespace typon ...@@ -60,7 +58,7 @@ namespace typon
} }
} }
void resume(Work & work, fdt::lock_free::garbage_collector & gc) noexcept Deque * resume(Work & work) noexcept
{ {
auto active = _active.load(); auto active = _active.load();
if (work._state == Work::Resumable) if (work._state == Work::Resumable)
...@@ -69,26 +67,27 @@ namespace typon ...@@ -69,26 +67,27 @@ namespace typon
work._deque->resume(); work._deque->resume();
if (active) if (active)
{ {
gc.retire(active); return active;
} }
} }
else else
{ {
if (!active) if (!active)
{ {
_active.store(new WorkDeque()); _active.store(new Deque());
} }
work._task.resume(); work._task.resume();
} }
return nullptr;
} }
void add(WorkDeque * deque) noexcept void add(Deque * deque) noexcept
{ {
std::lock_guard lock(_mutex); std::lock_guard lock(_mutex);
_pool.push_back(deque); _pool.push_back(deque);
} }
bool try_add(WorkDeque * deque) noexcept bool try_add(Deque * deque) noexcept
{ {
if (!_mutex.try_lock()) if (!_mutex.try_lock())
{ {
...@@ -117,26 +116,23 @@ namespace typon ...@@ -117,26 +116,23 @@ namespace typon
{ {
if (auto task = active->steal()) if (auto task = active->steal())
{ {
task->thefts()++; task.thefts()++;
return *task; return task;
} }
return {}; return {};
} }
auto deque = _pool[index]; auto deque = _pool[index];
if (!deque->_resumable.load()) if (!deque->_resumable.load())
{ {
auto task = deque->steal(); auto task = deque->pop_top();
if (task.get_flags() == fdt::lock_free::deque<Continuation>::Resize) if (auto garbage = deque->reclaim())
{ {
if (auto array = deque->shrink()) delete garbage;
{
delete array;
}
} }
if (task) if (task)
{ {
task->thefts()++; task.thefts()++;
return *task; return task;
} }
return {}; return {};
} }
...@@ -147,7 +143,6 @@ namespace typon ...@@ -147,7 +143,6 @@ namespace typon
_pool.pop_back(); _pool.pop_back();
return deque; return deque;
} }
}; };
} }
......
#ifndef TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
#define TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
#include <type_traits>
namespace typon::fdt
{
template <typename T>
requires std::is_trivially_copyable_v<T>
struct optional
{
unsigned char _state;
union
{
T _value;
};
optional() noexcept : _state(0) {}
optional(T value) noexcept : _state(1), _value(value) {}
optional(unsigned char flags) noexcept : _state(flags << 1) {}
optional(T value, unsigned char flags) noexcept
: _state((flags << 1) | 1)
, _value(value)
{}
~optional()
{
if (_state & 1)
{
std::destroy_at(std::addressof(_value));
}
}
operator bool() noexcept
{
return _state & 1;
}
unsigned char get_flags() noexcept
{
return _state >> 1;
}
void set_flags(unsigned char flags) noexcept
{
_state = (flags << 1) | (_state & 1);
}
T * operator->() noexcept
{
return std::addressof(_value);
}
T & operator*() noexcept
{
return _value;
}
};
}
#endif // TYPON_FUNDAMENTAL_OPTIONAL_HPP_INCLUDED
...@@ -14,6 +14,7 @@ namespace typon::fdt::lock_free ...@@ -14,6 +14,7 @@ namespace typon::fdt::lock_free
template <typename T> template <typename T>
requires std::is_trivially_copyable_v<T> requires std::is_trivially_copyable_v<T>
&& std::is_trivially_destructible_v<T>
struct ring_buffer struct ring_buffer
{ {
using u8 = std::uint_least8_t; using u8 = std::uint_least8_t;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment