Commit 0b07808b authored by Xavier Thompson's avatar Xavier Thompson

Expand asynchronous I/O support

parent 2802714e
......@@ -3,21 +3,53 @@
#include <chrono>
#include <coroutine>
#include <string_view>
#include <linux/time_types.h>
#include <liburing.h>
#include <typon/scheduler.hpp>
#include <typon/syscall_completion.hpp>
namespace typon::io
{
struct SyscallAwaitable : std::suspend_always
{
union
{
SyscallCompletion _completion;
io_uring_sqe * _sqe;
};
SyscallAwaitable() noexcept
{
io_uring * ring = Scheduler::ring();
_sqe = io_uring_get_sqe(ring);
}
void await_suspend(std::coroutine_handle<> continuation) noexcept
{
auto stack = Scheduler::suspend(continuation);
io_uring_sqe * sqe = _sqe;
_completion._stack = stack;
io_uring_sqe_set_data(sqe, &(_completion));
io_uring * ring = Scheduler::ring();
io_uring_submit(ring);
}
auto await_resume() noexcept
{
return _completion._result;
}
};
template <typename Rep, typename Period>
auto sleep(std::chrono::duration<Rep, Period> duration) noexcept
{
struct awaitable : std::suspend_always
struct awaitable : SyscallAwaitable
{
__kernel_timespec _ts;
......@@ -27,22 +59,99 @@ namespace typon::io
auto sec = duration_cast<seconds>(duration);
auto nsec = duration_cast<nanoseconds>(duration - sec);
_ts = __kernel_timespec({sec.count(), nsec.count()});
}
void await_suspend(std::coroutine_handle<> continuation) noexcept
{
auto stack = Scheduler::suspend(continuation);
io_uring * ring = Scheduler::ring();
io_uring_sqe * sqe = io_uring_get_sqe(ring);
io_uring_prep_timeout(sqe, &_ts, 0, 0);
io_uring_sqe_set_data(sqe, stack);
io_uring_submit(ring);
io_uring_prep_timeout(_sqe, &(_ts), 0, 0);
}
};
return awaitable(duration);
}
auto write(int fd, std::string_view bytes, __u64 offset = -1) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_write(awaitable._sqe, fd, bytes.data(), bytes.size(), offset);
return awaitable;
}
auto read(int fd, void * buf, unsigned nbytes, __u64 offset = -1) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_read(awaitable._sqe, fd, buf, nbytes, offset);
return awaitable;
}
auto openat(int dfd, const char * path, int flags, mode_t mode) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_openat(awaitable._sqe, dfd, path, flags, mode);
return awaitable;
}
auto close(int fd) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_close(awaitable._sqe, fd);
return awaitable;
}
auto statx(int dfd,
const char * path,
int flags,
unsigned mask,
struct statx * statxbuf) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_statx(awaitable._sqe, dfd, path, flags, mask, statxbuf);
return awaitable;
}
auto socket(int domain, int type, int protocol) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_socket(awaitable._sqe, domain, type, protocol, 0);
return awaitable;
}
auto accept(int sockfd,
sockaddr * addr,
socklen_t * addrlen,
int flags = 0) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_accept(awaitable._sqe, sockfd, addr, addrlen, flags);
return awaitable;
}
auto connect(int sockfd, const sockaddr * addr, socklen_t addrlen) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_connect(awaitable._sqe, sockfd, addr, addrlen);
return awaitable;
}
auto recv(int sockfd, void * buf, size_t len, int flags) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_recv(awaitable._sqe, sockfd, buf, len, flags);
return awaitable;
}
auto send(int sockfd, std::string_view bytes, int flags) noexcept
{
SyscallAwaitable awaitable;
auto len = bytes.size();
io_uring_prep_send(awaitable._sqe, sockfd, bytes.data(), len, flags);
return awaitable;
}
auto shutdown(int sockfd, int how) noexcept
{
SyscallAwaitable awaitable;
io_uring_prep_shutdown(awaitable._sqe, sockfd, how);
return awaitable;
}
}
......
......@@ -35,6 +35,7 @@
#include <typon/pool.hpp>
#include <typon/random.hpp>
#include <typon/stack.hpp>
#include <typon/syscall_completion.hpp>
#include <typon/theft_point.hpp>
......@@ -173,15 +174,18 @@ namespace typon
io_uring_cqe * cqe;
if (io_uring_wait_cqe(ring, &cqe))
{
throw std::runtime_error("io_uring_wait_cqe() => failed");
throw std::runtime_error("io_uring_wait_cqe() failed");
}
void * data = io_uring_cqe_get_data(cqe);
auto result = cqe->res;
io_uring_cqe_seen(ring, cqe);
if (!data)
{
break;
}
Scheduler::enable(reinterpret_cast<Stack *>(data));
auto completion = reinterpret_cast<SyscallCompletion *>(data);
completion->_result = result;
Scheduler::enable(completion->_stack);
}
});
}
......
#ifndef TYPON_SYSCALL_COMPLETION_HPP_INCLUDED
#define TYPON_SYSCALL_COMPLETION_HPP_INCLUDED
#include <cstdint>
#include <typon/stack.hpp>
namespace typon
{
struct SyscallCompletion
{
using u32 = std::uint_fast32_t;
Stack * _stack;
u32 _result;
};
}
#endif // TYPON_SYSCALL_COMPLETION_HPP_INCLUDED
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment