Commit ea0b7a87 authored by Xavier Thompson's avatar Xavier Thompson

ring_buffer.hpp: hold onto smaller arrays when growing

parent 83528917
......@@ -36,20 +36,13 @@ namespace typon
static void push(Continuation task) noexcept
{
Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
get()._deque[thread_id].push(task);
}
static void schedule(Continuation task) noexcept
{
Scheduler & scheduler = get();
if (auto garbage = scheduler._deque[thread_id].push(task))
{
scheduler._gc.retire(garbage);
}
scheduler._deque[thread_id].push(task);
scheduler._notifyer.notify_one();
}
......
......@@ -44,22 +44,19 @@ namespace typon::fdt::lock_free
delete _array.load(relaxed);
}
auto push(T x) noexcept
void push(T x) noexcept
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed);
array_type * garbage = nullptr;
if (bottom - top > array->capacity() - 1)
{
garbage = array;
array = array->grow(top, bottom);
_array.store(array);
}
array->put(bottom, x);
std::atomic_thread_fence(release);
_bottom.store(bottom + 1, relaxed);
return garbage;
}
pop_type pop() noexcept
......
......@@ -19,16 +19,22 @@ namespace typon::fdt::lock_free
using enum std::memory_order;
const u64 _mask;
ring_buffer * _next;
std::atomic<T> * const _array;
ring_buffer(u8 bits) noexcept
ring_buffer(u8 bits, ring_buffer * next = nullptr) noexcept
: _mask((u64(1) << bits) - 1)
, _next(next)
, _array(new std::atomic<T>[this->capacity()])
{}
~ring_buffer()
{
delete [] _array;
if (_next)
{
delete _next;
}
}
u64 capacity() noexcept
......@@ -57,18 +63,27 @@ namespace typon::fdt::lock_free
ring_buffer * grow(u64 start, u64 end) noexcept
{
auto buffer = new ring_buffer(std::countr_one(_mask) + 1);
auto buffer = new ring_buffer(std::countr_one(_mask) + 1, this);
return fill(buffer, start, end);
}
ring_buffer * shrink(u64 start, u64 end) noexcept
{
u8 bits = std::countr_one(_mask);
if (bits < 2)
ring_buffer * last = nullptr;
auto next = this;
auto size = (end - start);
auto threshold = size * 2;
while (next->_next && next->_next->capacity() >= threshold)
{
last = next;
next = next->_next;
}
if (!last)
{
return nullptr;
}
return fill(new ring_buffer(bits - 1), start, end);
return fill(std::exchange(last->_next, nullptr), start, end);
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment