Commit 868051fe authored by Xavier Thompson's avatar Xavier Thompson

deque.hpp: Report resize requests in pop and steal

parent 8c717087
......@@ -23,20 +23,16 @@ namespace typon::fdt::lock_free
using u8 = typename ring_buffer<T>::u8;
using u64 = typename ring_buffer<T>::u64;
static constexpr typename pop_type::template state<1> Abort {};
static constexpr typename pop_type::template state<2> Prune {};
static constexpr unsigned char Abort {1};
static constexpr unsigned char Resize {2};
using enum std::memory_order;
const u8 _bits;
std::atomic<u64> _top {1};
std::atomic<u64> _bottom {1};
std::atomic<array_type *> _array;
deque(u8 bits = 3) noexcept
: _bits(bits)
, _array(new array_type(bits))
{}
deque(u8 bits = 3) noexcept : _array(new array_type(bits)) {}
~deque()
{
......@@ -47,7 +43,7 @@ namespace typon::fdt::lock_free
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(acquire);
array_type * array = _array.load(relaxed);
auto array = _array.load(relaxed);
if (bottom - top > array->capacity() - 1)
{
array = array->grow(top, bottom);
......@@ -58,68 +54,69 @@ namespace typon::fdt::lock_free
_bottom.store(bottom + 1, relaxed);
}
pop_type pop() noexcept
optional<T> pop() noexcept
{
u64 bottom = _bottom.load(relaxed) - 1;
array_type * array = _array.load(relaxed);
auto array = _array.load(relaxed);
_bottom.store(bottom, relaxed);
std::atomic_thread_fence(seq_cst);
u64 top = _top.load(relaxed);
pop_type x {};
if (top <= bottom)
{
x = array->get(bottom);
if (top == bottom)
{
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
x = {};
_bottom.store(bottom + 1, relaxed);
return array->_next ? optional<T>(Resize) : optional<T>();
}
_bottom.store(bottom + 1, relaxed);
}
T x = array->get(bottom);
if (array->_next && array->capacity() > (bottom - top) * 4)
{
return { x, Resize };
}
return { x };
}
else
{
_bottom.store(bottom + 1, relaxed);
}
u64 capacity = array->capacity();
if (capacity > u64(1) << _bits && capacity > (bottom - top) * 4)
{
x.set_state(Prune);
}
return x;
}
array_type * shrink() noexcept
{
u64 bottom = _bottom.load(relaxed);
array_type * array = _array.load(relaxed);
u64 top = _top.load(relaxed);
array_type * next = array->shrink(top, bottom);
if (next)
{
_array.store(next, relaxed);
return array;
}
return nullptr;
_bottom.store(bottom + 1, relaxed);
return array->_next ? optional<T>(Resize) : optional<T>();
}
pop_type steal() noexcept
optional<T> steal() noexcept
{
u64 top = _top.load(acquire);
std::atomic_thread_fence(seq_cst);
u64 bottom = _bottom.load(acquire);
// Use acquire instead of consume (semantics under revision).
auto array = _array.load(acquire);
if (top < bottom)
{
array_type * array = _array.load(consume);
T x = array->get(top);
if (!_top.compare_exchange_strong(top, top + 1, seq_cst, relaxed))
{
return { Abort };
}
if (array->_next && array->capacity() > (bottom - top) * 4)
{
return { x, Resize };
}
return { x };
}
return {};
return array->_next ? optional<T>(Resize) : optional<T>();
}
array_type * shrink() noexcept
{
u64 bottom = _bottom.load(relaxed);
u64 top = _top.load(relaxed);
auto array = _array.load(relaxed);
if (auto next = array->shrink(top, bottom))
{
_array.store(next, relaxed);
return array;
}
return nullptr;
}
};
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment