Commit 913bc65a authored by Xavier Thompson's avatar Xavier Thompson

Add minimal asynchronous I/O support

parent 9393f360
#ifndef TYPON_IO_HPP_INCLUDED
#define TYPON_IO_HPP_INCLUDED
#include <chrono>
#include <coroutine>
#include <linux/time_types.h>
#include <liburing.h>
#include <typon/scheduler.hpp>
namespace typon::io
{
template <typename Rep, typename Period>
auto sleep(std::chrono::duration<Rep, Period> duration) noexcept
{
struct awaitable : std::suspend_always
{
__kernel_timespec _ts;
awaitable(std::chrono::duration<Rep, Period> duration) noexcept
{
using namespace std::chrono;
auto sec = duration_cast<seconds>(duration);
auto nsec = duration_cast<nanoseconds>(duration - sec);
_ts = __kernel_timespec({sec.count(), nsec.count()});
}
void await_suspend(std::coroutine_handle<> continuation) noexcept
{
auto stack = Scheduler::suspend(continuation);
io_uring * ring = Scheduler::ring();
io_uring_sqe * sqe = io_uring_get_sqe(ring);
io_uring_prep_timeout(sqe, &_ts, 0, 0);
io_uring_sqe_set_data(sqe, stack);
io_uring_submit(ring);
}
};
return awaitable(duration);
}
}
#endif // TYPON_IO_HPP_INCLUDED
...@@ -28,6 +28,8 @@ ...@@ -28,6 +28,8 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
#include <liburing.h>
#include <typon/event_count.hpp> #include <typon/event_count.hpp>
#include <typon/garbage_collector.hpp> #include <typon/garbage_collector.hpp>
#include <typon/pool.hpp> #include <typon/pool.hpp>
...@@ -52,7 +54,9 @@ namespace typon ...@@ -52,7 +54,9 @@ namespace typon
EventCount<> _notifyer; EventCount<> _notifyer;
std::vector<Pool> _pool; std::vector<Pool> _pool;
std::vector<Stack *> _stack; std::vector<Stack *> _stack;
std::vector<io_uring> _ring;
std::vector<std::thread> _thread; std::vector<std::thread> _thread;
std::vector<std::thread> _io_thread;
GarbageCollector _gc; GarbageCollector _gc;
static inline thread_local uint thread_id; static inline thread_local uint thread_id;
...@@ -63,6 +67,11 @@ namespace typon ...@@ -63,6 +67,11 @@ namespace typon
return scheduler; return scheduler;
} }
static io_uring * ring() noexcept
{
return &(get()._ring[thread_id]);
}
static void schedule(std::coroutine_handle<> coroutine) noexcept static void schedule(std::coroutine_handle<> coroutine) noexcept
{ {
Pool & pool = get().random(); Pool & pool = get().random();
...@@ -124,8 +133,20 @@ namespace typon ...@@ -124,8 +133,20 @@ namespace typon
, _mask((1 << std::bit_width(concurrency)) - 1) , _mask((1 << std::bit_width(concurrency)) - 1)
, _pool(this->_mask + 1) , _pool(this->_mask + 1)
, _stack(concurrency, nullptr) , _stack(concurrency, nullptr)
, _ring(concurrency)
, _gc(concurrency) , _gc(concurrency)
{ {
// Initialize I/O rings
for (uint id = 0; id < concurrency; id++)
{
// The queue depth can be 1 because we submit requests one by one
if (io_uring_queue_init(1, &(_ring[id]), 0))
{
std::terminate();
}
}
// Spawn worker threads
for (uint id = 0; id < concurrency; id++) for (uint id = 0; id < concurrency; id++)
{ {
_thread.emplace_back([this, id]() { _thread.emplace_back([this, id]() {
...@@ -141,6 +162,29 @@ namespace typon ...@@ -141,6 +162,29 @@ namespace typon
} }
}); });
} }
// Spawn I/O completion threads
for (uint id = 0; id < concurrency; id++)
{
_io_thread.emplace_back([this, id]() {
io_uring * ring = &(_ring[id]);
for(;;)
{
io_uring_cqe * cqe;
if (io_uring_wait_cqe(ring, &cqe))
{
throw std::runtime_error("io_uring_wait_cqe() => failed");
}
void * data = io_uring_cqe_get_data(cqe);
io_uring_cqe_seen(ring, cqe);
if (!data)
{
break;
}
Scheduler::enable(reinterpret_cast<Stack *>(data));
}
});
}
} }
bool wait_for_work(std::coroutine_handle<> & coroutine) noexcept bool wait_for_work(std::coroutine_handle<> & coroutine) noexcept
...@@ -269,12 +313,33 @@ namespace typon ...@@ -269,12 +313,33 @@ namespace typon
~Scheduler() noexcept ~Scheduler() noexcept
{ {
// Signal worker threads to stop
_done.store(true); _done.store(true);
_notifyer.notify_all(); _notifyer.notify_all();
// Wait until all worker threads are done
for (auto & t : _thread) for (auto & t : _thread)
{ {
t.join(); t.join();
} }
// Signal I/O threads to stop
for (uint id = 0; id < _concurrency; id++)
{
// An sqe should be available, because SQPOLL is not used
io_uring_sqe * sqe = io_uring_get_sqe(&(_ring[id]));
io_uring_prep_nop(sqe);
io_uring_sqe_set_data(sqe, nullptr);
io_uring_submit(&(_ring[id]));
}
// Wait until all I/O threads are done
for (auto & t : _io_thread)
{
t.join();
}
// Cleanup all I/O rings
for (uint id = 0; id < _concurrency; id++)
{
io_uring_queue_exit(&(_ring[id]));
}
} }
}; };
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include <typon/fork.hpp> #include <typon/fork.hpp>
#include <typon/forked.hpp> #include <typon/forked.hpp>
#include <typon/future.hpp> #include <typon/future.hpp>
#include <typon/io.hpp>
#include <typon/join.hpp> #include <typon/join.hpp>
#include <typon/meta.hpp> #include <typon/meta.hpp>
#include <typon/mutex.hpp> #include <typon/mutex.hpp>
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment